diff --git a/cli/internal/core/scheduler.go b/cli/internal/core/scheduler.go
index a2506932823b5..ad4458707749b 100644
--- a/cli/internal/core/scheduler.go
+++ b/cli/internal/core/scheduler.go
@@ -17,11 +17,11 @@ type Task struct {
 	Deps util.Set
 	// TopoDeps are dependencies across packages within the same topological graph (e.g. parent `build` -> child `build`) */
 	TopoDeps util.Set
-	Cache    *bool
-	Run      func(cwd string) error
 }
 
-type scheduler struct {
+type Visitor = func(taskID string) error
+
+type Scheduler struct {
 	// TopologicGraph is a graph of workspaces
 	TopologicGraph *dag.AcyclicGraph
 	// TaskGraph is a graph of package-tasks
@@ -30,15 +30,11 @@ type scheduler struct {
 	Tasks           map[string]*Task
 	taskDeps        [][]string
 	PackageTaskDeps [][]string
-	// Concurrency is the number of concurrent tasks that can be executed
-	Concurrency int
-	// Parallel is whether to run tasks in parallel
-	Parallel bool
 }
 
 // NewScheduler creates a new scheduler given a topologic graph of workspace package names
-func NewScheduler(topologicalGraph *dag.AcyclicGraph) *scheduler {
-	return &scheduler{
+func NewScheduler(topologicalGraph *dag.AcyclicGraph) *Scheduler {
+	return &Scheduler{
 		Tasks:          make(map[string]*Task),
 		TopologicGraph: topologicalGraph,
 		TaskGraph:      &dag.AcyclicGraph{},
@@ -53,15 +49,11 @@ type SchedulerExecutionOptions struct {
 	Packages []string
 	// TaskNames in the execution scope, if nil, all tasks will be executed
 	TaskNames []string
-	// Concurrency is the number of concurrent tasks that can be executed
-	Concurrency int
-	// Parallel is whether to run tasks in parallel
-	Parallel bool
 	// Restrict execution to only the listed task names
 	TasksOnly bool
 }
 
-func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error {
+func (p *Scheduler) Prepare(options *SchedulerExecutionOptions) error {
 	pkgs := options.Packages
 	if len(pkgs) == 0 {
 		// TODO(gsoltis): Is this behavior only used in tests?
@@ -72,15 +64,12 @@ func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error {
 
 	tasks := options.TaskNames
 	if len(tasks) == 0 {
+		// TODO(gsoltis): Is this behavior used?
 		for key := range p.Tasks {
 			tasks = append(tasks, key)
 		}
 	}
 
-	p.Concurrency = options.Concurrency
-
-	p.Parallel = options.Parallel
-
 	if err := p.generateTaskGraph(pkgs, tasks, options.TasksOnly); err != nil {
 		return err
 	}
@@ -88,33 +77,32 @@ func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error {
 	return nil
 }
 
+// ExecOpts controls a single walk of the task graph
+type ExecOpts struct {
+	// Parallel is whether to run tasks in parallel
+	Parallel bool
+	// Concurrency is the number of concurrent tasks that can be executed
+	Concurrency int
+}
+
 // Execute executes the pipeline, constructing an internal task graph and walking it accordingly.
-func (p *scheduler) Execute() []error {
-	var sema = util.NewSemaphore(p.Concurrency)
+func (p *Scheduler) Execute(visitor Visitor, opts ExecOpts) []error {
+	var sema = util.NewSemaphore(opts.Concurrency)
 	return p.TaskGraph.Walk(func(v dag.Vertex) error {
 		// Always return if it is the root node
 		if strings.Contains(dag.VertexName(v), ROOT_NODE_NAME) {
 			return nil
 		}
 		// Acquire the semaphore unless parallel
-		if !p.Parallel {
+		if !opts.Parallel {
 			sema.Acquire()
 			defer sema.Release()
 		}
-		// Find and run the task for the current vertex
-		_, taskName := util.GetPackageTaskFromId(dag.VertexName(v))
-		task, ok := p.Tasks[taskName]
-		if !ok {
-			return fmt.Errorf("task %s not found", dag.VertexName(v))
-		}
-		if task.Run != nil {
-			return task.Run(dag.VertexName(v))
-		}
-		return nil
+		return visitor(dag.VertexName(v))
 	})
 }
 
-func (p *scheduler) generateTaskGraph(scope []string, taskNames []string, tasksOnly bool) error {
+func (p *Scheduler) generateTaskGraph(scope []string, taskNames []string, tasksOnly bool) error {
 	if p.PackageTaskDeps == nil {
 		p.PackageTaskDeps = [][]string{}
 	}
@@ -238,12 +226,12 @@ func getPackageTaskDepsMap(packageTaskDeps [][]string) map[string][]string {
 	return depMap
 }
 
-func (p *scheduler) AddTask(task *Task) *scheduler {
+func (p *Scheduler) AddTask(task *Task) *Scheduler {
 	p.Tasks[task.Name] = task
 	return p
 }
 
-func (p *scheduler) AddDep(fromTaskId string, toTaskId string) *scheduler {
+func (p *Scheduler) AddDep(fromTaskId string, toTaskId string) *Scheduler {
 	p.PackageTaskDeps = append(p.PackageTaskDeps, []string{fromTaskId, toTaskId})
 	return p
 }
diff --git a/cli/internal/core/scheduler_test.go b/cli/internal/core/scheduler_test.go
index 94e8252f8d158..0245667c4fa5c 100644
--- a/cli/internal/core/scheduler_test.go
+++ b/cli/internal/core/scheduler_test.go
@@ -4,11 +4,17 @@ import (
 	"fmt"
 	"strings"
 	"testing"
 
+	"github.com/vercel/turborepo/cli/internal/util"
+
 	"github.com/pyr-sh/dag"
 )
 
+func testVisitor(taskID string) error {
+	fmt.Println(taskID)
+	return nil
+}
+
 func TestSchedulerDefault(t *testing.T) {
 	var g dag.AcyclicGraph
 	g.Add("a")
@@ -26,34 +32,18 @@ func TestSchedulerDefault(t *testing.T) {
 		Name:     "build",
 		TopoDeps: topoDeps,
 		Deps:     deps,
-		Run: func(cwd string) error {
-			fmt.Println(cwd)
-			return nil
-		},
 	})
 	p.AddTask(&Task{
 		Name:     "test",
 		TopoDeps: topoDeps,
 		Deps:     deps,
-		Run: func(cwd string) error {
-			fmt.Println(cwd)
-			return nil
-		},
 	})
 	p.AddTask(&Task{
 		Name: "prepare",
-		Run: func(cwd string) error {
-			fmt.Println(cwd)
-			return nil
-		},
 	})
 	p.AddTask(&Task{
 		Name: "side-quest", // not in the build/test tree
 		Deps: deps,
-		Run: func(cwd string) error {
-			fmt.Println(cwd)
-			return nil
-		},
 	})
 
 	if _, ok := p.Tasks["build"]; !ok {
@@ -65,18 +55,18 @@ func TestSchedulerDefault(t *testing.T) {
 	}
 
 	err := p.Prepare(&SchedulerExecutionOptions{
-		Packages:    nil,
-		TaskNames:   []string{"test"},
-		Concurrency: 10,
-		Parallel:    false,
-		TasksOnly:   false,
+		Packages:  []string{"a", "b", "c"},
+		TaskNames: []string{"test"},
+		TasksOnly: false,
 	})
 
 	if err != nil {
 		t.Fatalf("%v", err)
 	}
 
-	errs := p.Execute()
+	errs := p.Execute(testVisitor, ExecOpts{
+		Concurrency: 10,
+	})
 
 	for _, err := range errs {
 		t.Fatalf("%v", err)
@@ -106,26 +96,14 @@ func TestSchedulerTasksOnly(t *testing.T) {
 		Name:     "build",
 		TopoDeps: topoDeps,
 		Deps:     deps,
-		Run: func(cwd string) error {
-			fmt.Println(cwd)
-			return nil
-		},
 	})
 	p.AddTask(&Task{
 		Name:     "test",
 		TopoDeps: topoDeps,
 		Deps:     deps,
-		Run: func(cwd string) error {
-			fmt.Println(cwd)
-			return nil
-		},
 	})
 	p.AddTask(&Task{
 		Name: "prepare",
-		Run: func(cwd string) error {
-			fmt.Println(cwd)
-			return nil
-		},
 	})
 
 	if _, ok := p.Tasks["build"]; !ok {
@@ -137,18 +115,18 @@ func TestSchedulerTasksOnly(t *testing.T) {
 	}
 
 	err := p.Prepare(&SchedulerExecutionOptions{
-		Packages:    nil,
-		TaskNames:   []string{"test"},
-		Concurrency: 10,
-		Parallel:    false,
-		TasksOnly:   true,
+		Packages:  []string{"a", "b", "c"},
+		TaskNames: []string{"test"},
+		TasksOnly: true,
 	})
 
 	if err != nil {
 		t.Fatalf("%v", err)
 	}
 
-	errs := p.Execute()
+	errs := p.Execute(testVisitor, ExecOpts{
+		Concurrency: 10,
+	})
 
 	for _, err := range errs {
 		t.Fatalf("%v", err)
diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go
index 3bb44dceb7561..1f0f06c2fe243 100644
--- a/cli/internal/run/run.go
+++ b/cli/internal/run/run.go
@@ -3,6 +3,7 @@ package run
 import (
 	"bufio"
 	gocontext "context"
+	"encoding/json"
 	"flag"
 	"fmt"
 	"io"
@@ -14,6 +15,7 @@ import (
 	"strconv"
 	"strings"
 	"sync"
+	"text/tabwriter"
 	"time"
 
 	"github.com/vercel/turborepo/cli/internal/analytics"
@@ -69,6 +71,16 @@ type runSpec struct {
 	Opts *RunOptions
 }
 
+func (rs *runSpec) ArgsForTask(task string) []string {
+	passThroughArgs := make([]string, 0, len(rs.Opts.passThroughArgs))
+	for _, target := range rs.Targets {
+		if target == task {
+			passThroughArgs = append(passThroughArgs, rs.Opts.passThroughArgs...)
+		}
+	}
+	return passThroughArgs
+}
+
 // Synopsis of run command
 func (c *RunCommand) Synopsis() string {
 	return "Run a task"
@@ -121,6 +133,9 @@ Options:
                          (default false)
   --no-cache             Avoid saving task results to the cache. Useful for development/watch
                          tasks. (default false)
+  --dry/--dry-run[=json] List the packages in scope and the tasks that would be run,
+                         but don't actually run them. Passing --dry=json or
+                         --dry-run=json will render the output in JSON format.
 `)
 	return strings.TrimSpace(helpText)
 }
@@ -168,9 +183,6 @@ func (c *RunCommand) Run(args []string) int {
 		c.logError(c.Config.Logger, "", fmt.Errorf("failed resolve packages to run %v", err))
 	}
 	c.Config.Logger.Debug("global hash", "value", ctx.GlobalHash)
-	packagesInScope := filteredPkgs.UnsafeListOfStrings()
-	sort.Strings(packagesInScope)
-	c.Ui.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", ")))
 	c.Config.Logger.Debug("local cache folder", "path", runOptions.cacheFolder)
 	fs.EnsureDir(runOptions.cacheFolder)
 
@@ -193,18 +205,6 @@ func (c *RunCommand) Run(args []string) int {
 }
 
 func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.LanguageBackend, startAt time.Time) int {
-	goctx := gocontext.Background()
-	var analyticsSink analytics.Sink
-	if c.Config.IsLoggedIn() {
-		analyticsSink = c.Config.ApiClient
-	} else {
-		analyticsSink = analytics.NullSink
-	}
-	analyticsClient := analytics.NewClient(goctx, analyticsSink, c.Config.Logger.Named("analytics"))
-	defer analyticsClient.CloseWithTimeout(50 * time.Millisecond)
-	turboCache := cache.New(c.Config, analyticsClient)
-	defer turboCache.Shutdown()
-
 	var topoVisit []interface{}
 	for _, node := range g.SCC {
 		v := node[0]
@@ -263,17 +263,85 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La
 		}
 	}
 
-	if rs.Opts.stream {
-		c.Ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len()))))
+	engine, err := buildTaskGraph(&g.TopologicalGraph, g.Pipeline, rs)
+	if err != nil {
+		c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err))
+		return 1
 	}
-	// TODO(gsoltis): I think this should be passed in, and close called from the calling function
-	// however, need to handle the graph case, which early-returns
-	runState := NewRunState(rs.Opts, startAt)
-	runState.Listen(c.Ui, time.Now())
-	engine := core.NewScheduler(&g.TopologicalGraph)
-	colorCache := NewColorCache()
-	var logReplayWaitGroup sync.WaitGroup
-	for taskName, value := range g.Pipeline {
+	exitCode := 0
+	if rs.Opts.dotGraph != "" {
+		err := c.generateDotGraph(engine.TaskGraph, filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph))
+		if err != nil {
+			c.logError(c.Config.Logger, "", err)
+			return 1
+		}
+	} else if rs.Opts.dryRun {
+		tasksRun, err := c.executeDryRun(engine, g, rs, c.Config.Logger)
+		if err != nil {
+			c.logError(c.Config.Logger, "", err)
+			return 1
+		}
+		packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings()
+		sort.Strings(packagesInScope)
+		if rs.Opts.dryRunJson {
+			dryRun := &struct {
+				Packages []string     `json:"packages"`
+				Tasks    []hashedTask `json:"tasks"`
+			}{
+				Packages: packagesInScope,
+				Tasks:    tasksRun,
+			}
+			bytes, err := json.MarshalIndent(dryRun, "", " ")
+			if err != nil {
+				c.logError(c.Config.Logger, "", errors.Wrap(err, "failed to render to JSON"))
+				return 1
+			}
+			c.Ui.Output(string(bytes))
+		} else {
+			c.Ui.Output("")
+			c.Ui.Info(util.Sprintf("${CYAN}${BOLD}Packages in Scope${RESET}"))
+			p := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
+			fmt.Fprintln(p, "Name\tPath\t")
+			for _, pkg := range packagesInScope {
+				fmt.Fprintln(p, fmt.Sprintf("%s\t%s\t", pkg, g.PackageInfos[pkg].Dir))
+			}
+			p.Flush()
+
+			c.Ui.Output("")
+			c.Ui.Info(util.Sprintf("${CYAN}${BOLD}Tasks to Run${RESET}"))
+
+			for _, task := range tasksRun {
+				c.Ui.Info(util.Sprintf("${BOLD}%s${RESET}", task.TaskID))
+				w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Task\t=\t%s\t${RESET}", task.Task))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Package\t=\t%s\t${RESET}", task.Package))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Hash\t=\t%s\t${RESET}", task.Hash))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Directory\t=\t%s\t${RESET}", task.Dir))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Command\t=\t%s\t${RESET}", task.Command))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Outputs\t=\t%s\t${RESET}", strings.Join(task.Outputs[1:], ", ")))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Log File\t=\t%s\t${RESET}", task.LogFile))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Dependencies\t=\t%s\t${RESET}", strings.Join(task.Dependencies, ", ")))
+				fmt.Fprintln(w, util.Sprintf(" ${GREY}Dependendents\t=\t%s\t${RESET}", strings.Join(task.Dependents, ", ")))
+				w.Flush()
+			}
+
+		}
+	} else {
+		packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings()
+		sort.Strings(packagesInScope)
+		c.Ui.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", ")))
+		if rs.Opts.stream {
+			c.Ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len()))))
+		}
+		exitCode = c.executeTasks(g, rs, engine, backend, startAt)
+	}
+
+	return exitCode
+}
+
+func buildTaskGraph(topoGraph *dag.AcyclicGraph, pipeline map[string]fs.Pipeline, rs *runSpec) (*core.Scheduler, error) {
+	engine := core.NewScheduler(topoGraph)
+	for taskName, value := range pipeline {
 		topoDeps := make(util.Set)
 		deps := make(util.Set)
 		if util.IsPackageTask(taskName) {
@@ -305,328 +373,21 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La
 			}
 		}
 
-		targetBaseUI := &cli.ConcurrentUi{Ui: c.Ui}
 		engine.AddTask(&core.Task{
 			Name:     taskName,
 			TopoDeps: topoDeps,
 			Deps:     deps,
-			Cache:    value.Cache,
-			Run: func(id string) error {
-				cmdTime := time.Now()
-				name, task := util.GetPackageTaskFromId(id)
-				pack := g.PackageInfos[name]
-				targetLogger := c.Config.Logger.Named(fmt.Sprintf("%v:%v", pack.Name, task))
-				defer targetLogger.ResetNamed(pack.Name)
-				targetLogger.Debug("start")
-
-				// bail if the script doesn't exist
-				if _, ok := pack.Scripts[task]; !ok {
-					targetLogger.Debug("no task in package, skipping")
-					targetLogger.Debug("done", "status", "skipped", "duration", time.Since(cmdTime))
-					return nil
-				}
-
-				// Setup tracer
-				tracer := runState.Run(util.GetTaskId(pack.Name, task))
-
-				// Create a logger
-				pref := colorCache.PrefixColor(pack.Name)
-				actualPrefix := pref("%s:%s: ", pack.Name, task)
-				targetUi := &cli.PrefixedUi{
-					Ui:           targetBaseUI,
-					OutputPrefix: actualPrefix,
-					InfoPrefix:   actualPrefix,
-					ErrorPrefix:  actualPrefix,
-					WarnPrefix:   actualPrefix,
-				}
-				// Hash ---------------------------------------------
-				// first check for package-tasks
-				pipeline, ok := g.Pipeline[fmt.Sprintf("%v", id)]
-				if !ok {
-					// then check for regular tasks
-					altpipe, notcool := g.Pipeline[task]
-					// if neither, then bail
-					if !notcool && !ok {
-						return nil
-					}
-					// override if we need to...
-					pipeline = altpipe
-				}
-
-				outputs := []string{fmt.Sprintf(".turbo/turbo-%v.log", task)}
-				if pipeline.Outputs == nil {
-					outputs = append(outputs, "dist/**/*", "build/**/*")
-				} else {
-					outputs = append(outputs, pipeline.Outputs...)
-				}
-				targetLogger.Debug("task output globs", "outputs", outputs)
-
-				passThroughArgs := make([]string, 0, len(rs.Opts.passThroughArgs))
-				for _, target := range rs.Targets {
-					if target == task {
-						passThroughArgs = append(passThroughArgs, rs.Opts.passThroughArgs...)
-					}
-				}
-
-				// Hash the task-specific environment variables found in the dependsOnKey in the pipeline
-				var hashableEnvVars []string
-				var hashableEnvPairs []string
-				if len(pipeline.DependsOn) > 0 {
-					for _, v := range pipeline.DependsOn {
-						if strings.Contains(v, ENV_PIPELINE_DELIMITER) {
-							trimmed := strings.TrimPrefix(v, ENV_PIPELINE_DELIMITER)
-							hashableEnvPairs = append(hashableEnvPairs, fmt.Sprintf("%v=%v", trimmed, os.Getenv(trimmed)))
-							hashableEnvVars = append(hashableEnvVars, trimmed)
-						}
-					}
-					sort.Strings(hashableEnvVars) // always sort them
-				}
-				targetLogger.Debug("hashable env vars", "vars", hashableEnvVars)
-				hashable := struct {
-					Hash             string
-					Task             string
-					Outputs          []string
-					PassThruArgs     []string
-					HashableEnvPairs []string
-				}{
-					Hash:             pack.Hash,
-					Task:             task,
-					Outputs:          outputs,
-					PassThruArgs:     passThroughArgs,
-					HashableEnvPairs: hashableEnvPairs,
-				}
-				hash, err := fs.HashObject(hashable)
-				targetLogger.Debug("task hash", "value", hash)
-				if err != nil {
-					targetUi.Error(fmt.Sprintf("Hashing error: %v", err))
-					// @TODO probably should abort fatally???
-				}
-				logFileName := filepath.Join(pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))
-				targetLogger.Debug("log file", "path", filepath.Join(rs.Opts.cwd, logFileName))
-
-				// Cache ---------------------------------------------
-				var hit bool
-				if !rs.Opts.forceExecution {
-					hit, _, _, err = turboCache.Fetch(rs.Opts.cwd, hash, nil)
-					if err != nil {
-						targetUi.Error(fmt.Sprintf("error fetching from cache: %s", err))
-					} else if hit {
-						if rs.Opts.stream && fs.FileExists(filepath.Join(rs.Opts.cwd, logFileName)) {
-							logReplayWaitGroup.Add(1)
-							go replayLogs(targetLogger, targetBaseUI, rs.Opts, logFileName, hash, &logReplayWaitGroup, false)
-						}
-						targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime))
-						tracer(TargetCached, nil)
-
-						return nil
-					}
-					if rs.Opts.stream {
-						targetUi.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(hash)))
-					}
-				} else {
-					if rs.Opts.stream {
-						targetUi.Output(fmt.Sprintf("cache bypass, force executing %s", ui.Dim(hash)))
-					}
-				}
-
-				// Setup command execution
-				argsactual := append([]string{"run"}, task)
-				argsactual = append(argsactual, passThroughArgs...)
-				// @TODO: @jaredpalmer fix this hack to get the package manager's name
-				var cmd *exec.Cmd
-				if backend.Name == "nodejs-berry" {
-					cmd = exec.Command("yarn", argsactual...)
-				} else {
-					cmd = exec.Command(strings.TrimPrefix(backend.Name, "nodejs-"), argsactual...)
-				}
-				cmd.Dir = pack.Dir
-				envs := fmt.Sprintf("TURBO_HASH=%v", hash)
-				cmd.Env = append(os.Environ(), envs)
-
-				// Setup stdout/stderr
-				// If we are not caching anything, then we don't need to write logs to disk
-				// be careful about this conditional given the default of cache = true
-				var writer io.Writer
-				if !rs.Opts.cache || (pipeline.Cache != nil && !*pipeline.Cache) {
-					writer = os.Stdout
-				} else {
-					// Setup log file
-					if err := fs.EnsureDir(logFileName); err != nil {
-						tracer(TargetBuildFailed, err)
-						c.logError(targetLogger, actualPrefix, err)
-						if rs.Opts.bail {
-							os.Exit(1)
-						}
-					}
-					output, err := os.Create(logFileName)
-					if err != nil {
-						tracer(TargetBuildFailed, err)
-						c.logError(targetLogger, actualPrefix, err)
-						if rs.Opts.bail {
-							os.Exit(1)
-						}
-					}
-					defer output.Close()
-					bufWriter := bufio.NewWriter(output)
-					bufWriter.WriteString(fmt.Sprintf("%scache hit, replaying output %s\n", actualPrefix, ui.Dim(hash)))
-					defer bufWriter.Flush()
-					writer = io.MultiWriter(os.Stdout, bufWriter)
-				}
-
-				logger := log.New(writer, "", 0)
-				// Setup a streamer that we'll pipe cmd.Stdout to
-				logStreamerOut := logstreamer.NewLogstreamer(logger, actualPrefix, false)
-				// Setup a streamer that we'll pipe cmd.Stderr to.
-				logStreamerErr := logstreamer.NewLogstreamer(logger, actualPrefix, false)
-				cmd.Stderr = logStreamerErr
-				cmd.Stdout = logStreamerOut
-				// Flush/Reset any error we recorded
-				logStreamerErr.FlushRecord()
-				logStreamerOut.FlushRecord()
-
-				// Run the command
-				if err := c.Processes.Exec(cmd); err != nil {
-					// if we already know we're in the process of exiting,
-					// we don't need to record an error to that effect.
-					if errors.Is(err, process.ErrClosing) {
-						return nil
-					}
-					tracer(TargetBuildFailed, err)
-					targetLogger.Error("Error: command finished with error: %w", err)
-					if rs.Opts.bail {
-						if rs.Opts.stream {
-							targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err))
-						} else {
-							f, err := os.Open(filepath.Join(rs.Opts.cwd, logFileName))
-							if err != nil {
-								targetUi.Warn(fmt.Sprintf("failed reading logs: %v", err))
-							}
-							defer f.Close()
-							scan := bufio.NewScanner(f)
-							targetBaseUI.Error("")
-							targetBaseUI.Error(util.Sprintf("%s ${RED}%s finished with error${RESET}", ui.ERROR_PREFIX, util.GetTaskId(pack.Name, task)))
-							targetBaseUI.Error("")
-							for scan.Scan() {
-								targetBaseUI.Output(util.Sprintf("${RED}%s:%s: ${RESET}%s", pack.Name, task, scan.Bytes())) //Writing to Stdout
-							}
-						}
-						c.Processes.Close()
-					} else {
-						if rs.Opts.stream {
-							targetUi.Warn("command finished with error, but continuing...")
-						}
-					}
-					return err
-				}
-
-				// Cache command outputs
-				if rs.Opts.cache && (pipeline.Cache == nil || *pipeline.Cache) {
-					targetLogger.Debug("caching output", "outputs", outputs)
-					ignore := []string{}
-					filesToBeCached := globby.GlobFiles(pack.Dir, outputs, ignore)
-					if err := turboCache.Put(pack.Dir, hash, int(time.Since(cmdTime).Milliseconds()), filesToBeCached); err != nil {
-						c.logError(targetLogger, "", fmt.Errorf("error caching output: %w", err))
-					}
-				}
-
-				// Clean up tracing
-				tracer(TargetBuilt, nil)
-				targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime))
-				return nil
-			},
 		})
 	}
 
 	if err := engine.Prepare(&core.SchedulerExecutionOptions{
-		Packages:    rs.FilteredPkgs.UnsafeListOfStrings(),
-		TaskNames:   rs.Targets,
-		Concurrency: rs.Opts.concurrency,
-		Parallel:    rs.Opts.parallel,
-		TasksOnly:   rs.Opts.only,
+		Packages:  rs.FilteredPkgs.UnsafeListOfStrings(),
+		TaskNames: rs.Targets,
+		TasksOnly: rs.Opts.only,
 	}); err != nil {
-		c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err))
-		return 1
-	}
-
-	if rs.Opts.dotGraph != "" {
-		graphString := string(engine.TaskGraph.Dot(&dag.DotOpts{
-			Verbose:    true,
-			DrawCycles: true,
-		}))
-		ext := filepath.Ext(rs.Opts.dotGraph)
-		if ext == ".html" {
-			f, err := os.Create(filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph))
-			if err != nil {
-				c.logError(c.Config.Logger, "", fmt.Errorf("error writing graph: %w", err))
-				return 1
-			}
-			defer f.Close()
-			f.WriteString(`
-
-