Skip to content

Commit

Permalink
Retry to reset start time and restore info after error on stop job. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
mbobrovskyi authored Jul 10, 2024
1 parent 5bca510 commit fc7477d
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
13 changes: 9 additions & 4 deletions pkg/controller/jobs/job/job_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ var (
const (
JobMinParallelismAnnotation = "kueue.x-k8s.io/job-min-parallelism"
JobCompletionsEqualParallelismAnnotation = "kueue.x-k8s.io/job-completions-equal-parallelism"
StoppingAnnotation = "kueue.x-k8s.io/stopping"
)

func init() {
Expand Down Expand Up @@ -151,7 +152,7 @@ func fromObject(o runtime.Object) *Job {
}

func (j *Job) IsSuspended() bool {
return j.Spec.Suspend != nil && *j.Spec.Suspend
return j.Spec.Suspend != nil && *j.Spec.Suspend && j.Annotations[StoppingAnnotation] != "true"
}

func (j *Job) IsActive() bool {
Expand All @@ -166,6 +167,11 @@ func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.Po
stoppedNow := false
if !j.IsSuspended() {
j.Suspend()
if j.ObjectMeta.Annotations == nil {
j.ObjectMeta.Annotations = map[string]string{}
}
// We are using annotation to be sure that all updates finished successfully.
j.ObjectMeta.Annotations[StoppingAnnotation] = "true"
if err := c.Update(ctx, j.Object()); err != nil {
return false, fmt.Errorf("suspend: %w", err)
}
Expand All @@ -180,9 +186,8 @@ func (j *Job) Stop(ctx context.Context, c client.Client, podSetsInfo []podset.Po
}
}

if changed := j.RestorePodSetsInfo(podSetsInfo); !changed {
return stoppedNow, nil
}
j.RestorePodSetsInfo(podSetsInfo)
delete(j.ObjectMeta.Annotations, StoppingAnnotation)
if err := c.Update(ctx, j.Object()); err != nil {
return false, fmt.Errorf("restore info: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1692,6 +1692,7 @@ var _ = ginkgo.Describe("Job controller interacting with scheduler", ginkgo.Orde
ginkgo.By("job should be suspended and its parallelism restored", func() {
gomega.Eventually(func(g gomega.Gomega) {
g.Expect(k8sClient.Get(ctx, jobKey, createdJob)).Should(gomega.Succeed())
g.Expect(createdJob.Annotations[workloadjob.StoppingAnnotation]).ToNot(gomega.Equal("true"))
g.Expect(ptr.Deref(createdJob.Spec.Suspend, false)).To(gomega.BeTrue(), "the job should be suspended")
g.Expect(ptr.Deref(createdJob.Spec.Parallelism, 0)).To(gomega.BeEquivalentTo(5))
}, util.Timeout, util.Interval).Should(gomega.Succeed())
Expand Down

0 comments on commit fc7477d

Please sign in to comment.