diff --git a/.github/workflows/nightly-tests.yml b/.github/workflows/nightly-tests.yml
new file mode 100644
index 00000000..f24fdf8e
--- /dev/null
+++ b/.github/workflows/nightly-tests.yml
@@ -0,0 +1,34 @@
+name: Nightly Tests
+
+on:
+  schedule:
+    - cron: '0 3 * * *'
+  workflow_dispatch:
+
+env:
+  GOPROXY: https://proxy.golang.org/
+
+jobs:
+  test:
+    runs-on: ubuntu-latest
+    timeout-minutes: 60
+    strategy:
+      fail-fast: false
+    steps:
+      -
+        name: Checkout
+        uses: actions/checkout@v3
+      -
+        name: Unshallow
+        run: git fetch --prune --unshallow
+      -
+        name: Set up Go
+        uses: actions/setup-go@v3
+        with:
+          go-version-file: ".go-version"
+      -
+        name: Go mod download
+        run: go mod download -x
+      -
+        name: Run long tests
+        run: go test -timeout=30m -tags=longtest ./...
diff --git a/internal/scheduler/scheduler_long_test.go b/internal/scheduler/scheduler_long_test.go
new file mode 100644
index 00000000..91ad145a
--- /dev/null
+++ b/internal/scheduler/scheduler_long_test.go
@@ -0,0 +1,83 @@
+//go:build longtest
+
+package scheduler
+
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+	"sync"
+	"testing"
+
+	"github.com/hashicorp/terraform-ls/internal/document"
+	"github.com/hashicorp/terraform-ls/internal/job"
+	"github.com/hashicorp/terraform-ls/internal/state"
+)
+
+// See https://github.com/hashicorp/terraform-ls/issues/1065
+// This test can be very expensive to run in terms of CPU, memory and time.
+// It takes about 3-4 minutes to finish on M1 Pro.
+func TestScheduler_millionJobsQueued(t *testing.T) {
+	ss, err := state.NewStateStore()
+	if err != nil {
+		t.Fatal(err)
+	}
+	ss.SetLogger(testLogger())
+
+	tmpDir := t.TempDir()
+	ctx, cancelFunc := context.WithCancel(context.Background())
+
+	lowPrioSched := NewScheduler(ss.JobStore, 1, job.LowPriority)
+	lowPrioSched.Start(ctx)
+	t.Cleanup(func() {
+		lowPrioSched.Stop()
+		cancelFunc()
+	})
+
+	highPrioSched := NewScheduler(ss.JobStore, 100, job.HighPriority)
+
+	// slightly over ~1M jobs seems sufficient to exceed the goroutine stack limit
+	idBatches := make([]job.IDs, 106, 106)
+	var wg sync.WaitGroup
+	for i := 0; i <= 105; i++ {
+		wg.Add(1)
+		i := i
+		go func(i int) {
+			defer wg.Done()
+			idBatches[i] = make(job.IDs, 0)
+			for j := 0; j < 10000; j++ {
+				dirPath := filepath.Join(tmpDir, fmt.Sprintf("folder-%d", j))
+
+				newId, err := ss.JobStore.EnqueueJob(job.Job{
+					Func: func(c context.Context) error {
+						return nil
+					},
+					Dir:      document.DirHandleFromPath(dirPath),
+					Type:     "test",
+					Priority: job.HighPriority,
+				})
+				if err != nil {
+					t.Error(err)
+				}
+				idBatches[i] = append(idBatches[i], newId)
+			}
+			t.Logf("scheduled %d high priority jobs in batch %d", len(idBatches[i]), i)
+		}(i)
+	}
+	wg.Wait()
+
+	highPrioSched.Start(ctx)
+	t.Log("high priority scheduler started")
+
+	t.Cleanup(func() {
+		highPrioSched.Stop()
+		cancelFunc()
+	})
+
+	for _, batch := range idBatches {
+		err = ss.JobStore.WaitForJobs(ctx, batch...)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+}
diff --git a/internal/state/jobs.go b/internal/state/jobs.go
index 26a7ae8e..c07ee2ff 100644
--- a/internal/state/jobs.go
+++ b/internal/state/jobs.go
@@ -228,38 +228,41 @@ func (js *JobStore) AwaitNextJob(ctx context.Context, priority job.JobPriority)
 }
 
 func (js *JobStore) awaitNextJob(ctx context.Context, priority job.JobPriority) (job.ID, job.Job, error) {
-	txn := js.db.Txn(false)
+	var sJob *ScheduledJob
+	for {
+		txn := js.db.Txn(false)
+		wCh, obj, err := txn.FirstWatch(js.tableName, "priority_dependecies_state", priority, 0, StateQueued)
+		if err != nil {
+			return "", job.Job{}, err
+		}
 
-	wCh, obj, err := txn.FirstWatch(js.tableName, "priority_dependecies_state", priority, 0, StateQueued)
-	if err != nil {
-		return "", job.Job{}, err
-	}
+		if obj == nil {
+			select {
+			case <-wCh:
+			case <-ctx.Done():
+				return "", job.Job{}, ctx.Err()
+			}
 
-	if obj == nil {
-		select {
-		case <-wCh:
-		case <-ctx.Done():
-			return "", job.Job{}, ctx.Err()
+			js.logger.Printf("retrying on obj is nil")
+			continue
 		}
 
-		return js.awaitNextJob(ctx, priority)
-	}
-
-	sJob := obj.(*ScheduledJob)
+		sJob = obj.(*ScheduledJob)
 
-	err = js.markJobAsRunning(sJob)
-	if err != nil {
-		// Although we hold a write db-wide lock when marking job as running
-		// we may still end up passing the same job from the above read-only
-		// transaction, which does *not* hold a db-wide lock.
-		//
-		// Instead of adding more sync primitives here we simply retry.
-		if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) {
-			js.logger.Printf("retrying next job: %s", err)
-			return js.awaitNextJob(ctx, priority)
+		err = js.markJobAsRunning(sJob)
+		if err != nil {
+			// Although we hold a write db-wide lock when marking job as running
+			// we may still end up passing the same job from the above read-only
+			// transaction, which does *not* hold a db-wide lock.
+			//
+			// Instead of adding more sync primitives here we simply retry.
+			if errors.Is(err, jobAlreadyRunning{ID: sJob.ID}) || errors.Is(err, jobNotFound{ID: sJob.ID}) {
+				js.logger.Printf("retrying next job: %s", err)
+				continue
+			}
+			return "", job.Job{}, err
 		}
-
-		return "", job.Job{}, err
+		break
 	}
 
 	js.logger.Printf("JOBS: Dispatching next job %q (scheduler prio: %d, job prio: %d, isDirOpen: %t): %q for %q",
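Note (not part of the patch above): the jobs.go change replaces the tail-recursive "return js.awaitNextJob(ctx, priority)" retries with a for/continue loop. Go performs no tail-call elimination, so each retry previously added a stack frame, and with roughly a million queued jobs (what the new long test exercises) the goroutine stack limit could be exceeded. The following is a minimal standalone sketch of that loop-based wait-and-retry shape only; the names awaitFirst and errNotReady and the poll/notify wiring are illustrative assumptions, not terraform-ls APIs.

package main

import (
	"context"
	"errors"
	"fmt"
)

// errNotReady stands in for the "nothing queued yet" / "job already running"
// cases that previously triggered a recursive retry.
var errNotReady = errors.New("not ready")

// awaitFirst retries in place: wait for a notification or ctx cancellation,
// then continue the loop, so stack depth stays constant regardless of how
// many retries happen before a result becomes available.
func awaitFirst(ctx context.Context, poll func() (string, error), notify <-chan struct{}) (string, error) {
	for {
		id, err := poll()
		if err == nil {
			return id, nil
		}
		if !errors.Is(err, errNotReady) {
			return "", err
		}
		select {
		case <-notify:
			continue // state changed, poll again
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
}

func main() {
	notify := make(chan struct{}, 1)
	notify <- struct{}{}

	calls := 0
	poll := func() (string, error) {
		calls++
		if calls == 1 {
			return "", errNotReady
		}
		return "job-1", nil
	}

	id, err := awaitFirst(context.Background(), poll, notify)
	fmt.Println(id, err)
}

Running the sketch prints "job-1 <nil>" after one retry; the point is that the number of retries never affects stack depth, unlike the recursive form the patch removes.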