diff --git a/Tiltfile b/Tiltfile index 8b1fafb0a..864a649bf 100644 --- a/Tiltfile +++ b/Tiltfile @@ -51,10 +51,26 @@ helm_repo( labels=['Repositories'], ) +########### Certmanager +# Certmanager is required for the validating webhooks in the cortex bundles, so +# we need to deploy it before the bundles. If you don't need the webhooks locally, +# you can disable them in the values.yaml and skip deploying certmanager. +cache_dir = '.tilt/cert-manager' +cert_manager_version = 'v1.19.3' +if not os.path.exists(cache_dir): + local('mkdir -p ' + cache_dir) +if not os.path.exists(cache_dir + '/cert-manager-' + cert_manager_version + '.yaml'): + url = 'https://github.com/cert-manager/cert-manager/releases/download/' + cert_manager_version + '/cert-manager.yaml' + local('curl -fL ' + url + ' -o ' + cache_dir + '/cert-manager-' + cert_manager_version + '.yaml') +k8s_yaml(cache_dir + '/cert-manager-' + cert_manager_version + '.yaml') +k8s_resource('cert-manager', labels=['ZZ_Certmanager']) +k8s_resource('cert-manager-webhook', labels=['ZZ_Certmanager']) +k8s_resource('cert-manager-cainjector', labels=['ZZ_Certmanager']) + ########### Dependency CRDs # Make sure the local cluster is running if you are running into startup issues here. url = 'https://raw.githubusercontent.com/cobaltcore-dev/openstack-hypervisor-operator/refs/heads/main/charts/openstack-hypervisor-operator/crds/hypervisor-crd.yaml' -local('curl ' + url + ' | kubectl apply -f -') +local('curl -L ' + url + ' | kubectl apply -f -') ########### Cortex Operator & CRDs docker_build('ghcr.io/cobaltcore-dev/cortex', '.', diff --git a/cmd/main.go b/cmd/main.go index 294d9b242..4e4865567 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -289,26 +289,27 @@ func main() { // The pipeline monitor is a bucket for all metrics produced during the // execution of individual steps (see step monitor below) and the overall // pipeline.
- pipelineMonitor := schedulinglib.NewPipelineMonitor() - metrics.Registry.MustRegister(&pipelineMonitor) + // TODO: Only initialize me for scheduling domains that actually use pipelines. + filterWeigherPipelineMonitor := schedulinglib.NewPipelineMonitor() + metrics.Registry.MustRegister(&filterWeigherPipelineMonitor) + detectorPipelineMonitor := schedulinglib.NewDetectorPipelineMonitor() + metrics.Registry.MustRegister(&detectorPipelineMonitor) - if slices.Contains(mainConfig.EnabledControllers, "nova-decisions-pipeline-controller") { - decisionController := &nova.FilterWeigherPipelineController{ - Monitor: pipelineMonitor, + if slices.Contains(mainConfig.EnabledControllers, "nova-pipeline-controllers") { + // Filter-weigher pipeline controller setup. + filterWeigherController := &nova.FilterWeigherPipelineController{ + Monitor: filterWeigherPipelineMonitor, } // Inferred through the base controller. - decisionController.Client = multiclusterClient - if err := (decisionController).SetupWithManager(mgr, multiclusterClient); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "DecisionReconciler") + filterWeigherController.Client = multiclusterClient + if err := filterWeigherController.SetupWithManager(mgr, multiclusterClient); err != nil { + setupLog.Error(err, "unable to create controller", "controller", "nova FilterWeigherPipelineController") os.Exit(1) } httpAPIConf := conf.GetConfigOrDie[nova.HTTPAPIConfig]() - nova.NewAPI(httpAPIConf, decisionController).Init(mux) - } - if slices.Contains(mainConfig.EnabledControllers, "nova-deschedulings-pipeline-controller") { - // Deschedulings controller - monitor := schedulinglib.NewDetectorPipelineMonitor() - metrics.Registry.MustRegister(&monitor) + nova.NewAPI(httpAPIConf, filterWeigherController).Init(mux) + + // Detector pipeline controller setup. 
novaClient := nova.NewNovaClient() novaClientConfig := conf.GetConfigOrDie[nova.NovaClientConfig]() if err := mgr.Add(manager.RunnableFunc(func(ctx context.Context) error { @@ -317,15 +318,14 @@ func main() { setupLog.Error(err, "unable to initialize nova client") os.Exit(1) } - cycleBreaker := &nova.DetectorCycleBreaker{NovaClient: novaClient} deschedulingsController := &nova.DetectorPipelineController{ - Monitor: monitor, - Breaker: cycleBreaker, + Monitor: detectorPipelineMonitor, + Breaker: &nova.DetectorCycleBreaker{NovaClient: novaClient}, } // Inferred through the base controller. deschedulingsController.Client = multiclusterClient if err := (deschedulingsController).SetupWithManager(mgr, multiclusterClient); err != nil { - setupLog.Error(err, "unable to create controller", "controller", "DeschedulingsReconciler") + setupLog.Error(err, "unable to create controller", "controller", "nova DetectorPipelineController") os.Exit(1) } go deschedulingsController.CreateDeschedulingsPeriodically(ctx) @@ -337,6 +337,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "Cleanup") os.Exit(1) } + + // Webhook that validates all pipelines. + novaPipelineWebhook := nova.NewPipelineWebhook() + if err := novaPipelineWebhook.SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup nova pipeline webhook") + os.Exit(1) + } } if slices.Contains(mainConfig.EnabledControllers, "nova-deschedulings-executor") { executorConfig := conf.GetConfigOrDie[nova.DeschedulingsExecutorConfig]() @@ -360,7 +367,7 @@ func main() { } if slices.Contains(mainConfig.EnabledControllers, "manila-decisions-pipeline-controller") { controller := &manila.FilterWeigherPipelineController{ - Monitor: pipelineMonitor, + Monitor: filterWeigherPipelineMonitor, } // Inferred through the base controller. 
controller.Client = multiclusterClient @@ -369,10 +376,17 @@ func main() { os.Exit(1) } manila.NewAPI(controller).Init(mux) + + // Webhook that validates all pipelines. + manilaPipelineWebhook := manila.NewPipelineWebhook() + if err := manilaPipelineWebhook.SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup manila pipeline webhook") + os.Exit(1) + } } if slices.Contains(mainConfig.EnabledControllers, "cinder-decisions-pipeline-controller") { controller := &cinder.FilterWeigherPipelineController{ - Monitor: pipelineMonitor, + Monitor: filterWeigherPipelineMonitor, } // Inferred through the base controller. controller.Client = multiclusterClient @@ -381,10 +395,17 @@ func main() { os.Exit(1) } cinder.NewAPI(controller).Init(mux) + + // Webhook that validates all pipelines. + cinderPipelineWebhook := cinder.NewPipelineWebhook() + if err := cinderPipelineWebhook.SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup cinder pipeline webhook") + os.Exit(1) + } } if slices.Contains(mainConfig.EnabledControllers, "ironcore-decisions-pipeline-controller") { controller := &machines.FilterWeigherPipelineController{ - Monitor: pipelineMonitor, + Monitor: filterWeigherPipelineMonitor, } // Inferred through the base controller. controller.Client = multiclusterClient @@ -392,10 +413,17 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "DecisionReconciler") os.Exit(1) } + + // Webhook that validates all pipelines. + ironcorePipelineWebhook := machines.NewPipelineWebhook() + if err := ironcorePipelineWebhook.SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup ironcore pipeline webhook") + os.Exit(1) + } } if slices.Contains(mainConfig.EnabledControllers, "pods-decisions-pipeline-controller") { controller := &pods.FilterWeigherPipelineController{ - Monitor: pipelineMonitor, + Monitor: filterWeigherPipelineMonitor, } // Inferred through the base controller. 
controller.Client = multiclusterClient @@ -403,6 +431,13 @@ func main() { setupLog.Error(err, "unable to create controller", "controller", "DecisionReconciler") os.Exit(1) } + + // Webhook that validates all pipelines. + podsPipelineWebhook := pods.NewPipelineWebhook() + if err := podsPipelineWebhook.SetupWebhookWithManager(mgr); err != nil { + setupLog.Error(err, "unable to setup pods pipeline webhook") + os.Exit(1) + } } if slices.Contains(mainConfig.EnabledControllers, "explanation-controller") { // Setup a controller which will reconcile the history and explanation for diff --git a/helm/bundles/cortex-cinder/values.yaml b/helm/bundles/cortex-cinder/values.yaml index 33f287722..f002fc58b 100644 --- a/helm/bundles/cortex-cinder/values.yaml +++ b/helm/bundles/cortex-cinder/values.yaml @@ -85,6 +85,9 @@ cortex: &cortex cortex-scheduling-controllers: <<: *cortex namePrefix: cortex-cinder-scheduling + # Enable webhook that will validate CRDs for the scheduling controllers. + webhook: {enable: true} + certmanager: {enable: true} # Needed for the webhook TLS certificates. conf: <<: *cortexConf leaderElectionID: cortex-cinder-scheduling diff --git a/helm/bundles/cortex-ironcore/values.yaml b/helm/bundles/cortex-ironcore/values.yaml index 29458a2b3..2f885c7a5 100644 --- a/helm/bundles/cortex-ironcore/values.yaml +++ b/helm/bundles/cortex-ironcore/values.yaml @@ -24,6 +24,9 @@ cortex: crd: {enable: false} # Use this to unambiguate multiple cortex deployments in the same cluster. namePrefix: cortex-ironcore + # Enable webhook that will validate CRDs for the scheduling controllers. + webhook: {enable: true} + certmanager: {enable: true} # Needed for the webhook TLS certificates. conf: # The operator will only touch CRs with this scheduling domain name. 
schedulingDomain: machines diff --git a/helm/bundles/cortex-manila/values.yaml b/helm/bundles/cortex-manila/values.yaml index cf74e4444..cc341a112 100644 --- a/helm/bundles/cortex-manila/values.yaml +++ b/helm/bundles/cortex-manila/values.yaml @@ -85,6 +85,9 @@ cortex: &cortex cortex-scheduling-controllers: <<: *cortex namePrefix: cortex-manila-scheduling + # Enable webhook that will validate CRDs for the scheduling controllers. + webhook: {enable: true} + certmanager: {enable: true} # Needed for the webhook TLS certificates. conf: <<: *cortexConf leaderElectionID: cortex-manila-scheduling diff --git a/helm/bundles/cortex-nova/values.yaml b/helm/bundles/cortex-nova/values.yaml index 6df34a9df..7ad361369 100644 --- a/helm/bundles/cortex-nova/values.yaml +++ b/helm/bundles/cortex-nova/values.yaml @@ -72,7 +72,6 @@ cortex: &cortex # A centralized ServiceMonitor and metrics service are defined in the cortex-nova chart. metrics: {enable: false} prometheus: {enable: false} - namePrefix: cortex-nova conf: &cortexConf schedulingDomain: nova keystoneSecretRef: @@ -93,6 +92,9 @@ cortex-scheduling-controllers: rbac: # The cortex nova scheduling controllers need hypervisor crd access. hypervisor: {enable: true} + # Enable webhook that will validate CRDs for the scheduling controllers. + webhook: {enable: true} + certmanager: {enable: true} # Needed for the webhook TLS certificates. 
conf: <<: *cortexConf leaderElectionID: cortex-nova-scheduling @@ -103,8 +105,7 @@ cortex-scheduling-controllers: <<: *cortexMonitoringLabels component: nova-scheduling enabledControllers: - - nova-decisions-pipeline-controller - - nova-deschedulings-pipeline-controller + - nova-pipeline-controllers - nova-deschedulings-executor - explanation-controller enabledTasks: diff --git a/helm/bundles/cortex-pods/values.yaml b/helm/bundles/cortex-pods/values.yaml index 598283003..b7aab8a6d 100644 --- a/helm/bundles/cortex-pods/values.yaml +++ b/helm/bundles/cortex-pods/values.yaml @@ -24,6 +24,9 @@ cortex: crd: { enable: false } # Use this to unambiguate multiple cortex deployments in the same cluster. namePrefix: cortex-pods + # Enable webhook that will validate CRDs for the scheduling controllers. + webhook: {enable: true} + certmanager: {enable: true} # Needed for the webhook TLS certificates. conf: # The operator will only touch CRs with this scheduling domain name. schedulingDomain: pods diff --git a/helm/library/cortex/templates/certmanager/certificate.yaml b/helm/library/cortex/templates/certmanager/certificate.yaml index b0f27c763..c4d20742f 100644 --- a/helm/library/cortex/templates/certmanager/certificate.yaml +++ b/helm/library/cortex/templates/certmanager/certificate.yaml @@ -5,7 +5,7 @@ kind: Issuer metadata: labels: {{- include "chart.labels" . | nindent 4 }} - name: selfsigned-issuer + name: {{ .Values.namePrefix }}-selfsigned-issuer namespace: {{ .Release.Namespace }} spec: selfSigned: {} @@ -30,7 +30,26 @@ spec: - cortex-metrics-service.{{ .Release.Namespace }}.svc issuerRef: kind: Issuer - name: selfsigned-issuer + name: {{ .Values.namePrefix }}-selfsigned-issuer secretName: metrics-server-cert {{- end }} +{{- if .Values.webhook.enable }} +--- +# Certificate for the webhook +apiVersion: cert-manager.io/v1 +kind: Certificate +metadata: + labels: + {{- include "chart.labels" . 
| nindent 4 }} + name: {{ .Values.namePrefix }}-webhook-cert + namespace: {{ .Release.Namespace }} +spec: + dnsNames: + - {{ .Values.namePrefix }}-webhook-service.{{ .Release.Namespace }}.svc + - {{ .Values.namePrefix }}-webhook-service.{{ .Release.Namespace }}.svc.cluster.local + issuerRef: + kind: Issuer + name: {{ .Values.namePrefix }}-selfsigned-issuer + secretName: {{ .Values.namePrefix }}-webhook-server-cert +{{- end }} {{- end }} diff --git a/helm/library/cortex/templates/manager/manager.yaml b/helm/library/cortex/templates/manager/manager.yaml index bf756c75b..73672164f 100644 --- a/helm/library/cortex/templates/manager/manager.yaml +++ b/helm/library/cortex/templates/manager/manager.yaml @@ -35,6 +35,9 @@ spec: {{- range .Values.controllerManager.container.args }} - {{ . }} {{- end }} + {{- if and .Values.webhook.enable .Values.certmanager.enable }} + - "--webhook-cert-path=/tmp/k8s-webhook-server/serving-certs" + {{- end }} ports: - name: api containerPort: 8080 @@ -42,6 +45,11 @@ spec: - name: metrics containerPort: 2112 protocol: TCP + {{- if .Values.webhook.enable }} + - name: webhook + containerPort: 9443 + protocol: TCP + {{- end }} command: - /manager image: {{ .Values.controllerManager.container.image.repository }}:{{ .Values.controllerManager.container.image.tag | default .Chart.AppVersion }} @@ -74,6 +82,11 @@ spec: mountPath: /tmp/k8s-metrics-server/metrics-certs readOnly: true {{- end }} + {{- if and .Values.webhook.enable .Values.certmanager.enable }} + - name: webhook-certs + mountPath: /tmp/k8s-webhook-server/serving-certs + readOnly: true + {{- end }} securityContext: {{- toYaml .Values.controllerManager.securityContext | nindent 8 }} serviceAccountName: {{ .Values.namePrefix }}-{{ .Values.controllerManager.serviceAccountName }} @@ -91,6 +104,11 @@ spec: secret: secretName: metrics-server-cert {{- end }} + {{- if and .Values.webhook.enable .Values.certmanager.enable }} + - name: webhook-certs + secret: + secretName: {{ .Values.namePrefix 
}}-webhook-server-cert + {{- end }} --- apiVersion: v1 kind: ConfigMap diff --git a/helm/library/cortex/templates/webhook/validating-webhook-configuration.yaml b/helm/library/cortex/templates/webhook/validating-webhook-configuration.yaml new file mode 100644 index 000000000..14f3f324f --- /dev/null +++ b/helm/library/cortex/templates/webhook/validating-webhook-configuration.yaml @@ -0,0 +1,48 @@ +{{- if .Values.webhook.enable }} +apiVersion: admissionregistration.k8s.io/v1 +kind: ValidatingWebhookConfiguration +metadata: + name: {{ .Values.namePrefix }}-validating-webhook-configuration + labels: + {{- include "chart.labels" . | nindent 4 }} + {{- if .Values.certmanager.enable }} + annotations: + "cert-manager.io/inject-ca-from": {{ .Release.Namespace }}/{{ .Values.namePrefix }}-webhook-cert + {{- end }} +webhooks: + # This webhook validates the creation and update of cortex pipelines. + - name: {{ .Values.namePrefix }}-validate-v1alpha1-pipeline.cortex.cloud + admissionReviewVersions: [v1] + clientConfig: + # If we want to talk to the cortex webhook in another kubernetes cluster, + # we can template this out to use the ingress url provided by cortex. E.g.: + # url: "https:///validate-cortex-cloud-v1alpha1-pipeline" for cross-cluster + service: + name: {{ .Values.namePrefix }}-webhook-service + namespace: {{ .Release.Namespace }} + # This path is managed by controller-runtime. It will route to the + # correct validating webhook handler based on the path. 
+ path: /validate-cortex-cloud-v1alpha1-pipeline + {{- if not .Values.certmanager.enable }} + {{- if .Values.webhook.caBundle }} + caBundle: {{ .Values.webhook.caBundle }} + {{- end }} + {{- end }} + timeoutSeconds: 10 + failurePolicy: Ignore + rules: + - apiGroups: + - cortex.cloud + apiVersions: + - v1alpha1 + operations: + - CREATE + - UPDATE + resources: + - pipelines + sideEffects: None + {{- if .Values.webhook.namespaceSelector }} + namespaceSelector: + {{- toYaml .Values.webhook.namespaceSelector | nindent 6 }} + {{- end }} +{{- end }} \ No newline at end of file diff --git a/helm/library/cortex/templates/webhook/webhook-service.yaml b/helm/library/cortex/templates/webhook/webhook-service.yaml new file mode 100644 index 000000000..4b0c82a2d --- /dev/null +++ b/helm/library/cortex/templates/webhook/webhook-service.yaml @@ -0,0 +1,17 @@ +{{- if .Values.webhook.enable }} +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.namePrefix }}-webhook-service + namespace: {{ .Release.Namespace }} + labels: + {{- include "chart.labels" . | nindent 4 }} +spec: + ports: + - port: 443 + protocol: TCP + targetPort: 9443 + selector: + {{- include "chart.selectorLabels" . | nindent 4 }} + control-plane: controller-manager +{{- end }} \ No newline at end of file diff --git a/helm/library/cortex/values.yaml b/helm/library/cortex/values.yaml index 50a9d48c0..e7a475184 100644 --- a/helm/library/cortex/values.yaml +++ b/helm/library/cortex/values.yaml @@ -89,9 +89,16 @@ prometheus: enable: true # [CERT-MANAGER]: To enable cert-manager injection to webhooks set true +# Cert-manager must be enabled when webhooks are enabled to generate the required +# TLS certificates so kubernetes can talk to the webhook endpoint over HTTPS. 
certmanager: enable: false +# [WEBHOOK]: To enable validating webhook for Pipeline resources set true +# Note: When enabled, cert-manager is typically also required for TLS certificates +webhook: + enable: false + # [NETWORK POLICIES]: To enable NetworkPolicies set true networkPolicy: enable: false diff --git a/internal/scheduling/cinder/pipeline_webhook.go b/internal/scheduling/cinder/pipeline_webhook.go new file mode 100644 index 000000000..73585116a --- /dev/null +++ b/internal/scheduling/cinder/pipeline_webhook.go @@ -0,0 +1,31 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package cinder + +import ( + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/cinder/plugins/filters" + "github.com/cobaltcore-dev/cortex/internal/scheduling/cinder/plugins/weighers" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" +) + +// Create a new pipeline admission webhook for the cinder scheduling domain, +// using the known filters, weighers and detectors for validation. +func NewPipelineWebhook() lib.PipelineAdmissionWebhook { + validatableFilters := map[string]lib.Validatable{} + for name, constructor := range filters.Index { + validatableFilters[name] = constructor() + } + validatableWeighers := map[string]lib.Validatable{} + for name, constructor := range weighers.Index { + validatableWeighers[name] = constructor() + } + validatableDetectors := map[string]lib.Validatable{} // No detectors for cinder yet. 
+ return lib.PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainCinder, + ValidatableFilters: validatableFilters, + ValidatableWeighers: validatableWeighers, + ValidatableDetectors: validatableDetectors, + } +} diff --git a/internal/scheduling/lib/detector.go b/internal/scheduling/lib/detector.go index 752c637cd..3b4048727 100644 --- a/internal/scheduling/lib/detector.go +++ b/internal/scheduling/lib/detector.go @@ -12,6 +12,7 @@ import ( "github.com/cobaltcore-dev/cortex/pkg/conf" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -30,20 +31,26 @@ type Detector[DetectionType Detection] interface { // Detect resources such as VMs on their current hosts that should be // considered for descheduling. Run() ([]DetectionType, error) - // Configure the step with a database and options. + + // Initialize the step. Init(ctx context.Context, client client.Client, step v1alpha1.DetectorSpec) error + + // Validate that the given config is valid for this step. This is used in + // the pipeline validation to check if the pipeline configuration is valid + // without actually initializing the step. + Validate(ctx context.Context, params runtime.RawExtension) error } // Common base for all descheduler steps that provides some functionality // that would otherwise be duplicated across all steps. -type BaseDetector[Opts any] struct { +type BaseDetector[Opts DetectionStepOpts] struct { // Options to pass via yaml to this step. conf.JsonOpts[Opts] // The kubernetes client to use. Client client.Client } -// Init the step with the database and options. +// Init the step. func (d *BaseDetector[Opts]) Init(ctx context.Context, client client.Client, step v1alpha1.DetectorSpec) error { d.Client = client @@ -54,6 +61,20 @@ func (d *BaseDetector[Opts]) Init(ctx context.Context, client client.Client, ste return nil } +// Validate that the given config is valid for this step.
This is used in +// the pipeline validation to check if the pipeline configuration is valid +// without actually initializing the step. +func (d *BaseDetector[Opts]) Validate(ctx context.Context, params runtime.RawExtension) error { + opts := conf.NewRawOptsBytes(params.Raw) + if err := d.Load(opts); err != nil { + return err + } + if err := d.Options.Validate(); err != nil { + return err + } + return nil +} + // Check if all knowledges are ready, and if not, return an error indicating why not. func (d *BaseDetector[Opts]) CheckKnowledges(ctx context.Context, kns ...corev1.ObjectReference) error { if d.Client == nil { diff --git a/internal/scheduling/lib/detector_monitor.go b/internal/scheduling/lib/detector_monitor.go index cb307d410..0ef05ac2e 100644 --- a/internal/scheduling/lib/detector_monitor.go +++ b/internal/scheduling/lib/detector_monitor.go @@ -8,6 +8,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/prometheus/client_golang/prometheus" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -103,6 +104,11 @@ func (m DetectorMonitor[DetectionType]) Init( return m.step.Init(ctx, client, step) } +// Validate the wrapped step configuration. +func (m DetectorMonitor[DetectionType]) Validate(ctx context.Context, params runtime.RawExtension) error { + return m.step.Validate(ctx, params) +} + // Run the step and measure its execution time. 
func (m DetectorMonitor[DetectionType]) Run() ([]DetectionType, error) { if m.runTimer != nil { diff --git a/internal/scheduling/lib/detector_monitor_test.go b/internal/scheduling/lib/detector_monitor_test.go index 008d9411f..1e3060e96 100644 --- a/internal/scheduling/lib/detector_monitor_test.go +++ b/internal/scheduling/lib/detector_monitor_test.go @@ -11,6 +11,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/testutil" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/fake" ) @@ -69,11 +70,12 @@ func TestMonitor_Collect(t *testing.T) { } type mockMonitorStep struct { - decisions []mockDetection - initError error - runError error - initCalled bool - runCalled bool + decisions []mockDetection + initError error + validateError error + runError error + initCalled bool + runCalled bool } func (m *mockMonitorStep) Init(ctx context.Context, client client.Client, step v1alpha1.DetectorSpec) error { @@ -81,6 +83,10 @@ func (m *mockMonitorStep) Init(ctx context.Context, client client.Client, step v return m.initError } +func (m *mockMonitorStep) Validate(ctx context.Context, params runtime.RawExtension) error { + return m.validateError +} + func (m *mockMonitorStep) Run() ([]mockDetection, error) { m.runCalled = true return m.decisions, m.runError diff --git a/internal/scheduling/lib/detector_pipeline_test.go b/internal/scheduling/lib/detector_pipeline_test.go index 4d1857f1f..2daac8aa3 100644 --- a/internal/scheduling/lib/detector_pipeline_test.go +++ b/internal/scheduling/lib/detector_pipeline_test.go @@ -17,15 +17,20 @@ import ( // mockDetectorStep implements Detector[mockDetection] type mockDetectorStep struct { - decisions []mockDetection - initErr error - runErr error + decisions []mockDetection + initErr error + validateErr error + runErr error } func (m *mockDetectorStep) Init(ctx 
context.Context, client client.Client, step v1alpha1.DetectorSpec) error { return m.initErr } +func (m *mockDetectorStep) Validate(ctx context.Context, params runtime.RawExtension) error { + return m.validateErr +} + func (m *mockDetectorStep) Run() ([]mockDetection, error) { return m.decisions, m.runErr } diff --git a/internal/scheduling/lib/detector_step_opts.go b/internal/scheduling/lib/detector_step_opts.go new file mode 100644 index 000000000..53edaa104 --- /dev/null +++ b/internal/scheduling/lib/detector_step_opts.go @@ -0,0 +1,15 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package lib + +// Interface to which step options must conform. +type DetectionStepOpts interface { + // Validate the options for this step. + Validate() error +} + +// Empty step opts conforming to the DetectionStepOpts interface (validation always succeeds). +type EmptyDetectionStepOpts struct{} + +func (EmptyDetectionStepOpts) Validate() error { return nil } diff --git a/internal/scheduling/lib/detector_step_opts_test.go b/internal/scheduling/lib/detector_step_opts_test.go new file mode 100644 index 000000000..abd6832a8 --- /dev/null +++ b/internal/scheduling/lib/detector_step_opts_test.go @@ -0,0 +1,21 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package lib + +import ( + "testing" +) + +func TestEmptyDetectionStepOpts_Validate(t *testing.T) { + opts := EmptyDetectionStepOpts{} + err := opts.Validate() + if err != nil { + t.Errorf("expected no error from EmptyDetectionStepOpts.Validate(), got: %v", err) + } +} + +func TestDetectionStepOpts_Interface(t *testing.T) { + // Verify EmptyDetectionStepOpts implements DetectionStepOpts interface + var _ DetectionStepOpts = EmptyDetectionStepOpts{} +} diff --git a/internal/scheduling/lib/detector_test.go b/internal/scheduling/lib/detector_test.go index e80f07a7a..f9d1d49a1 100644 --- a/internal/scheduling/lib/detector_test.go +++ b/internal/scheduling/lib/detector_test.go @@ -4,6 +4,7 @@ package lib import (
+ "errors" "strings" "testing" @@ -254,3 +255,66 @@ func TestBaseDetector_CheckKnowledges_NilClient(t *testing.T) { t.Errorf("expected error message about client not initialized, got %q", err.Error()) } } + +func TestBaseDetector_Validate(t *testing.T) { + tests := []struct { + name string + params runtime.RawExtension + expectError bool + }{ + { + name: "valid params", + params: runtime.RawExtension{ + Raw: []byte(`{"option1": "value1", "option2": 2}`), + }, + expectError: false, + }, + { + name: "empty params", + params: runtime.RawExtension{ + Raw: []byte(`{}`), + }, + expectError: false, + }, + { + name: "invalid JSON", + params: runtime.RawExtension{ + Raw: []byte(`{invalid json}`), + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + detector := &BaseDetector[mockDetectorOptions]{} + err := detector.Validate(t.Context(), tt.params) + + if tt.expectError && err == nil { + t.Error("expected error but got nil") + } + if !tt.expectError && err != nil { + t.Errorf("expected no error but got: %v", err) + } + }) + } +} + +// failingDetectorOptions implements DetectionStepOpts and returns an error on Validate. 
+type failingDetectorOptions struct{} + +func (o failingDetectorOptions) Validate() error { + return errors.New("validation failed") +} + +func TestBaseDetector_Validate_ValidationError(t *testing.T) { + detector := &BaseDetector[failingDetectorOptions]{} + err := detector.Validate(t.Context(), runtime.RawExtension{Raw: []byte(`{}`)}) + + if err == nil { + t.Error("expected error from validation but got nil") + } + if !strings.Contains(err.Error(), "validation failed") { + t.Errorf("expected error message to contain 'validation failed', got %q", err.Error()) + } +} diff --git a/internal/scheduling/lib/filter.go b/internal/scheduling/lib/filter.go index 552a6bd2d..0af80abeb 100644 --- a/internal/scheduling/lib/filter.go +++ b/internal/scheduling/lib/filter.go @@ -7,6 +7,7 @@ import ( "context" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -16,6 +17,9 @@ type Filter[RequestType FilterWeigherPipelineRequest] interface { // Configure the filter and initialize things like a database connection. Init(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error + + // Validate the given config parameters for this filter. + Validate(ctx context.Context, params runtime.RawExtension) error } // Common base for all steps that provides some functionality diff --git a/internal/scheduling/lib/filter_monitor.go b/internal/scheduling/lib/filter_monitor.go index c60dbe090..db40158bd 100644 --- a/internal/scheduling/lib/filter_monitor.go +++ b/internal/scheduling/lib/filter_monitor.go @@ -8,6 +8,7 @@ import ( "log/slog" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -37,6 +38,11 @@ func (fm *FilterMonitor[RequestType]) Init(ctx context.Context, client client.Cl return fm.filter.Init(ctx, client, step) } +// Validate the wrapped filter. 
+func (fm *FilterMonitor[RequestType]) Validate(ctx context.Context, params runtime.RawExtension) error { + return fm.filter.Validate(ctx, params) +} + // Run the filter and observe its execution. func (fm *FilterMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) { return fm.monitor.RunWrapped(traceLog, request, fm.filter) diff --git a/internal/scheduling/lib/filter_test.go b/internal/scheduling/lib/filter_test.go index ac7c4d1d7..aa6354ac9 100644 --- a/internal/scheduling/lib/filter_test.go +++ b/internal/scheduling/lib/filter_test.go @@ -15,8 +15,9 @@ import ( ) type mockFilter[RequestType FilterWeigherPipelineRequest] struct { - InitFunc func(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error - RunFunc func(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) + InitFunc func(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error + ValidateFunc func(ctx context.Context, params runtime.RawExtension) error + RunFunc func(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) } func (m *mockFilter[RequestType]) Init(ctx context.Context, client client.Client, step v1alpha1.FilterSpec) error { @@ -25,6 +26,12 @@ func (m *mockFilter[RequestType]) Init(ctx context.Context, client client.Client } return m.InitFunc(ctx, client, step) } +func (m *mockFilter[RequestType]) Validate(ctx context.Context, params runtime.RawExtension) error { + if m.ValidateFunc == nil { + return nil + } + return m.ValidateFunc(ctx, params) +} func (m *mockFilter[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) { if m.RunFunc == nil { return &FilterWeigherPipelineStepResult{}, nil diff --git a/internal/scheduling/lib/filter_validation.go b/internal/scheduling/lib/filter_validation.go index 5fdecb848..27f2ddc4e 100644 --- a/internal/scheduling/lib/filter_validation.go +++ 
b/internal/scheduling/lib/filter_validation.go @@ -9,6 +9,7 @@ import ( "log/slog" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -24,6 +25,11 @@ func (s *FilterValidator[RequestType]) Init(ctx context.Context, client client.C return s.Filter.Init(ctx, client, step) } +// Validate the wrapped filter. +func (s *FilterValidator[RequestType]) Validate(ctx context.Context, params runtime.RawExtension) error { + return s.Filter.Validate(ctx, params) +} + // Validate the wrapped filter with the database and options. func validateFilter[RequestType FilterWeigherPipelineRequest](filter Filter[RequestType]) *FilterValidator[RequestType] { return &FilterValidator[RequestType]{Filter: filter} diff --git a/internal/scheduling/lib/filter_weigher_pipeline_step.go b/internal/scheduling/lib/filter_weigher_pipeline_step.go index c10de1a6d..59ba998fd 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_step.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_step.go @@ -57,6 +57,20 @@ func (s *BaseFilterWeigherPipelineStep[RequestType, Opts]) Init(ctx context.Cont return nil } +// Validate that the given config is valid for this step. This is used in +// the pipeline validation to check if the pipeline configuration is valid +// without actually initializing the step. +func (s *BaseFilterWeigherPipelineStep[RequestType, Opts]) Validate(ctx context.Context, params runtime.RawExtension) error { + opts := conf.NewRawOptsBytes(params.Raw) + if err := s.Load(opts); err != nil { + return err + } + if err := s.Options.Validate(); err != nil { + return err + } + return nil +} + // Get a default result (no action) for the input weight keys given in the request. // Use this to initialize the result before applying filtering/weighing logic. 
func (s *BaseFilterWeigherPipelineStep[RequestType, Opts]) IncludeAllHostsFromRequest(request RequestType) *FilterWeigherPipelineStepResult { diff --git a/internal/scheduling/lib/filter_weigher_pipeline_step_test.go b/internal/scheduling/lib/filter_weigher_pipeline_step_test.go index 71de3b8cc..2b045810a 100644 --- a/internal/scheduling/lib/filter_weigher_pipeline_step_test.go +++ b/internal/scheduling/lib/filter_weigher_pipeline_step_test.go @@ -187,3 +187,49 @@ func TestBaseFilterWeigherPipelineStep_PrepareStats(t *testing.T) { }) } } + +func TestBaseFilterWeigherPipelineStep_Validate(t *testing.T) { + tests := []struct { + name string + params runtime.RawExtension + expectError bool + }{ + { + name: "valid params", + params: runtime.RawExtension{ + Raw: []byte(`{}`), + }, + expectError: false, + }, + { + name: "invalid JSON", + params: runtime.RawExtension{ + Raw: []byte(`{invalid json}`), + }, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + step := &BaseFilterWeigherPipelineStep[mockFilterWeigherPipelineRequest, testStepOptions]{} + err := step.Validate(t.Context(), tt.params) + + if tt.expectError && err == nil { + t.Error("expected error but got nil") + } + if !tt.expectError && err != nil { + t.Errorf("expected no error but got: %v", err) + } + }) + } +} + +func TestBaseFilterWeigherPipelineStep_Validate_ValidationError(t *testing.T) { + step := &BaseFilterWeigherPipelineStep[mockFilterWeigherPipelineRequest, failingValidationOptions]{} + err := step.Validate(t.Context(), runtime.RawExtension{Raw: []byte(`{}`)}) + + if err == nil { + t.Error("expected error from validation but got nil") + } +} diff --git a/internal/scheduling/lib/pipeline_webhook.go b/internal/scheduling/lib/pipeline_webhook.go new file mode 100644 index 000000000..80c919070 --- /dev/null +++ b/internal/scheduling/lib/pipeline_webhook.go @@ -0,0 +1,165 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package lib + +import 
( + "context" + "fmt" + "strings" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/webhook/admission" +) + +// Validatable is implemented by all pipeline steps (filters, weighers, detectors). +// It allows validation of step parameters without full initialization. +type Validatable interface { + // Validate checks if the given parameters are valid for this step. + Validate(ctx context.Context, params runtime.RawExtension) error +} + +// PipelineAdmissionWebhook validates Pipeline resources for a specific scheduling domain. +// It checks that all configured steps (filters, weighers, detectors) exist in the +// provided indexes and that their parameters are valid. +type PipelineAdmissionWebhook struct { + // The scheduling domain this webhook handles (e.g., nova, cinder, manila). + SchedulingDomain v1alpha1.SchedulingDomain + // ValidatableFilters maps filter names to validatable filter instances. + ValidatableFilters map[string]Validatable + // ValidatableWeighers maps weigher names to validatable weigher instances. + ValidatableWeighers map[string]Validatable + // ValidatableDetectors maps detector names to validatable detector instances. + ValidatableDetectors map[string]Validatable +} + +// ValidateCreate implements admission.Validator. +func (w *PipelineAdmissionWebhook) ValidateCreate( + ctx context.Context, + pipeline *v1alpha1.Pipeline, +) (admission.Warnings, error) { + + return w.validatePipeline(ctx, pipeline) +} + +// ValidateUpdate implements admission.Validator. +func (w *PipelineAdmissionWebhook) ValidateUpdate( + ctx context.Context, + oldPipeline, newPipeline *v1alpha1.Pipeline, +) (admission.Warnings, error) { + + return w.validatePipeline(ctx, newPipeline) +} + +// ValidateDelete implements admission.Validator. 
+func (w *PipelineAdmissionWebhook) ValidateDelete( + ctx context.Context, + pipeline *v1alpha1.Pipeline, +) (admission.Warnings, error) { + + return nil, nil // No validation needed on delete. +} + +// validatePipeline performs the actual validation logic. +func (w *PipelineAdmissionWebhook) validatePipeline( + ctx context.Context, + pipeline *v1alpha1.Pipeline, +) (admission.Warnings, error) { + + log := ctrl.Log.WithName("pipeline-webhook") + + // Check if this pipeline's scheduling domain matches our webhook's domain. + // If not, skip validation and allow the request - another webhook should handle it. + if pipeline.Spec.SchedulingDomain != w.SchedulingDomain { + log.V(1).Info("skipping validation for pipeline with different scheduling domain", + "pipeline", pipeline.Name, + "pipelineDomain", pipeline.Spec.SchedulingDomain, + "webhookDomain", w.SchedulingDomain) + return nil, nil + } + + log.Info("validating pipeline", "pipelineName", pipeline.Name, "pipelineType", pipeline.Spec.Type) + + var errMsgs []string + + // Validate based on pipeline type + switch pipeline.Spec.Type { + case v1alpha1.PipelineTypeFilterWeigher: + // Check there are no detectors configured, + // as they are not allowed in a filter/weigher pipeline. + if len(pipeline.Spec.Detectors) > 0 { + errMsgs = append(errMsgs, "detectors are not allowed in a filter/weigher pipeline") + } + for _, filterSpec := range pipeline.Spec.Filters { + if err := w.validateFilter(ctx, filterSpec); err != nil { + errMsgs = append(errMsgs, fmt.Sprintf("filter %q: %v", filterSpec.Name, err)) + } + } + for _, weigherSpec := range pipeline.Spec.Weighers { + if err := w.validateWeigher(ctx, weigherSpec); err != nil { + errMsgs = append(errMsgs, fmt.Sprintf("weigher %q: %v", weigherSpec.Name, err)) + } + } + case v1alpha1.PipelineTypeDetector: + // Check there are no filters or weighers configured, + // as they are not allowed in a detector pipeline. 
+ if len(pipeline.Spec.Filters) > 0 { + errMsgs = append(errMsgs, "filters are not allowed in a detector pipeline") + } + if len(pipeline.Spec.Weighers) > 0 { + errMsgs = append(errMsgs, "weighers are not allowed in a detector pipeline") + } + for _, detectorSpec := range pipeline.Spec.Detectors { + if err := w.validateDetector(ctx, detectorSpec); err != nil { + errMsgs = append(errMsgs, fmt.Sprintf("detector %q: %v", detectorSpec.Name, err)) + } + } + default: + errMsgs = append(errMsgs, fmt.Sprintf("unknown pipeline type: %s", pipeline.Spec.Type)) + } + + if len(errMsgs) > 0 { + return nil, fmt.Errorf("pipeline is invalid: %s", strings.Join(errMsgs, "; ")) + } + + return nil, nil +} + +// validateFilter validates a single filter specification. +func (w *PipelineAdmissionWebhook) validateFilter(ctx context.Context, spec v1alpha1.FilterSpec) error { + filter, ok := w.ValidatableFilters[spec.Name] + if !ok { + return fmt.Errorf("unknown filter: %q", spec.Name) + } + return filter.Validate(ctx, spec.Params) +} + +// validateWeigher validates a single weigher specification. +func (w *PipelineAdmissionWebhook) validateWeigher(ctx context.Context, spec v1alpha1.WeigherSpec) error { + weigher, ok := w.ValidatableWeighers[spec.Name] + if !ok { + return fmt.Errorf("unknown weigher: %q", spec.Name) + } + return weigher.Validate(ctx, spec.Params) +} + +// validateDetector validates a single detector specification. +func (w *PipelineAdmissionWebhook) validateDetector(ctx context.Context, spec v1alpha1.DetectorSpec) error { + detector, ok := w.ValidatableDetectors[spec.Name] + if !ok { + return fmt.Errorf("unknown detector: %q", spec.Name) + } + return detector.Validate(ctx, spec.Params) +} + +// SetupWebhookWithManager sets up the validating webhook for Pipeline resources. 
+func (w *PipelineAdmissionWebhook) SetupWebhookWithManager(mgr ctrl.Manager) error { + log := ctrl.Log.WithName("pipeline-webhook-setup") + log.Info("setting up validating webhook for pipelines", + "schedulingDomain", w.SchedulingDomain) + return ctrl.NewWebhookManagedBy(mgr, &v1alpha1.Pipeline{}). + WithValidator(w). + Complete() +} diff --git a/internal/scheduling/lib/pipeline_webhook_test.go b/internal/scheduling/lib/pipeline_webhook_test.go new file mode 100644 index 000000000..33f94bb33 --- /dev/null +++ b/internal/scheduling/lib/pipeline_webhook_test.go @@ -0,0 +1,505 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package lib + +import ( + "context" + "errors" + "strings" + "testing" + + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +// mockValidatable implements Validatable for testing. +type mockValidatable struct { + ValidateFunc func(ctx context.Context, params runtime.RawExtension) error +} + +func (m *mockValidatable) Validate(ctx context.Context, params runtime.RawExtension) error { + if m.ValidateFunc == nil { + return nil + } + return m.ValidateFunc(ctx, params) +} + +func TestPipelineAdmissionWebhook_ValidateCreate_FilterWeigherPipeline(t *testing.T) { + tests := []struct { + name string + pipeline *v1alpha1.Pipeline + filters map[string]Validatable + weighers map[string]Validatable + detectors map[string]Validatable + expectError bool + errorMsg string + }{ + { + name: "valid filter-weigher pipeline with known filter and weigher", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Filters: []v1alpha1.FilterSpec{ + {Name: "filter1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + Weighers: []v1alpha1.WeigherSpec{ + {Name: "weigher1", Params: runtime.RawExtension{Raw: 
[]byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{ + "filter1": &mockValidatable{}, + }, + weighers: map[string]Validatable{ + "weigher1": &mockValidatable{}, + }, + detectors: map[string]Validatable{}, + expectError: false, + }, + { + name: "invalid filter-weigher pipeline with unknown filter", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Filters: []v1alpha1.FilterSpec{ + {Name: "unknown-filter", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "unknown filter", + }, + { + name: "invalid filter-weigher pipeline with unknown weigher", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Weighers: []v1alpha1.WeigherSpec{ + {Name: "unknown-weigher", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "unknown weigher", + }, + { + name: "invalid filter-weigher pipeline with detectors", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Detectors: []v1alpha1.DetectorSpec{ + {Name: "detector1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "detectors are not allowed in a filter/weigher 
pipeline", + }, + { + name: "filter validation error", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Filters: []v1alpha1.FilterSpec{ + {Name: "filter1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{ + "filter1": &mockValidatable{ + ValidateFunc: func(ctx context.Context, params runtime.RawExtension) error { + return errors.New("filter validation failed") + }, + }, + }, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "filter validation failed", + }, + { + name: "weigher validation error", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Weighers: []v1alpha1.WeigherSpec{ + {Name: "weigher1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{ + "weigher1": &mockValidatable{ + ValidateFunc: func(ctx context.Context, params runtime.RawExtension) error { + return errors.New("weigher validation failed") + }, + }, + }, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "weigher validation failed", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: tt.filters, + ValidatableWeighers: tt.weighers, + ValidatableDetectors: tt.detectors, + } + + _, err := webhook.ValidateCreate(t.Context(), tt.pipeline) + + if tt.expectError && err == nil { + t.Error("expected error but got nil") + } + if !tt.expectError && err != nil { + t.Errorf("expected no error but got: %v", err) + } + if tt.expectError 
&& err != nil && tt.errorMsg != "" { + if !strings.Contains(err.Error(), tt.errorMsg) { + t.Errorf("expected error message to contain %q, got %q", tt.errorMsg, err.Error()) + } + } + }) + } +} + +func TestPipelineAdmissionWebhook_ValidateCreate_DetectorPipeline(t *testing.T) { + tests := []struct { + name string + pipeline *v1alpha1.Pipeline + filters map[string]Validatable + weighers map[string]Validatable + detectors map[string]Validatable + expectError bool + errorMsg string + }{ + { + name: "valid detector pipeline with known detector", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeDetector, + Detectors: []v1alpha1.DetectorSpec{ + {Name: "detector1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{ + "detector1": &mockValidatable{}, + }, + expectError: false, + }, + { + name: "invalid detector pipeline with unknown detector", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeDetector, + Detectors: []v1alpha1.DetectorSpec{ + {Name: "unknown-detector", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "unknown detector", + }, + { + name: "invalid detector pipeline with filters", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeDetector, + Filters: []v1alpha1.FilterSpec{ + {Name: "filter1", Params: runtime.RawExtension{Raw: 
[]byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "filters are not allowed in a detector pipeline", + }, + { + name: "invalid detector pipeline with weighers", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeDetector, + Weighers: []v1alpha1.WeigherSpec{ + {Name: "weigher1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{}, + expectError: true, + errorMsg: "weighers are not allowed in a detector pipeline", + }, + { + name: "detector validation error", + pipeline: &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeDetector, + Detectors: []v1alpha1.DetectorSpec{ + {Name: "detector1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + }, + filters: map[string]Validatable{}, + weighers: map[string]Validatable{}, + detectors: map[string]Validatable{ + "detector1": &mockValidatable{ + ValidateFunc: func(ctx context.Context, params runtime.RawExtension) error { + return errors.New("detector validation failed") + }, + }, + }, + expectError: true, + errorMsg: "detector validation failed", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: tt.filters, + ValidatableWeighers: tt.weighers, + ValidatableDetectors: tt.detectors, + } + + _, err := webhook.ValidateCreate(t.Context(), tt.pipeline) + + if tt.expectError && err == nil { + t.Error("expected error but got nil") + } + if !tt.expectError && 
err != nil { + t.Errorf("expected no error but got: %v", err) + } + if tt.expectError && err != nil && tt.errorMsg != "" { + if !strings.Contains(err.Error(), tt.errorMsg) { + t.Errorf("expected error message to contain %q, got %q", tt.errorMsg, err.Error()) + } + } + }) + } +} + +func TestPipelineAdmissionWebhook_ValidateCreate_DifferentSchedulingDomain(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: map[string]Validatable{}, + ValidatableWeighers: map[string]Validatable{}, + ValidatableDetectors: map[string]Validatable{}, + } + + // Pipeline for a different scheduling domain should be skipped (no validation error) + pipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainCinder, // Different domain + Type: v1alpha1.PipelineTypeFilterWeigher, + Filters: []v1alpha1.FilterSpec{ + {Name: "unknown-filter", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + } + + _, err := webhook.ValidateCreate(t.Context(), pipeline) + + if err != nil { + t.Errorf("expected no error for different scheduling domain, got: %v", err) + } +} + +func TestPipelineAdmissionWebhook_ValidateCreate_UnknownPipelineType(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: map[string]Validatable{}, + ValidatableWeighers: map[string]Validatable{}, + ValidatableDetectors: map[string]Validatable{}, + } + + pipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: "unknown-type", + }, + } + + _, err := webhook.ValidateCreate(t.Context(), pipeline) + + if err == nil { + t.Error("expected error for unknown pipeline type, got nil") + } + if !strings.Contains(err.Error(), "unknown pipeline type") { + 
t.Errorf("expected error message to contain 'unknown pipeline type', got %q", err.Error()) + } +} + +func TestPipelineAdmissionWebhook_ValidateUpdate(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: map[string]Validatable{ + "filter1": &mockValidatable{}, + }, + ValidatableWeighers: map[string]Validatable{}, + ValidatableDetectors: map[string]Validatable{}, + } + + oldPipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + }, + } + + newPipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Filters: []v1alpha1.FilterSpec{ + {Name: "filter1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + } + + _, err := webhook.ValidateUpdate(t.Context(), oldPipeline, newPipeline) + + if err != nil { + t.Errorf("expected no error, got: %v", err) + } +} + +func TestPipelineAdmissionWebhook_ValidateDelete(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: map[string]Validatable{}, + ValidatableWeighers: map[string]Validatable{}, + ValidatableDetectors: map[string]Validatable{}, + } + + pipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + }, + } + + _, err := webhook.ValidateDelete(t.Context(), pipeline) + + if err != nil { + t.Errorf("expected no error for delete, got: %v", err) + } +} + +func TestPipelineAdmissionWebhook_MultipleValidationErrors(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: 
v1alpha1.SchedulingDomainNova, + ValidatableFilters: map[string]Validatable{}, + ValidatableWeighers: map[string]Validatable{}, + ValidatableDetectors: map[string]Validatable{}, + } + + // Pipeline with multiple unknown filters and weighers + pipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + Filters: []v1alpha1.FilterSpec{ + {Name: "unknown-filter1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + {Name: "unknown-filter2", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + Weighers: []v1alpha1.WeigherSpec{ + {Name: "unknown-weigher1", Params: runtime.RawExtension{Raw: []byte(`{}`)}}, + }, + }, + } + + _, err := webhook.ValidateCreate(t.Context(), pipeline) + + if err == nil { + t.Error("expected error for unknown filters and weighers, got nil") + } + + // Check that multiple errors are reported + errStr := err.Error() + if !strings.Contains(errStr, "unknown-filter1") { + t.Error("expected error to contain 'unknown-filter1'") + } + if !strings.Contains(errStr, "unknown-filter2") { + t.Error("expected error to contain 'unknown-filter2'") + } + if !strings.Contains(errStr, "unknown-weigher1") { + t.Error("expected error to contain 'unknown-weigher1'") + } +} + +func TestPipelineAdmissionWebhook_EmptyPipeline(t *testing.T) { + webhook := &PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: map[string]Validatable{}, + ValidatableWeighers: map[string]Validatable{}, + ValidatableDetectors: map[string]Validatable{}, + } + + // Empty filter-weigher pipeline (no filters, no weighers) should be valid + pipeline := &v1alpha1.Pipeline{ + ObjectMeta: metav1.ObjectMeta{Name: "test-pipeline"}, + Spec: v1alpha1.PipelineSpec{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + Type: v1alpha1.PipelineTypeFilterWeigher, + }, + } + + _, err := 
webhook.ValidateCreate(t.Context(), pipeline) + + if err != nil { + t.Errorf("expected no error for empty pipeline, got: %v", err) + } +} diff --git a/internal/scheduling/lib/weigher.go b/internal/scheduling/lib/weigher.go index ef3d213b3..2501e2249 100644 --- a/internal/scheduling/lib/weigher.go +++ b/internal/scheduling/lib/weigher.go @@ -11,6 +11,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -20,6 +21,9 @@ type Weigher[RequestType FilterWeigherPipelineRequest] interface { // Configure the step and initialize things like a database connection. Init(ctx context.Context, client client.Client, step v1alpha1.WeigherSpec) error + + // Validate the given config parameters for this weigher. + Validate(ctx context.Context, params runtime.RawExtension) error } // Common base for all steps that provides some functionality @@ -33,6 +37,11 @@ func (s *BaseWeigher[RequestType, Opts]) Init(ctx context.Context, client client return s.BaseFilterWeigherPipelineStep.Init(ctx, client, step.Params) } +// Validate the weigher. +func (s *BaseWeigher[RequestType, Opts]) Validate(ctx context.Context, params runtime.RawExtension) error { + return s.BaseFilterWeigherPipelineStep.Validate(ctx, params) +} + // Check if all knowledges are ready, and if not, return an error indicating why not. 
func (d *BaseFilterWeigherPipelineStep[RequestType, Opts]) CheckKnowledges(ctx context.Context, kns ...corev1.ObjectReference) error { if d.Client == nil { diff --git a/internal/scheduling/lib/weigher_monitor.go b/internal/scheduling/lib/weigher_monitor.go index e777a2e33..f190ff1d8 100644 --- a/internal/scheduling/lib/weigher_monitor.go +++ b/internal/scheduling/lib/weigher_monitor.go @@ -8,6 +8,7 @@ import ( "log/slog" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -37,6 +38,11 @@ func (wm *WeigherMonitor[RequestType]) Init(ctx context.Context, client client.C return wm.weigher.Init(ctx, client, step) } +// Validate the wrapped weigher. +func (wm *WeigherMonitor[RequestType]) Validate(ctx context.Context, params runtime.RawExtension) error { + return wm.weigher.Validate(ctx, params) +} + // Run the weigher and observe its execution. func (wm *WeigherMonitor[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) { return wm.monitor.RunWrapped(traceLog, request, wm.weigher) diff --git a/internal/scheduling/lib/weigher_test.go b/internal/scheduling/lib/weigher_test.go index b1db3ec58..dc0187495 100644 --- a/internal/scheduling/lib/weigher_test.go +++ b/internal/scheduling/lib/weigher_test.go @@ -17,8 +17,9 @@ import ( ) type mockWeigher[RequestType FilterWeigherPipelineRequest] struct { - InitFunc func(ctx context.Context, client client.Client, step v1alpha1.WeigherSpec) error - RunFunc func(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) + InitFunc func(ctx context.Context, client client.Client, step v1alpha1.WeigherSpec) error + ValidateFunc func(ctx context.Context, params runtime.RawExtension) error + RunFunc func(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) } func (m *mockWeigher[RequestType]) Init(ctx context.Context, client client.Client, step 
v1alpha1.WeigherSpec) error { @@ -27,6 +28,12 @@ func (m *mockWeigher[RequestType]) Init(ctx context.Context, client client.Clien } return m.InitFunc(ctx, client, step) } +func (m *mockWeigher[RequestType]) Validate(ctx context.Context, params runtime.RawExtension) error { + if m.ValidateFunc == nil { + return nil + } + return m.ValidateFunc(ctx, params) +} func (m *mockWeigher[RequestType]) Run(traceLog *slog.Logger, request RequestType) (*FilterWeigherPipelineStepResult, error) { if m.RunFunc == nil { return &FilterWeigherPipelineStepResult{}, nil diff --git a/internal/scheduling/lib/weigher_validation.go b/internal/scheduling/lib/weigher_validation.go index 3df01c264..f9b7020c0 100644 --- a/internal/scheduling/lib/weigher_validation.go +++ b/internal/scheduling/lib/weigher_validation.go @@ -9,6 +9,7 @@ import ( "log/slog" "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -24,6 +25,11 @@ func (s *WeigherValidator[RequestType]) Init(ctx context.Context, client client. return s.Weigher.Init(ctx, client, step) } +// Validate the wrapped weigher. +func (s *WeigherValidator[RequestType]) Validate(ctx context.Context, params runtime.RawExtension) error { + return s.Weigher.Validate(ctx, params) +} + // Validate the wrapped weigher with the database and options. 
func validateWeigher[RequestType FilterWeigherPipelineRequest](weigher Weigher[RequestType]) *WeigherValidator[RequestType] { return &WeigherValidator[RequestType]{Weigher: weigher} diff --git a/internal/scheduling/machines/pipeline_webhook.go b/internal/scheduling/machines/pipeline_webhook.go new file mode 100644 index 000000000..864ca13b2 --- /dev/null +++ b/internal/scheduling/machines/pipeline_webhook.go @@ -0,0 +1,31 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package machines + +import ( + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + "github.com/cobaltcore-dev/cortex/internal/scheduling/machines/plugins/filters" + "github.com/cobaltcore-dev/cortex/internal/scheduling/machines/plugins/weighers" +) + +// Create a new pipeline admission webhook for the machines scheduling domain, +// using the known filters, weighers and detectors for validation. +func NewPipelineWebhook() lib.PipelineAdmissionWebhook { + validatableFilters := map[string]lib.Validatable{} + for name, constructor := range filters.Index { + validatableFilters[name] = constructor() + } + validatableWeighers := map[string]lib.Validatable{} + for name, constructor := range weighers.Index { + validatableWeighers[name] = constructor() + } + validatableDetectors := map[string]lib.Validatable{} // No detectors for machines yet. 
+ return lib.PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainMachines, + ValidatableFilters: validatableFilters, + ValidatableWeighers: validatableWeighers, + ValidatableDetectors: validatableDetectors, + } +} diff --git a/internal/scheduling/machines/plugins/filters/filter_noop.go b/internal/scheduling/machines/plugins/filters/filter_noop.go index 27c79e0e0..2f1f9d816 100644 --- a/internal/scheduling/machines/plugins/filters/filter_noop.go +++ b/internal/scheduling/machines/plugins/filters/filter_noop.go @@ -10,6 +10,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/external/ironcore" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -22,6 +23,10 @@ func (f *NoopFilter) Init(ctx context.Context, client client.Client, filter v1al return nil } +func (f *NoopFilter) Validate(ctx context.Context, params runtime.RawExtension) error { + return nil +} + // Run this step of the scheduling pipeline. // Return a map of keys to activation values. Important: keys that are // not in the map are considered as filtered out. diff --git a/internal/scheduling/manila/pipeline_webhook.go b/internal/scheduling/manila/pipeline_webhook.go new file mode 100644 index 000000000..4a1d349d0 --- /dev/null +++ b/internal/scheduling/manila/pipeline_webhook.go @@ -0,0 +1,31 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package manila + +import ( + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + "github.com/cobaltcore-dev/cortex/internal/scheduling/manila/plugins/filters" + "github.com/cobaltcore-dev/cortex/internal/scheduling/manila/plugins/weighers" +) + +// Create a new pipeline admission webhook for the manila scheduling domain, +// using the known filters, weighers and detectors for validation. 
+func NewPipelineWebhook() lib.PipelineAdmissionWebhook { + validatableFilters := map[string]lib.Validatable{} + for name, constructor := range filters.Index { + validatableFilters[name] = constructor() + } + validatableWeighers := map[string]lib.Validatable{} + for name, constructor := range weighers.Index { + validatableWeighers[name] = constructor() + } + validatableDetectors := map[string]lib.Validatable{} // No detectors for manila yet. + return lib.PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainManila, + ValidatableFilters: validatableFilters, + ValidatableWeighers: validatableWeighers, + ValidatableDetectors: validatableDetectors, + } +} diff --git a/internal/scheduling/nova/detector_pipeline_controller_test.go b/internal/scheduling/nova/detector_pipeline_controller_test.go index e3b127b7e..88c7304df 100644 --- a/internal/scheduling/nova/detector_pipeline_controller_test.go +++ b/internal/scheduling/nova/detector_pipeline_controller_test.go @@ -28,6 +28,9 @@ type mockControllerStep struct{} func (m *mockControllerStep) Run() ([]plugins.VMDetection, error) { return nil, nil } +func (m *mockControllerStep) Validate(ctx context.Context, params runtime.RawExtension) error { + return nil +} func (m *mockControllerStep) Init(ctx context.Context, client client.Client, step v1alpha1.DetectorSpec) error { return nil } diff --git a/internal/scheduling/nova/pipeline_webhook.go b/internal/scheduling/nova/pipeline_webhook.go new file mode 100644 index 000000000..b32abaa65 --- /dev/null +++ b/internal/scheduling/nova/pipeline_webhook.go @@ -0,0 +1,35 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package nova + +import ( + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + "github.com/cobaltcore-dev/cortex/internal/scheduling/nova/plugins/detectors" + "github.com/cobaltcore-dev/cortex/internal/scheduling/nova/plugins/filters" + 
"github.com/cobaltcore-dev/cortex/internal/scheduling/nova/plugins/weighers" +) + +// Create a new pipeline admission webhook for the nova scheduling domain, +// using the known filters, weighers and detectors for validation. +func NewPipelineWebhook() lib.PipelineAdmissionWebhook { + validatableFilters := map[string]lib.Validatable{} + for name, constructor := range filters.Index { + validatableFilters[name] = constructor() + } + validatableWeighers := map[string]lib.Validatable{} + for name, constructor := range weighers.Index { + validatableWeighers[name] = constructor() + } + validatableDetectors := map[string]lib.Validatable{} + for name, detector := range detectors.Index { + validatableDetectors[name] = detector + } + return lib.PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainNova, + ValidatableFilters: validatableFilters, + ValidatableWeighers: validatableWeighers, + ValidatableDetectors: validatableDetectors, + } +} diff --git a/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct.go b/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct.go index dae180c5a..2921daf1c 100644 --- a/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct.go +++ b/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct.go @@ -5,6 +5,7 @@ package detectors import ( "context" + "errors" "fmt" "log/slog" @@ -21,6 +22,14 @@ type AvoidHighStealPctStepOpts struct { MaxStealPctOverObservedTimeSpan float64 `json:"maxStealPctOverObservedTimeSpan"` } +// Validate the options for the AvoidHighStealPctStep. +func (o AvoidHighStealPctStepOpts) Validate() error { + if o.MaxStealPctOverObservedTimeSpan < 0 { + return errors.New("maxStealPctOverObservedTimeSpan must be non-negative") + } + return nil +} + type AvoidHighStealPctStep struct { // Detector is a helper struct that provides common functionality for all descheduler steps. 
lib.BaseDetector[AvoidHighStealPctStepOpts] diff --git a/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct_test.go b/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct_test.go index 30ec0d670..eb76ac1b9 100644 --- a/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct_test.go +++ b/internal/scheduling/nova/plugins/detectors/avoid_high_steal_pct_test.go @@ -324,3 +324,118 @@ func equalSlices(a, b []string) bool { return true } + +func TestAvoidHighStealPctStepOpts_Validate(t *testing.T) { + tests := []struct { + name string + opts AvoidHighStealPctStepOpts + expectError bool + }{ + { + name: "valid positive threshold", + opts: AvoidHighStealPctStepOpts{ + MaxStealPctOverObservedTimeSpan: 80.0, + }, + expectError: false, + }, + { + name: "valid zero threshold", + opts: AvoidHighStealPctStepOpts{ + MaxStealPctOverObservedTimeSpan: 0.0, + }, + expectError: false, + }, + { + name: "invalid negative threshold", + opts: AvoidHighStealPctStepOpts{ + MaxStealPctOverObservedTimeSpan: -5.0, + }, + expectError: true, + }, + { + name: "valid small positive threshold", + opts: AvoidHighStealPctStepOpts{ + MaxStealPctOverObservedTimeSpan: 0.001, + }, + expectError: false, + }, + { + name: "valid large threshold", + opts: AvoidHighStealPctStepOpts{ + MaxStealPctOverObservedTimeSpan: 100.0, + }, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.opts.Validate() + + if tt.expectError && err == nil { + t.Error("expected error but got nil") + } + if !tt.expectError && err != nil { + t.Errorf("expected no error but got: %v", err) + } + }) + } +} + +func TestAvoidHighStealPctStep_Validate(t *testing.T) { + tests := []struct { + name string + params runtime.RawExtension + expectError bool + }{ + { + name: "valid params", + params: runtime.RawExtension{ + Raw: []byte(`{"maxStealPctOverObservedTimeSpan": 80.0}`), + }, + expectError: false, + }, + { + name: "valid params with zero threshold", 
+ params: runtime.RawExtension{ + Raw: []byte(`{"maxStealPctOverObservedTimeSpan": 0}`), + }, + expectError: false, + }, + { + name: "invalid params with negative threshold", + params: runtime.RawExtension{ + Raw: []byte(`{"maxStealPctOverObservedTimeSpan": -5.0}`), + }, + expectError: true, + }, + { + name: "invalid JSON", + params: runtime.RawExtension{ + Raw: []byte(`{invalid json}`), + }, + expectError: true, + }, + { + name: "empty params (defaults to zero)", + params: runtime.RawExtension{ + Raw: []byte(`{}`), + }, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + step := &AvoidHighStealPctStep{} + err := step.Validate(context.Background(), tt.params) + + if tt.expectError && err == nil { + t.Error("expected error but got nil") + } + if !tt.expectError && err != nil { + t.Errorf("expected no error but got: %v", err) + } + }) + } +} diff --git a/internal/scheduling/pods/pipeline_webhook.go b/internal/scheduling/pods/pipeline_webhook.go new file mode 100644 index 000000000..75e361007 --- /dev/null +++ b/internal/scheduling/pods/pipeline_webhook.go @@ -0,0 +1,31 @@ +// Copyright SAP SE +// SPDX-License-Identifier: Apache-2.0 + +package pods + +import ( + "github.com/cobaltcore-dev/cortex/api/v1alpha1" + "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + "github.com/cobaltcore-dev/cortex/internal/scheduling/pods/plugins/filters" + "github.com/cobaltcore-dev/cortex/internal/scheduling/pods/plugins/weighers" +) + +// Create a new pipeline admission webhook for the pods scheduling domain, +// using the known filters, weighers and detectors for validation. 
+func NewPipelineWebhook() lib.PipelineAdmissionWebhook { + validatableFilters := map[string]lib.Validatable{} + for name, constructor := range filters.Index { + validatableFilters[name] = constructor() + } + validatableWeighers := map[string]lib.Validatable{} + for name, constructor := range weighers.Index { + validatableWeighers[name] = constructor() + } + validatableDetectors := map[string]lib.Validatable{} // No detectors for pods yet. + return lib.PipelineAdmissionWebhook{ + SchedulingDomain: v1alpha1.SchedulingDomainPods, + ValidatableFilters: validatableFilters, + ValidatableWeighers: validatableWeighers, + ValidatableDetectors: validatableDetectors, + } +} diff --git a/internal/scheduling/pods/plugins/filters/filter_node_affinity.go b/internal/scheduling/pods/plugins/filters/filter_node_affinity.go index 44bf7363c..30988f50e 100644 --- a/internal/scheduling/pods/plugins/filters/filter_node_affinity.go +++ b/internal/scheduling/pods/plugins/filters/filter_node_affinity.go @@ -12,6 +12,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -23,6 +24,10 @@ func (f *NodeAffinityFilter) Init(ctx context.Context, client client.Client, ste return nil } +func (f *NodeAffinityFilter) Validate(ctx context.Context, params runtime.RawExtension) error { + return nil +} + func (NodeAffinityFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.FilterWeigherPipelineStepResult, error) { activations := make(map[string]float64) stats := make(map[string]lib.FilterWeigherPipelineStepStatistics) diff --git a/internal/scheduling/pods/plugins/filters/filter_node_available.go b/internal/scheduling/pods/plugins/filters/filter_node_available.go index e0514feee..59f6687ad 100644 --- a/internal/scheduling/pods/plugins/filters/filter_node_available.go +++ 
b/internal/scheduling/pods/plugins/filters/filter_node_available.go @@ -11,6 +11,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -22,6 +23,10 @@ func (f *NodeAvailableFilter) Init(ctx context.Context, client client.Client, st return nil } +func (f *NodeAvailableFilter) Validate(ctx context.Context, params runtime.RawExtension) error { + return nil +} + func (NodeAvailableFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.FilterWeigherPipelineStepResult, error) { activations := make(map[string]float64) stats := make(map[string]lib.FilterWeigherPipelineStepStatistics) diff --git a/internal/scheduling/pods/plugins/filters/filter_node_capacity.go b/internal/scheduling/pods/plugins/filters/filter_node_capacity.go index 692f55bc9..10d40aca4 100644 --- a/internal/scheduling/pods/plugins/filters/filter_node_capacity.go +++ b/internal/scheduling/pods/plugins/filters/filter_node_capacity.go @@ -12,6 +12,7 @@ import ( "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" "github.com/cobaltcore-dev/cortex/internal/scheduling/pods/helpers" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -23,6 +24,10 @@ func (f *NodeCapacityFilter) Init(ctx context.Context, client client.Client, ste return nil } +func (f *NodeCapacityFilter) Validate(ctx context.Context, params runtime.RawExtension) error { + return nil +} + func (NodeCapacityFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.FilterWeigherPipelineStepResult, error) { activations := make(map[string]float64) stats := make(map[string]lib.FilterWeigherPipelineStepStatistics) diff --git a/internal/scheduling/pods/plugins/filters/filter_noop.go b/internal/scheduling/pods/plugins/filters/filter_noop.go index 1a4c898c0..fe59c95eb 100644 --- 
a/internal/scheduling/pods/plugins/filters/filter_noop.go +++ b/internal/scheduling/pods/plugins/filters/filter_noop.go @@ -10,6 +10,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/external/pods" "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -22,6 +23,10 @@ func (f *NoopFilter) Init(ctx context.Context, client client.Client, step v1alph return nil } +func (f *NoopFilter) Validate(ctx context.Context, params runtime.RawExtension) error { + return nil +} + // Run this step of the scheduling pipeline. // Return a map of keys to activation values. Important: keys that are // not in the map are considered as filtered out. diff --git a/internal/scheduling/pods/plugins/filters/filter_taint.go b/internal/scheduling/pods/plugins/filters/filter_taint.go index 1325a8b14..b5a48707b 100644 --- a/internal/scheduling/pods/plugins/filters/filter_taint.go +++ b/internal/scheduling/pods/plugins/filters/filter_taint.go @@ -11,6 +11,7 @@ import ( "github.com/cobaltcore-dev/cortex/api/v1alpha1" "github.com/cobaltcore-dev/cortex/internal/scheduling/lib" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -22,6 +23,10 @@ func (f *TaintFilter) Init(ctx context.Context, client client.Client, step v1alp return nil } +func (f *TaintFilter) Validate(ctx context.Context, params runtime.RawExtension) error { + return nil +} + func (TaintFilter) Run(traceLog *slog.Logger, request pods.PodPipelineRequest) (*lib.FilterWeigherPipelineStepResult, error) { activations := make(map[string]float64) stats := make(map[string]lib.FilterWeigherPipelineStepStatistics)