Skip to content

Commit 12457e7

Browse files
committed
feat: add lock update mechanism for jobs, closes go-co-op#762
- Add `jobOutUpdateLockRequest` channel in the executor - Implement lock update requests in the job execution process - Add `Lock()` method to the `Job` interface - Update the scheduler to handle lock update requests - Add a test case to verify the new locking mechanism
1 parent c180381 commit 12457e7

File tree

4 files changed

+88
-7
lines changed

4 files changed

+88
-7
lines changed

executor.go

+18-2
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ type executor struct {
3030
jobsOutCompleted chan uuid.UUID
3131
// used to request jobs from the scheduler
3232
jobOutRequest chan jobOutRequest
33+
// used to request jobs from the scheduler
34+
jobOutUpdateLockRequest chan jobOutUpdateLockRequest
3335

3436
// used by the executor to receive a stop signal from the scheduler
3537
stopCh chan struct{}
@@ -378,7 +380,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
378380
e.incrementJobCounter(j, Skip)
379381
return
380382
}
381-
defer func() { _ = lock.Unlock(j.ctx) }()
383+
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
384+
id: j.id,
385+
lock: lock,
386+
}
387+
388+
defer func() {
389+
_ = lock.Unlock(j.ctx)
390+
}()
382391
} else if e.locker != nil {
383392
lock, err := e.locker.Lock(j.ctx, j.name)
384393
if err != nil {
@@ -387,7 +396,14 @@ func (e *executor) runJob(j internalJob, jIn jobIn) {
387396
e.incrementJobCounter(j, Skip)
388397
return
389398
}
390-
defer func() { _ = lock.Unlock(j.ctx) }()
399+
e.jobOutUpdateLockRequest <- jobOutUpdateLockRequest{
400+
id: j.id,
401+
lock: lock,
402+
}
403+
404+
defer func() {
405+
_ = lock.Unlock(j.ctx)
406+
}()
391407
}
392408
_ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)
393409

job.go

+8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type internalJob struct {
3030
nextScheduled []time.Time
3131

3232
lastRun time.Time
33+
lastLock Lock
3334
function any
3435
parameters []any
3536
timer clockwork.Timer
@@ -1026,6 +1027,7 @@ type Job interface {
10261027
RunNow() error
10271028
// Tags returns the job's string tags.
10281029
Tags() []string
1030+
Lock() Lock
10291031
}
10301032

10311033
var _ Job = (*job)(nil)
@@ -1126,3 +1128,9 @@ func (j job) RunNow() error {
11261128
}
11271129
return err
11281130
}
1131+
1132+
func (j job) Lock() Lock {
1133+
ij := requestJob(j.id, j.jobOutRequest)
1134+
1135+
return ij.lastLock
1136+
}

scheduler.go

+21-5
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ type jobOutRequest struct {
107107
outChan chan internalJob
108108
}
109109

110+
type jobOutUpdateLockRequest struct {
111+
id uuid.UUID
112+
lock Lock
113+
}
114+
110115
type runJobRequest struct {
111116
id uuid.UUID
112117
outChan chan error
@@ -131,11 +136,12 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
131136
logger: &noOpLogger{},
132137
clock: clockwork.NewRealClock(),
133138

134-
jobsIn: make(chan jobIn),
135-
jobsOutForRescheduling: make(chan uuid.UUID),
136-
jobsOutCompleted: make(chan uuid.UUID),
137-
jobOutRequest: make(chan jobOutRequest, 1000),
138-
done: make(chan error),
139+
jobsIn: make(chan jobIn),
140+
jobsOutForRescheduling: make(chan uuid.UUID),
141+
jobsOutCompleted: make(chan uuid.UUID),
142+
jobOutRequest: make(chan jobOutRequest, 1000),
143+
jobOutUpdateLockRequest: make(chan jobOutUpdateLockRequest),
144+
done: make(chan error),
139145
}
140146

141147
s := &scheduler{
@@ -190,6 +196,9 @@ func NewScheduler(options ...SchedulerOption) (Scheduler, error) {
190196
case out := <-s.jobOutRequestCh:
191197
s.selectJobOutRequest(out)
192198

199+
case out := <-s.exec.jobOutUpdateLockRequest:
200+
s.jobOutUpdateLockRequest(out)
201+
193202
case out := <-s.allJobsOutRequest:
194203
s.selectAllJobsOutRequest(out)
195204

@@ -434,6 +443,13 @@ func (s *scheduler) selectJobOutRequest(out jobOutRequest) {
434443
close(out.outChan)
435444
}
436445

446+
func (s *scheduler) jobOutUpdateLockRequest(out jobOutUpdateLockRequest) {
447+
if j, ok := s.jobs[out.id]; ok {
448+
j.lastLock = out.lock
449+
s.jobs[out.id] = j
450+
}
451+
}
452+
437453
func (s *scheduler) selectNewJob(in newJobIn) {
438454
j := in.job
439455
if s.started {

scheduler_test.go

+41
Original file line numberDiff line numberDiff line change
@@ -2633,3 +2633,44 @@ func TestScheduler_WithMonitor(t *testing.T) {
26332633
})
26342634
}
26352635
}
2636+
2637+
func TestJob_Lock(t *testing.T) {
2638+
locker := &testLocker{
2639+
notLocked: make(chan struct{}, 1),
2640+
}
2641+
2642+
s := newTestScheduler(t,
2643+
WithDistributedLocker(locker),
2644+
)
2645+
2646+
jobRan := make(chan struct{})
2647+
j, err := s.NewJob(
2648+
DurationJob(time.Millisecond*100),
2649+
NewTask(func() {
2650+
time.Sleep(50 * time.Millisecond)
2651+
jobRan <- struct{}{}
2652+
}),
2653+
)
2654+
require.NoError(t, err)
2655+
2656+
s.Start()
2657+
defer s.Shutdown()
2658+
2659+
select {
2660+
case <-jobRan:
2661+
// Job has run
2662+
case <-time.After(200 * time.Millisecond):
2663+
t.Fatal("Job did not run in time")
2664+
}
2665+
2666+
require.Eventually(t, func() bool {
2667+
if locker.jobLocked {
2668+
return true
2669+
}
2670+
2671+
return false
2672+
}, 200*time.Millisecond, 100*time.Millisecond, "Job should be locked")
2673+
2674+
lock := j.Lock()
2675+
assert.NotNil(t, lock, "Job Lock() should return a non-nil Locker")
2676+
}

0 commit comments

Comments
 (0)