From 251d4b2cb91a1004c18fbe547c651066e5ad5afa Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Sat, 21 Sep 2024 08:23:28 +0200 Subject: [PATCH 01/11] Introduce ManagedBy field to RunPolicy Signed-off-by: Michal Szadkowski --- manifests/base/kubeflow.org_mpijobs.yaml | 11 +++++++++++ pkg/apis/kubeflow/v2beta1/types.go | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml index e326aecf..a86fc73e 100644 --- a/manifests/base/kubeflow.org_mpijobs.yaml +++ b/manifests/base/kubeflow.org_mpijobs.yaml @@ -7734,6 +7734,17 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/mpi-operator' or + 'kueue.x-k8s.io/multikueue'. + The mpi-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/mpi-operator', but delegates reconciling the job + with 'kueue.x-k8s.io/multikueue' to the Kueue. + The field is immutable. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. 
gang-scheduling diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 5480d842..9178c317 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -93,6 +93,14 @@ type SchedulingPolicy struct { ScheduleTimeoutSeconds *int32 `json:"scheduleTimeoutSeconds,omitempty"` } +const ( + // KubeflowJobController represents the value of the default job controller + KubeflowJobController = "kubeflow.org/mpi-operator" + + // MultiKueueController represents the value of the MultiKueue controller + MultiKueueController = "kueue.x-k8s.io/multikueue" +) + // RunPolicy encapsulates various runtime policies of the distributed training // job, for example how to clean up resources and how long the job can stay // active. @@ -131,6 +139,16 @@ type RunPolicy struct { // Defaults to false. // +kubebuilder:default:=false Suspend *bool `json:"suspend,omitempty"` + + // ManagedBy is used to indicate the controller or entity that manages a job. + // The value must be either an empty, 'kubeflow.org/mpi-operator' or + // 'kueue.x-k8s.io/multikueue'. + // The mpi-operator reconciles a job which doesn't have this + // field at all or the field value is the reserved string + // 'kubeflow.org/mpi-operator', but delegates reconciling the job + // with 'kueue.x-k8s.io/multikueue' to the Kueue. + // The field is immutable. 
+ ManagedBy *string `json:"managedBy,omitempty"` } type LauncherCreationPolicy string From b74df5605817abe59cb11194d5ed4f31d377ecfe Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Sat, 21 Sep 2024 08:23:50 +0200 Subject: [PATCH 02/11] Make mpi-operator a default value for ManagedBy Signed-off-by: Michal Szadkowski --- pkg/apis/kubeflow/v2beta1/default.go | 3 +++ pkg/apis/kubeflow/v2beta1/default_test.go | 7 +++++++ 2 files changed, 10 insertions(+) diff --git a/pkg/apis/kubeflow/v2beta1/default.go b/pkg/apis/kubeflow/v2beta1/default.go index b45f2ebc..0af2e4fa 100644 --- a/pkg/apis/kubeflow/v2beta1/default.go +++ b/pkg/apis/kubeflow/v2beta1/default.go @@ -53,6 +53,9 @@ func setDefaultsRunPolicy(policy *RunPolicy) { if policy.CleanPodPolicy == nil { policy.CleanPodPolicy = ptr.To(CleanPodPolicyNone) } + if policy.ManagedBy == nil { + policy.ManagedBy = ptr.To(KubeflowJobController) + } // The remaining fields are passed as-is to the k8s Job API, which does its // own defaulting. } diff --git a/pkg/apis/kubeflow/v2beta1/default_test.go b/pkg/apis/kubeflow/v2beta1/default_test.go index 598d8032..154d5392 100644 --- a/pkg/apis/kubeflow/v2beta1/default_test.go +++ b/pkg/apis/kubeflow/v2beta1/default_test.go @@ -32,6 +32,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { SlotsPerWorker: ptr.To[int32](1), RunPolicy: RunPolicy{ CleanPodPolicy: ptr.To(CleanPodPolicyNone), + ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, @@ -48,6 +49,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), + ManagedBy: ptr.To(MultiKueueController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationIntel, @@ -62,6 +64,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), 
+ ManagedBy: ptr.To(MultiKueueController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationIntel, @@ -78,6 +81,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), + ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationMPICH, @@ -92,6 +96,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), + ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationMPICH, @@ -112,6 +117,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { SlotsPerWorker: ptr.To[int32](1), RunPolicy: RunPolicy{ CleanPodPolicy: ptr.To(CleanPodPolicyNone), + ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, @@ -138,6 +144,7 @@ func TestSetDefaults_MPIJob(t *testing.T) { SlotsPerWorker: ptr.To[int32](1), RunPolicy: RunPolicy{ CleanPodPolicy: ptr.To(CleanPodPolicyNone), + ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, From 67782e312cea213996797d5c88b48b295bcb1ad0 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Sat, 21 Sep 2024 08:24:10 +0200 Subject: [PATCH 03/11] Add validation for ManagedBy field Signed-off-by: Michal Szadkowski --- pkg/apis/kubeflow/validation/validation.go | 12 ++++++++++-- pkg/apis/kubeflow/validation/validation_test.go | 5 +++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/pkg/apis/kubeflow/validation/validation.go b/pkg/apis/kubeflow/validation/validation.go index 8658683b..bea4cd71 100644 --- a/pkg/apis/kubeflow/validation/validation.go +++ b/pkg/apis/kubeflow/validation/validation.go @@ -39,8 +39,11 @@ var ( validRestartPolicies = 
sets.NewString( string(kubeflow.RestartPolicyNever), - string(kubeflow.RestartPolicyOnFailure), - ) + string(kubeflow.RestartPolicyOnFailure)) + + validManagedBy = sets.NewString( + string(kubeflow.MultiKueueController), + string(kubeflow.KubeflowJobController)) ) func ValidateMPIJob(job *kubeflow.MPIJob) field.ErrorList { @@ -98,6 +101,11 @@ func validateRunPolicy(policy *kubeflow.RunPolicy, path *field.Path) field.Error if policy.BackoffLimit != nil { errs = append(errs, apivalidation.ValidateNonnegativeField(int64(*policy.BackoffLimit), path.Child("backoffLimit"))...) } + if policy.ManagedBy != nil { + if !validManagedBy.Has(*policy.ManagedBy) { + errs = append(errs, field.NotSupported(path.Child("managedBy"), *policy.ManagedBy, validManagedBy.List())) + } + } return errs } diff --git a/pkg/apis/kubeflow/validation/validation_test.go b/pkg/apis/kubeflow/validation/validation_test.go index 4da4676c..5c6977ba 100644 --- a/pkg/apis/kubeflow/validation/validation_test.go +++ b/pkg/apis/kubeflow/validation/validation_test.go @@ -193,6 +193,7 @@ func TestValidateMPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](-1), ActiveDeadlineSeconds: ptr.To[int64](-1), BackoffLimit: ptr.To[int32](-1), + ManagedBy: ptr.To("invalid.com/controller"), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: kubeflow.MPIImplementation("Unknown"), @@ -239,6 +240,10 @@ func TestValidateMPIJob(t *testing.T) { Type: field.ErrorTypeInvalid, Field: "spec.runPolicy.backoffLimit", }, + { + Type: field.ErrorTypeNotSupported, + Field: "spec.runPolicy.managedBy", + }, { Type: field.ErrorTypeNotSupported, Field: "spec.mpiImplementation", From 111261c9cda9e08aa566df90c21759afa16d04f6 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Sat, 21 Sep 2024 08:24:37 +0200 Subject: [PATCH 04/11] Make use of ManagedBy in reconciliation process Signed-off-by: Michal Szadkowski --- pkg/controller/mpi_job_controller.go | 12 +++ pkg/controller/mpi_job_controller_test.go | 12 +++ 
test/integration/mpi_job_controller_test.go | 93 ++++++++++++++++++++- 3 files changed, 116 insertions(+), 1 deletion(-) diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index 80cb46ba..4ccc1666 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -582,6 +582,11 @@ func (c *MPIJobController) syncHandler(key string) error { // Set default for the new mpiJob. scheme.Scheme.Default(mpiJob) + if manager := managedByExternalController(mpiJob.Spec.RunPolicy.ManagedBy); manager != nil { + klog.V(2).Info("Skipping MPIJob managed by a custom controller", "managed-by", manager) + return nil + } + // for mpi job that is terminating, just return. if mpiJob.DeletionTimestamp != nil { return nil @@ -1722,3 +1727,10 @@ func truncateMessage(message string) string { suffix := "..." return message[:eventMessageLimit-len(suffix)] + suffix } + +func managedByExternalController(controllerName *string) *string { + if controllerName != nil && *controllerName != kubeflow.KubeflowJobController { + return controllerName + } + return nil +} diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 95445d94..bd7129bb 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -504,6 +504,18 @@ func TestDoNothingWithInvalidMPIJob(t *testing.T) { f.run(getKey(mpiJob, t)) } +func TestDoNothingWithMPIJobManagedExternally(t *testing.T) { + f := newFixture(t, "") + var replicas int32 = 1 + startTime := metav1.Now() + completionTime := metav1.Now() + mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime) + mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationIntel + mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController) + f.setUpMPIJob(mpiJob) + f.run(getKey(mpiJob, t)) +} + func TestAllResourcesCreated(t *testing.T) { impls := []kubeflow.MPIImplementation{kubeflow.MPIImplementationOpenMPI, 
kubeflow.MPIImplementationIntel, kubeflow.MPIImplementationMPICH} for _, implementation := range impls { diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index f465d077..c9ba71ca 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -46,7 +46,8 @@ import ( ) const ( - waitInterval = 100 * time.Millisecond + waitInterval = 100 * time.Millisecond + moderateTimeout = 2 * time.Second ) func TestMPIJobSuccess(t *testing.T) { @@ -822,6 +823,96 @@ func TestMPIJobWithVolcanoScheduler(t *testing.T) { } } +func TestMPIJobManagedExternally(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + s := newTestSetup(ctx, t) + startController(ctx, s.kClient, s.mpiClient, nil) + + mpiJob := &kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: s.namespace, + }, + Spec: kubeflow.MPIJobSpec{ + SlotsPerWorker: ptr.To[int32](1), + RunPolicy: kubeflow.RunPolicy{ + CleanPodPolicy: ptr.To(kubeflow.CleanPodPolicyRunning), + ManagedBy: ptr.To(kubeflow.MultiKueueController), + }, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*kubeflow.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "mpi-image", + }, + }, + }, + }, + }, + kubeflow.MPIReplicaTypeWorker: { + Replicas: ptr.To[int32](2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "mpi-image", + }, + }, + }, + }, + }, + }, + }, + } + + // 1. 
The job must be created + var err error + mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(s.namespace).Create(ctx, mpiJob, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed sending job to apiserver: %v", err) + } + backoff := wait.Backoff{ + Duration: moderateTimeout, + Factor: 1.0, + Steps: 3, + } + err = wait.ExponentialBackoff(backoff, func() (bool, error) { + // 2. Status is not getting updated + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, nil) + if mpiJob.Status.StartTime != nil { + t.Errorf("MPIJob should be missing startTime") + } + // 3. There should be no conditions, even the one for create + if mpiJobHasCondition(mpiJob, kubeflow.JobCreated) { + t.Errorf("MPIJob shouldn't have any condition") + } + // 4. No Pods or Services created + pods, err := getPodsForJob(ctx, s.kClient, mpiJob) + if err != nil { + t.Fatalf("Failed getting pods for the job: %v", err) + } + if len(pods) > 0 { + t.Fatalf("There should be no pods from job: %v", pods) + } + svcs, err := getServiceForJob(ctx, s.kClient, mpiJob) + if err != nil { + t.Fatalf("Failed getting services for the job: %v", err) + } + if svcs != nil { + t.Fatalf("There should be no services from job: %v", svcs) + } + return false, nil + }) + if !wait.Interrupted(err) { + t.Fatalf("Failed to verify externally managed mpiJob: %v", err) + } +} + func startController( ctx context.Context, kClient kubernetes.Interface, From 90594d9a014fc5735c1719433f04afe546ef9d7b Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 23 Sep 2024 14:31:19 +0200 Subject: [PATCH 05/11] Regenerate code after adding managedBy field Signed-off-by: Michal Szadkowski --- deploy/v2beta1/mpi-operator.yaml | 11 +++++++ pkg/apis/kubeflow/v2beta1/swagger.json | 4 +++ .../kubeflow/v2beta1/zz_generated.deepcopy.go | 5 ++++ .../kubeflow/v2beta1/runpolicy.go | 9 ++++++ sdk/python/v2beta1/docs/V2beta1RunPolicy.md | 1 + .../mpijob/models/v2beta1_run_policy.py | 30 ++++++++++++++++++- 6 files changed, 59 insertions(+), 
1 deletion(-) diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml index 308f2e05..dc6f32b9 100644 --- a/deploy/v2beta1/mpi-operator.yaml +++ b/deploy/v2beta1/mpi-operator.yaml @@ -7757,6 +7757,17 @@ spec: CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. type: string + managedBy: + description: |- + ManagedBy is used to indicate the controller or entity that manages a job. + The value must be either an empty, 'kubeflow.org/mpi-operator' or + 'kueue.x-k8s.io/multikueue'. + The mpi-operator reconciles a job which doesn't have this + field at all or the field value is the reserved string + 'kubeflow.org/mpi-operator', but delegates reconciling the job + with 'kueue.x-k8s.io/multikueue' to the Kueue. + The field is immutable. + type: string schedulingPolicy: description: SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 9a7e3c45..80cc8569 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -241,6 +241,10 @@ "description": "CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running.", "type": "string" }, + "managedBy": { + "description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.", + "type": "string" + }, "schedulingPolicy": { "description": "SchedulingPolicy defines the policy related to scheduling, e.g. 
gang-scheduling", "$ref": "#/definitions/v2beta1.SchedulingPolicy" diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 75ef51aa..e85549f5 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -273,6 +273,11 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = new(bool) **out = **in } + if in.ManagedBy != nil { + in, out := &in.ManagedBy, &out.ManagedBy + *out = new(string) + **out = **in + } return } diff --git a/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go b/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go index 5370cddb..0481ee7e 100644 --- a/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go +++ b/pkg/client/applyconfiguration/kubeflow/v2beta1/runpolicy.go @@ -29,6 +29,7 @@ type RunPolicyApplyConfiguration struct { BackoffLimit *int32 `json:"backoffLimit,omitempty"` SchedulingPolicy *SchedulingPolicyApplyConfiguration `json:"schedulingPolicy,omitempty"` Suspend *bool `json:"suspend,omitempty"` + ManagedBy *string `json:"managedBy,omitempty"` } // RunPolicyApplyConfiguration constructs an declarative configuration of the RunPolicy type for use with @@ -84,3 +85,11 @@ func (b *RunPolicyApplyConfiguration) WithSuspend(value bool) *RunPolicyApplyCon b.Suspend = &value return b } + +// WithManagedBy sets the ManagedBy field in the declarative configuration to the given value +// and returns the receiver, so that objects can be built by chaining "With" function invocations. +// If called multiple times, the ManagedBy field is set to the value of the last call. 
+func (b *RunPolicyApplyConfiguration) WithManagedBy(value string) *RunPolicyApplyConfiguration { + b.ManagedBy = &value + return b +} diff --git a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md index eeb1727f..2c790c23 100644 --- a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md +++ b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md @@ -8,6 +8,7 @@ Name | Type | Description | Notes **active_deadline_seconds** | **int** | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | [optional] **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. | [optional] +**managed_by** | **str** | ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. | [optional] **scheduling_policy** | [**V2beta1SchedulingPolicy**](V2beta1SchedulingPolicy.md) | | [optional] **suspend** | **bool** | suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob. Defaults to false. 
| [optional] **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. | [optional] diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py index b8c25294..b039bf66 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py @@ -36,6 +36,7 @@ class V2beta1RunPolicy(object): 'active_deadline_seconds': 'int', 'backoff_limit': 'int', 'clean_pod_policy': 'str', + 'managed_by': 'str', 'scheduling_policy': 'V2beta1SchedulingPolicy', 'suspend': 'bool', 'ttl_seconds_after_finished': 'int' @@ -45,12 +46,13 @@ class V2beta1RunPolicy(object): 'active_deadline_seconds': 'activeDeadlineSeconds', 'backoff_limit': 'backoffLimit', 'clean_pod_policy': 'cleanPodPolicy', + 'managed_by': 'managedBy', 'scheduling_policy': 'schedulingPolicy', 'suspend': 'suspend', 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' } - def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, managed_by=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 """V2beta1RunPolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration.get_default_copy() @@ -59,6 +61,7 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self._active_deadline_seconds = None self._backoff_limit = None self._clean_pod_policy = None + self._managed_by = None self._scheduling_policy = None self._suspend = None 
self._ttl_seconds_after_finished = None @@ -70,6 +73,8 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self.backoff_limit = backoff_limit if clean_pod_policy is not None: self.clean_pod_policy = clean_pod_policy + if managed_by is not None: + self.managed_by = managed_by if scheduling_policy is not None: self.scheduling_policy = scheduling_policy if suspend is not None: @@ -146,6 +151,29 @@ def clean_pod_policy(self, clean_pod_policy): self._clean_pod_policy = clean_pod_policy + @property + def managed_by(self): + """Gets the managed_by of this V2beta1RunPolicy. # noqa: E501 + + ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 + + :return: The managed_by of this V2beta1RunPolicy. # noqa: E501 + :rtype: str + """ + return self._managed_by + + @managed_by.setter + def managed_by(self, managed_by): + """Sets the managed_by of this V2beta1RunPolicy. + + ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 + + :param managed_by: The managed_by of this V2beta1RunPolicy. # noqa: E501 + :type managed_by: str + """ + + self._managed_by = managed_by + @property def scheduling_policy(self): """Gets the scheduling_policy of this V2beta1RunPolicy. 
# noqa: E501 From 0e35914ad646d8f51aad09ffdcdc0857f03ce27c Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 23 Sep 2024 13:08:02 +0200 Subject: [PATCH 06/11] Add e2e tests Signed-off-by: Michal Szadkowski --- pkg/controller/mpi_job_controller_test.go | 2 +- test/e2e/mpi_job_test.go | 22 ++++++++++++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index bd7129bb..5ed8bc1e 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -510,7 +510,7 @@ func TestDoNothingWithMPIJobManagedExternally(t *testing.T) { startTime := metav1.Now() completionTime := metav1.Now() mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime) - mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationIntel + mpiJob.Spec.MPIImplementation = kubeflow.MPIImplementationOpenMPI mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController) f.setUpMPIJob(mpiJob) f.run(getKey(mpiJob, t)) diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index b7f35815..2359b62f 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "io" + "time" "github.com/google/go-cmp/cmp" "github.com/onsi/ginkgo" @@ -164,6 +165,27 @@ var _ = ginkgo.Describe("MPIJob", func() { mpiJob := createJobAndWaitForCompletion(mpiJob) expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) }) + + ginkgo.It("should not be updated when managed externaly, only created", func() { + mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController) + ctx := context.Background() + mpiJob = createJob(ctx, mpiJob) + + gomega.Consistently(func() error { + updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{}) + if err != nil { + return err + } + mpiJob = updatedJob + return nil + }, 5*time.Second, 
waitInterval).Should(gomega.Succeed()) + + // job should be created, but status should not be updated neither for create nor for any other status + condition := getJobCondition(mpiJob, kubeflow.JobCreated) + gomega.Expect(condition).To(gomega.BeNil()) + condition = getJobCondition(mpiJob, kubeflow.JobSucceeded) + gomega.Expect(condition).To(gomega.BeNil()) + }) }) }) From 4ac4cb2e98bb7df5e7661793b5eb84404152df8f Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Wed, 25 Sep 2024 15:20:32 +0200 Subject: [PATCH 07/11] Update after code review Signed-off-by: Michal Szadkowski --- deploy/v2beta1/mpi-operator.yaml | 6 +- manifests/base/kubeflow.org_mpijobs.yaml | 6 +- pkg/apis/kubeflow/v2beta1/swagger.json | 2 +- pkg/apis/kubeflow/v2beta1/types.go | 6 +- pkg/controller/mpi_job_controller_test.go | 8 +++ sdk/python/v2beta1/docs/V2beta1RunPolicy.md | 2 +- .../mpijob/models/v2beta1_run_policy.py | 4 +- test/e2e/mpi_job_test.go | 66 ++++++++++++++++--- 8 files changed, 78 insertions(+), 22 deletions(-) diff --git a/deploy/v2beta1/mpi-operator.yaml b/deploy/v2beta1/mpi-operator.yaml index dc6f32b9..420710d1 100644 --- a/deploy/v2beta1/mpi-operator.yaml +++ b/deploy/v2beta1/mpi-operator.yaml @@ -7759,12 +7759,12 @@ spec: type: string managedBy: description: |- - ManagedBy is used to indicate the controller or entity that manages a job. + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. - The mpi-operator reconciles a job which doesn't have this + The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string - 'kubeflow.org/mpi-operator', but delegates reconciling the job + 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. 
type: string diff --git a/manifests/base/kubeflow.org_mpijobs.yaml b/manifests/base/kubeflow.org_mpijobs.yaml index a86fc73e..38169bb1 100644 --- a/manifests/base/kubeflow.org_mpijobs.yaml +++ b/manifests/base/kubeflow.org_mpijobs.yaml @@ -7736,12 +7736,12 @@ spec: type: string managedBy: description: |- - ManagedBy is used to indicate the controller or entity that manages a job. + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. - The mpi-operator reconciles a job which doesn't have this + The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string - 'kubeflow.org/mpi-operator', but delegates reconciling the job + 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. type: string diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 80cc8569..61449c43 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -242,7 +242,7 @@ "type": "string" }, "managedBy": { - "description": "ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.", + "description": "ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. 
The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable.", "type": "string" }, "schedulingPolicy": { diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 9178c317..187f266a 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -140,12 +140,12 @@ type RunPolicy struct { // +kubebuilder:default:=false Suspend *bool `json:"suspend,omitempty"` - // ManagedBy is used to indicate the controller or entity that manages a job. + // ManagedBy is used to indicate the controller or entity that manages a MPIJob. // The value must be either an empty, 'kubeflow.org/mpi-operator' or // 'kueue.x-k8s.io/multikueue'. - // The mpi-operator reconciles a job which doesn't have this + // The mpi-operator reconciles a MPIJob which doesn't have this // field at all or the field value is the reserved string - // 'kubeflow.org/mpi-operator', but delegates reconciling the job + // 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob // with 'kueue.x-k8s.io/multikueue' to the Kueue. // The field is immutable. 
ManagedBy *string `json:"managedBy,omitempty"` diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index 5ed8bc1e..3594d354 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -418,6 +418,11 @@ func (f *fixture) expectCreateSecretAction(d *corev1.Secret) { f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "secrets"}, d.Namespace, d)) } +func (f *fixture) expectNoKubeActions() bool { + k8sActions := filterInformerActions(f.kubeClient.Actions()) + return len(k8sActions) == 0 +} + func (f *fixture) expectUpdateMPIJobStatusAction(mpiJob *kubeflow.MPIJob) { action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "mpijobs"}, mpiJob.Namespace, mpiJob) action.Subresource = "status" @@ -514,6 +519,9 @@ func TestDoNothingWithMPIJobManagedExternally(t *testing.T) { mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.MultiKueueController) f.setUpMPIJob(mpiJob) f.run(getKey(mpiJob, t)) + if !f.expectNoKubeActions() { + t.Fatalf("Expected no kubeActions (secrets, pods, services etc.)") + } } func TestAllResourcesCreated(t *testing.T) { diff --git a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md index 2c790c23..8491e8ca 100644 --- a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md +++ b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md @@ -8,7 +8,7 @@ Name | Type | Description | Notes **active_deadline_seconds** | **int** | Specifies the duration in seconds relative to the startTime that the job may be active before the system tries to terminate it; value must be positive integer. | [optional] **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. 
| [optional] -**managed_by** | **str** | ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. | [optional] +**managed_by** | **str** | ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. | [optional] **scheduling_policy** | [**V2beta1SchedulingPolicy**](V2beta1SchedulingPolicy.md) | | [optional] **suspend** | **bool** | suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob. Defaults to false. | [optional] **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. 
| [optional] diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py index b039bf66..1860c6dc 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py @@ -155,7 +155,7 @@ def clean_pod_policy(self, clean_pod_policy): def managed_by(self): """Gets the managed_by of this V2beta1RunPolicy. # noqa: E501 - ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 :return: The managed_by of this V2beta1RunPolicy. # noqa: E501 :rtype: str @@ -166,7 +166,7 @@ def managed_by(self): def managed_by(self, managed_by): """Sets the managed_by of this V2beta1RunPolicy. - ManagedBy is used to indicate the controller or entity that manages a job. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a job which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the job with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. 
# noqa: E501 + ManagedBy is used to indicate the controller or entity that manages a MPIJob. The value must be either an empty, 'kubeflow.org/mpi-operator' or 'kueue.x-k8s.io/multikueue'. The mpi-operator reconciles a MPIJob which doesn't have this field at all or the field value is the reserved string 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob with 'kueue.x-k8s.io/multikueue' to the Kueue. The field is immutable. # noqa: E501 :param managed_by: The managed_by of this V2beta1RunPolicy. # noqa: E501 :type managed_by: str diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index 2359b62f..729ed58f 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -178,16 +178,30 @@ var _ = ginkgo.Describe("MPIJob", func() { } mpiJob = updatedJob return nil - }, 5*time.Second, waitInterval).Should(gomega.Succeed()) + }, 2*time.Second, waitInterval).Should(gomega.Succeed()) // job should be created, but status should not be updated neither for create nor for any other status condition := getJobCondition(mpiJob, kubeflow.JobCreated) gomega.Expect(condition).To(gomega.BeNil()) condition = getJobCondition(mpiJob, kubeflow.JobSucceeded) gomega.Expect(condition).To(gomega.BeNil()) + launcherPods, err := getLauncherPods(ctx, mpiJob) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(len(launcherPods.Items)).To(gomega.Equal(0)) + workerPods, err := getWorkerPods(ctx, mpiJob) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(len(workerPods.Items)).To(gomega.Equal(0)) + secret, err := getSecretsForJob(ctx, mpiJob) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(secret).To(gomega.BeNil()) }) - }) + ginkgo.It("should succeed when explicitly managed by mpi-operator", func() { + mpiJob.Spec.RunPolicy.ManagedBy = ptr.To(kubeflow.KubeflowJobController) + mpiJob := createJobAndWaitForCompletion(mpiJob) + expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) + }) + }) }) ginkgo.Context("with Intel Implementation", func() { @@ 
-558,7 +572,7 @@ func waitForCompletion(ctx context.Context, mpiJob *kubeflow.MPIJob) *kubeflow.M return mpiJob } -func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { +func getLauncherPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) { selector := metav1.LabelSelector{ MatchLabels: map[string]string{ kubeflow.OperatorNameLabel: kubeflow.OperatorName, @@ -570,7 +584,45 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { LabelSelector: metav1.FormatLabelSelector(&selector), }) if err != nil { - return fmt.Errorf("getting launcher Pods: %w", err) + return &corev1.PodList{}, fmt.Errorf("getting launcher Pods: %w", err) + } + return launcherPods, nil +} + +func getWorkerPods(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.PodList, error) { + selector := metav1.LabelSelector{ + MatchLabels: map[string]string{ + kubeflow.OperatorNameLabel: kubeflow.OperatorName, + kubeflow.JobNameLabel: mpiJob.Name, + kubeflow.JobRoleLabel: "worker", + }, + } + workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{ + LabelSelector: metav1.FormatLabelSelector(&selector), + }) + if err != nil { + return &corev1.PodList{}, fmt.Errorf("getting worker Pods: %w", err) + } + return workerPods, nil +} + +func getSecretsForJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*corev1.Secret, error) { + result, err := k8sClient.CoreV1().Secrets(mpiJob.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + for _, obj := range result.Items { + if metav1.IsControlledBy(&obj, mpiJob) { + return &obj, nil + } + } + return nil, nil +} + +func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { + launcherPods, err := getLauncherPods(ctx, mpiJob) + if err != nil { + return err } if len(launcherPods.Items) == 0 { return fmt.Errorf("no launcher Pods found") @@ -585,11 +637,7 @@ func debugJob(ctx context.Context, mpiJob *kubeflow.MPIJob) error { if err != nil { return 
fmt.Errorf("obtaining launcher logs: %w", err) } - - selector.MatchLabels[kubeflow.JobRoleLabel] = "worker" - workerPods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{ - LabelSelector: metav1.FormatLabelSelector(&selector), - }) + workerPods, err := getWorkerPods(ctx, mpiJob) if err != nil { return fmt.Errorf("getting worker Pods: %w", err) } From 08a3ac841b11f6043aa85d177ce0ec6db9e1384a Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Wed, 2 Oct 2024 13:53:13 +0200 Subject: [PATCH 08/11] Update tests Signed-off-by: Michal Szadkowski --- test/e2e/mpi_job_test.go | 17 +++++++++++++++++ test/integration/mpi_job_controller_test.go | 10 +++++----- 2 files changed, 22 insertions(+), 5 deletions(-) diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index 729ed58f..a279337a 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/onsi/ginkgo" "github.com/onsi/gomega" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" @@ -185,6 +186,9 @@ var _ = ginkgo.Describe("MPIJob", func() { gomega.Expect(condition).To(gomega.BeNil()) condition = getJobCondition(mpiJob, kubeflow.JobSucceeded) gomega.Expect(condition).To(gomega.BeNil()) + launcherJob, err := getLauncherJob(ctx, mpiJob) + gomega.Expect(err).To(gomega.BeNil()) + gomega.Expect(launcherJob).To(gomega.BeNil()) launcherPods, err := getLauncherPods(ctx, mpiJob) gomega.Expect(err).To(gomega.BeNil()) gomega.Expect(len(launcherPods.Items)).To(gomega.Equal(0)) @@ -681,6 +685,19 @@ func getJobCondition(mpiJob *kubeflow.MPIJob, condType kubeflow.JobConditionType return nil } +func getLauncherJob(ctx context.Context, mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) { + result, err := k8sClient.BatchV1().Jobs(mpiJob.Namespace).List(ctx, metav1.ListOptions{}) + if err != nil { + return nil, err + } + for _, j := range 
result.Items { + if metav1.IsControlledBy(&j, mpiJob) { + return &j, nil + } + } + return nil, nil +} + func createMPIJobWithOpenMPI(mpiJob *kubeflow.MPIJob) { mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.Containers = []corev1.Container{ { diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index c9ba71ca..9e0b18e5 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -891,13 +891,13 @@ func TestMPIJobManagedExternally(t *testing.T) { if mpiJobHasCondition(mpiJob, kubeflow.JobCreated) { t.Errorf("MPIJob shouldn't have any condition") } - // 4. No Pods or Services created - pods, err := getPodsForJob(ctx, s.kClient, mpiJob) + // 4. No Jobs or Services created + lp, err := getLauncherJobForMPIJob(ctx, s.kClient, mpiJob) if err != nil { - t.Fatalf("Failed getting pods for the job: %v", err) + t.Fatalf("Failed getting launcher jobs: %v", err) } - if len(pods) > 0 { - t.Fatalf("There should be no pods from job: %v", pods) + if lp != nil { + t.Fatalf("There should be no launcher jobs from job: %v", lp) } svcs, err := getServiceForJob(ctx, s.kClient, mpiJob) if err != nil { From e1eae618229866d1f73cee47619579936bac9c43 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Mon, 7 Oct 2024 13:25:37 +0200 Subject: [PATCH 09/11] Remove default value for ManagedBy Signed-off-by: Michal Szadkowski --- pkg/apis/kubeflow/v2beta1/default.go | 3 --- pkg/apis/kubeflow/v2beta1/default_test.go | 7 ------- 2 files changed, 10 deletions(-) diff --git a/pkg/apis/kubeflow/v2beta1/default.go b/pkg/apis/kubeflow/v2beta1/default.go index 0af2e4fa..b45f2ebc 100644 --- a/pkg/apis/kubeflow/v2beta1/default.go +++ b/pkg/apis/kubeflow/v2beta1/default.go @@ -53,9 +53,6 @@ func setDefaultsRunPolicy(policy *RunPolicy) { if policy.CleanPodPolicy == nil { policy.CleanPodPolicy = ptr.To(CleanPodPolicyNone) } - if policy.ManagedBy == nil { - policy.ManagedBy = 
ptr.To(KubeflowJobController) - } // The remaining fields are passed as-is to the k8s Job API, which does its // own defaulting. } diff --git a/pkg/apis/kubeflow/v2beta1/default_test.go b/pkg/apis/kubeflow/v2beta1/default_test.go index 154d5392..598d8032 100644 --- a/pkg/apis/kubeflow/v2beta1/default_test.go +++ b/pkg/apis/kubeflow/v2beta1/default_test.go @@ -32,7 +32,6 @@ func TestSetDefaults_MPIJob(t *testing.T) { SlotsPerWorker: ptr.To[int32](1), RunPolicy: RunPolicy{ CleanPodPolicy: ptr.To(CleanPodPolicyNone), - ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, @@ -49,7 +48,6 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), - ManagedBy: ptr.To(MultiKueueController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationIntel, @@ -64,7 +62,6 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), - ManagedBy: ptr.To(MultiKueueController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationIntel, @@ -81,7 +78,6 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), - ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationMPICH, @@ -96,7 +92,6 @@ func TestSetDefaults_MPIJob(t *testing.T) { TTLSecondsAfterFinished: ptr.To[int32](2), ActiveDeadlineSeconds: ptr.To[int64](3), BackoffLimit: ptr.To[int32](4), - ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/home/mpiuser/.ssh", MPIImplementation: MPIImplementationMPICH, @@ -117,7 +112,6 @@ func TestSetDefaults_MPIJob(t *testing.T) { SlotsPerWorker: ptr.To[int32](1), RunPolicy: RunPolicy{ 
CleanPodPolicy: ptr.To(CleanPodPolicyNone), - ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, @@ -144,7 +138,6 @@ func TestSetDefaults_MPIJob(t *testing.T) { SlotsPerWorker: ptr.To[int32](1), RunPolicy: RunPolicy{ CleanPodPolicy: ptr.To(CleanPodPolicyNone), - ManagedBy: ptr.To(KubeflowJobController), }, SSHAuthMountPath: "/root/.ssh", MPIImplementation: MPIImplementationOpenMPI, From cc9c108592ff72dce1e99251a4449d0a4006e908 Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Tue, 8 Oct 2024 09:19:30 +0200 Subject: [PATCH 10/11] Add optional tag Replace backoff and consistently with sleep Signed-off-by: Michal Szadkowski --- pkg/apis/kubeflow/v2beta1/types.go | 1 + test/e2e/mpi_job_test.go | 11 ++-- test/integration/mpi_job_controller_test.go | 60 +++++++++------------ 3 files changed, 30 insertions(+), 42 deletions(-) diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 187f266a..f6831080 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -148,6 +148,7 @@ type RunPolicy struct { // 'kubeflow.org/mpi-operator', but delegates reconciling the MPIJob // with 'kueue.x-k8s.io/multikueue' to the Kueue. // The field is immutable. 
+ // +optional ManagedBy *string `json:"managedBy,omitempty"` } diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index a279337a..3192c2a7 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -172,14 +172,9 @@ var _ = ginkgo.Describe("MPIJob", func() { ctx := context.Background() mpiJob = createJob(ctx, mpiJob) - gomega.Consistently(func() error { - updatedJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{}) - if err != nil { - return err - } - mpiJob = updatedJob - return nil - }, 2*time.Second, waitInterval).Should(gomega.Succeed()) + time.Sleep(1 * time.Second) + mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{}) + gomega.Expect(err).To(gomega.BeNil()) // job should be created, but status should not be updated neither for create nor for any other status condition := getJobCondition(mpiJob, kubeflow.JobCreated) diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index 9e0b18e5..822840fe 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -876,41 +876,33 @@ func TestMPIJobManagedExternally(t *testing.T) { if err != nil { t.Fatalf("Failed sending job to apiserver: %v", err) } - backoff := wait.Backoff{ - Duration: moderateTimeout, - Factor: 1.0, - Steps: 3, - } - err = wait.ExponentialBackoff(backoff, func() (bool, error) { - // 2. Status is not getting updated - mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, nil) - if mpiJob.Status.StartTime != nil { - t.Errorf("MPIJob should be missing startTime") - } - // 3. There should be no conditions, even the one for create - if mpiJobHasCondition(mpiJob, kubeflow.JobCreated) { - t.Errorf("MPIJob shouldn't have any condition") - } - // 4. 
No Jobs or Services created - lp, err := getLauncherJobForMPIJob(ctx, s.kClient, mpiJob) - if err != nil { - t.Fatalf("Failed getting launcher jobs: %v", err) - } - if lp != nil { - t.Fatalf("There should be no launcher jobs from job: %v", lp) - } - svcs, err := getServiceForJob(ctx, s.kClient, mpiJob) - if err != nil { - t.Fatalf("Failed getting services for the job: %v", err) - } - if svcs != nil { - t.Fatalf("There should be no services from job: %v", svcs) - } - return false, nil - }) - if !wait.Interrupted(err) { - t.Fatalf("Failed to verify externally managed mpiJob: %v", err) + + time.Sleep(moderateTimeout) + // 2. Status is not getting updated + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, nil) + if mpiJob.Status.StartTime != nil { + t.Errorf("MPIJob should be missing startTime") + } + // 3. There should be no conditions, even the one for create + if mpiJobHasCondition(mpiJob, kubeflow.JobCreated) { + t.Errorf("MPIJob shouldn't have any condition") + } + // 4. No Jobs or Services created + lp, err := getLauncherJobForMPIJob(ctx, s.kClient, mpiJob) + if err != nil { + t.Fatalf("Failed getting launcher jobs: %v", err) } + if lp != nil { + t.Fatalf("There should be no launcher jobs from job: %v", lp) + } + svcs, err := getServiceForJob(ctx, s.kClient, mpiJob) + if err != nil { + t.Fatalf("Failed getting services for the job: %v", err) + } + if svcs != nil { + t.Fatalf("There should be no services from job: %v", svcs) + } + } func startController( From f91b8d2decdb1b6e813dcd0c9c3931a16590f84d Mon Sep 17 00:00:00 2001 From: Michal Szadkowski Date: Tue, 8 Oct 2024 11:39:18 +0200 Subject: [PATCH 11/11] Create common util package for integration and e2e tests with sleep/wait constants Signed-off-by: Michal Szadkowski --- test/e2e/mpi_job_test.go | 3 ++- test/integration/main_test.go | 3 ++- test/integration/mpi_job_controller_test.go | 16 ++++++-------- test/util/constants.go | 23 +++++++++++++++++++++ 4 files changed, 33 insertions(+), 12 
deletions(-) create mode 100644 test/util/constants.go diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index 3192c2a7..5c1e9b61 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -35,6 +35,7 @@ import ( schedv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" + "github.com/kubeflow/mpi-operator/test/util" ) var _ = ginkgo.Describe("MPIJob", func() { @@ -172,7 +173,7 @@ var _ = ginkgo.Describe("MPIJob", func() { ctx := context.Background() mpiJob = createJob(ctx, mpiJob) - time.Sleep(1 * time.Second) + time.Sleep(util.SleepDurationControllerSyncDelay) mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Get(ctx, mpiJob.Name, metav1.GetOptions{}) gomega.Expect(err).To(gomega.BeNil()) diff --git a/test/integration/main_test.go b/test/integration/main_test.go index 5dd36b93..14419dc5 100644 --- a/test/integration/main_test.go +++ b/test/integration/main_test.go @@ -36,6 +36,7 @@ import ( volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" + "github.com/kubeflow/mpi-operator/test/util" ) var ( @@ -191,7 +192,7 @@ func (c *eventChecker) run() { func (c *eventChecker) verify(t *testing.T) { t.Helper() - err := wait.PollUntilContextTimeout(context.Background(), waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + err := wait.PollUntilContextTimeout(context.Background(), util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { c.Lock() defer c.Unlock() return c.expected.Len() == 0, nil diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index 822840fe..f260d829 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -43,11 +43,7 @@ import ( 
"github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned/scheme" informers "github.com/kubeflow/mpi-operator/pkg/client/informers/externalversions" "github.com/kubeflow/mpi-operator/pkg/controller" -) - -const ( - waitInterval = 100 * time.Millisecond - moderateTimeout = 2 * time.Second + "github.com/kubeflow/mpi-operator/test/util" ) func TestMPIJobSuccess(t *testing.T) { @@ -693,7 +689,7 @@ func TestMPIJobWithSchedulerPlugins(t *testing.T) { if err != nil { t.Errorf("Failed sending job to apiserver: %v", err) } - if err = wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + if err = wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { pg, err := getSchedPodGroup(ctx, gangSchedulerCfg.schedClient, mpiJob) if err != nil { return false, err @@ -809,7 +805,7 @@ func TestMPIJobWithVolcanoScheduler(t *testing.T) { if err != nil { t.Errorf("Failed sending job to apiserver: %v", err) } - if err = wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + if err = wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { pg, err := getVolcanoPodGroup(ctx, gangSchedulerCfg.volcanoClient, mpiJob) if err != nil { return false, err @@ -877,7 +873,7 @@ func TestMPIJobManagedExternally(t *testing.T) { t.Fatalf("Failed sending job to apiserver: %v", err) } - time.Sleep(moderateTimeout) + time.Sleep(util.SleepDurationControllerSyncDelay) // 2. 
Status is not getting updated mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, nil) if mpiJob.Status.StartTime != nil { @@ -975,7 +971,7 @@ func validateMPIJobDependencies( podGroup metav1.Object ) var problems []string - if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { problems = nil var err error svc, err = getServiceForJob(ctx, kubeClient, job) @@ -1071,7 +1067,7 @@ func validateMPIJobStatus(ctx context.Context, t *testing.T, client clientset.In err error got map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus ) - if err := wait.PollUntilContextTimeout(ctx, waitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { + if err := wait.PollUntilContextTimeout(ctx, util.WaitInterval, wait.ForeverTestTimeout, false, func(ctx context.Context) (bool, error) { newJob, err = client.KubeflowV2beta1().MPIJobs(job.Namespace).Get(ctx, job.Name, metav1.GetOptions{}) if err != nil { return false, err diff --git a/test/util/constants.go b/test/util/constants.go new file mode 100644 index 00000000..17c2e82e --- /dev/null +++ b/test/util/constants.go @@ -0,0 +1,23 @@ +// Copyright 2024 The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package util + +import "time" + +const ( + // SleepDurationControllerSyncDelay is how long tests sleep so that subsequent controller syncs can occur. + SleepDurationControllerSyncDelay = 1 * time.Second + WaitInterval = 100 * time.Millisecond +)