Merge pull request #1084 from hashicorp/b-fix-jobstore-recursion
state: avoid infinite recursion in JobStore
radeksimko authored Oct 11, 2022
2 parents 81507c5 + c8ad71e commit e8397dc
Showing 3 changed files with 146 additions and 26 deletions.
34 changes: 34 additions & 0 deletions .github/workflows/nightly-tests.yml
@@ -0,0 +1,34 @@
name: Nightly Tests

on:
  schedule:
    - cron: '0 3 * * *'
  workflow_dispatch:

env:
  GOPROXY: https://proxy.golang.org/

jobs:
  test:
    runs-on: ubuntu-latest
    timeout-minutes: 60
    strategy:
      fail-fast: false
    steps:
      -
        name: Checkout
        uses: actions/checkout@v3
      -
        name: Unshallow
        run: git fetch --prune --unshallow
      -
        name: Set up Go
        uses: actions/setup-go@v3
        with:
          go-version-file: ".go-version"
      -
        name: Go mod download
        run: go mod download -x
      -
        name: Run long tests
        run: go test -timeout=30m -tags=longtest ./...
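For context (not part of the diff): the `-tags=longtest` flag in the last step is what opts test files guarded by a `//go:build longtest` constraint into the build — a plain `go test ./...` never compiles them, which keeps the expensive suite out of regular per-commit runs and confines it to this nightly workflow. A minimal sketch of such a gated file (hypothetical name and contents; the real file added by this commit follows below):

```go
//go:build longtest

// example_long_test.go (hypothetical): because of the build constraint above,
// this file is only compiled and run when the tag is supplied, e.g.
//   go test -timeout=30m -tags=longtest ./...
// and is silently skipped by a plain `go test ./...`.
package scheduler

import (
	"testing"
	"time"
)

func TestSomethingExpensive(t *testing.T) {
	// Stand-in for a scenario too slow to run on every push.
	time.Sleep(100 * time.Millisecond)
}
```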
83 changes: 83 additions & 0 deletions internal/scheduler/scheduler_long_test.go
@@ -0,0 +1,83 @@
//go:build longtest

package scheduler

import (
	"context"
	"fmt"
	"path/filepath"
	"sync"
	"testing"

	"github.com/hashicorp/terraform-ls/internal/document"
	"github.com/hashicorp/terraform-ls/internal/job"
	"github.com/hashicorp/terraform-ls/internal/state"
)

// See https://github.com/hashicorp/terraform-ls/issues/1065
// This test can be very expensive to run in terms of CPU, memory and time.
// It takes about 3-4 minutes to finish on M1 Pro.
func TestScheduler_millionJobsQueued(t *testing.T) {
	ss, err := state.NewStateStore()
	if err != nil {
		t.Fatal(err)
	}
	ss.SetLogger(testLogger())

	tmpDir := t.TempDir()
	ctx, cancelFunc := context.WithCancel(context.Background())

	lowPrioSched := NewScheduler(ss.JobStore, 1, job.LowPriority)
	lowPrioSched.Start(ctx)
	t.Cleanup(func() {
		lowPrioSched.Stop()
		cancelFunc()
	})

	highPrioSched := NewScheduler(ss.JobStore, 100, job.HighPriority)

	// slightly over ~1M jobs seems sufficient to exceed the goroutine stack limit
	idBatches := make([]job.IDs, 106, 106)
	var wg sync.WaitGroup
	for i := 0; i <= 105; i++ {
		wg.Add(1)
		i := i
		go func(i int) {
			defer wg.Done()
			idBatches[i] = make(job.IDs, 0)
			for j := 0; j < 10000; j++ {
				dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-%d", j))

				newId, err := ss.JobStore.EnqueueJob(job.Job{
					Func: func(c context.Context) error {
						return nil
					},
					Dir:      document.DirHandleFromPath(dirPath),
					Type:     "test",
					Priority: job.HighPriority,
				})
				if err != nil {
					t.Error(err)
				}
				idBatches[i] = append(idBatches[i], newId)
			}
			t.Logf("scheduled %d high priority jobs in batch %d", len(idBatches[i]), i)
		}(i)
	}
	wg.Wait()

	highPrioSched.Start(ctx)
	t.Log("high priority scheduler started")

	t.Cleanup(func() {
		highPrioSched.Stop()
		cancelFunc()
	})

	for _, batch := range idBatches {
		err = ss.JobStore.WaitForJobs(ctx, batch...)
		if err != nil {
			t.Fatal(err)
		}
	}
}
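For context (not part of the diff): the 106 batches of 10,000 jobs enqueued above add up to 1,060,000 queued jobs — the "slightly over ~1M" the comment refers to. The backlog size matters because the pre-fix dispatcher retried by calling itself (see the jobs.go hunk below), and Go performs no tail-call elimination: each retry left another frame on the dispatcher goroutine's stack, which the runtime caps (1 GB by default on 64-bit), so a large enough backlog ends in a fatal "goroutine stack exceeds ...-byte limit" error, as reported in the linked issue. A self-contained toy illustration of the difference — not the project's code:

```go
package main

import "fmt"

// claimRecursive searches the queue for an unclaimed item, retrying via
// self-recursion the way the pre-fix awaitNextJob retried: each retry adds a
// stack frame that is only released once the whole chain unwinds, so the
// stack grows linearly with the number of retries.
func claimRecursive(queue []bool) int {
	if len(queue) == 0 {
		return -1
	}
	if queue[0] { // already claimed elsewhere: retry on the remainder
		return claimRecursive(queue[1:])
	}
	return len(queue)
}

// claimLoop is the same search written as a loop, mirroring the fixed code:
// `continue` to retry, return once an item is found — constant stack space.
func claimLoop(queue []bool) int {
	for len(queue) > 0 {
		if queue[0] {
			queue = queue[1:]
			continue
		}
		return len(queue)
	}
	return -1
}

func main() {
	// A large backlog in which almost every item forces a retry.
	queue := make([]bool, 1_000_000)
	for i := range queue[:len(queue)-1] {
		queue[i] = true
	}
	fmt.Println(claimLoop(queue))      // constant stack, whatever the backlog size
	fmt.Println(claimRecursive(queue)) // ~1M nested frames before it can unwind
}
```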
55 changes: 29 additions & 26 deletions internal/state/jobs.go
@@ -228,38 +228,41 @@ func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority)
 }
 
 func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) {
-	txn := js.db.Txn(false)
+	var sJob *ScheduledJob
+	for {
+		txn := js.db.Txn(false)
+		wCh, obj, err := txn.FirstWatch(js.tableName, "priority_dependecies_state", priority, 0, StateQueued)
+		if err != nil {
+			return "", job.Job{}, err
+		}
 
-	wCh, obj, err := txn.FirstWatch(js.tableName, "priority_dependecies_state", priority, 0, StateQueued)
-	if err != nil {
-		return "", job.Job{}, err
-	}
+		if obj == nil {
+			select {
+			case <-wCh:
+			case <-ctx.Done():
+				return "", job.Job{}, ctx.Err()
+			}
 
-	if obj == nil {
-		select {
-		case <-wCh:
-		case <-ctx.Done():
-			return "", job.Job{}, ctx.Err()
+			js.logger.Printf("retrying on obj is nil")
+			continue
 		}
 
-		return js.awaitNextJob(ctx, priority)
-	}
-
-	sJob := obj.(*ScheduledJob)
+		sJob = obj.(*ScheduledJob)
 
-	err = js.markJobAsRunning(sJob)
-	if err != nil {
-		// Although we hold a write db-wide lock when marking job as running
-		// we may still end up passing the same job from the above read-only
-		// transaction, which does *not* hold a db-wide lock.
-		//
-		// Instead of adding more sync primitives here we simply retry.
-		if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) {
-			js.logger.Printf("retrying next job: %s", err)
-			return js.awaitNextJob(ctx, priority)
+		err = js.markJobAsRunning(sJob)
+		if err != nil {
+			// Although we hold a write db-wide lock when marking job as running
+			// we may still end up passing the same job from the above read-only
+			// transaction, which does *not* hold a db-wide lock.
+			//
+			// Instead of adding more sync primitives here we simply retry.
+			if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) {
+				js.logger.Printf("retrying next job: %s", err)
+				continue
+			}
+			return "", job.Job{}, err
 		}
-
-		return "", job.Job{}, err
+		break
 	}
 
 	js.logger.Printf("JOBS: Dispatching next job %q (scheduler prio: %d, job prio: %d, isDirOpen: %t): %q for %q",
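One detail worth keeping from the hunk above: the retry itself is still necessary. The candidate job is found in a read-only transaction, so another scheduler can mark the same job as running first and `markJobAsRunning` then fails with `jobAlreadyRunning` (or `jobNotFound`); the fix keeps that optimistic claim-and-retry behaviour but expresses the retry as `continue` inside the loop rather than a recursive call. A self-contained sketch of that shape, with assumed names — not the terraform-ls implementation:

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errAlreadyClaimed = errors.New("job already claimed")

// jobStore is a stand-in: candidates are looked up optimistically, and the
// claim itself is serialized, so a claim can fail if another worker won the race.
type jobStore struct {
	mu      sync.Mutex
	claimed map[string]bool
	queued  []string
}

// nextCandidate returns some queued, not-yet-claimed job ID.
func (s *jobStore) nextCandidate() (string, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, id := range s.queued {
		if !s.claimed[id] {
			return id, true
		}
	}
	return "", false
}

// claim marks the job as running, failing if someone else got there first.
func (s *jobStore) claim(id string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.claimed[id] {
		return errAlreadyClaimed
	}
	s.claimed[id] = true
	return nil
}

// awaitNext mirrors the fixed control flow: `continue` when the claim is lost
// to another worker, return once a claim succeeds — never recurse.
func awaitNext(s *jobStore) (string, error) {
	for {
		id, ok := s.nextCandidate()
		if !ok {
			// The real code blocks on a watch channel here instead.
			return "", errors.New("queue empty")
		}
		if err := s.claim(id); err != nil {
			if errors.Is(err, errAlreadyClaimed) {
				continue // lost the race: retry without growing the stack
			}
			return "", err
		}
		return id, nil
	}
}

func main() {
	s := &jobStore{claimed: map[string]bool{"a": true}, queued: []string{"a", "b"}}
	fmt.Println(awaitNext(s)) // b <nil>
}
```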
