From dc5c66362a4ce18bba05dcb40356e557b295e3c8 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 10 Mar 2022 10:28:10 -0800 Subject: [PATCH 1/9] Move visitor to Execute on scheduler, rather than the task itself. Allows reuse of the graph. Split out the execution task, as well as the dot graph function --- cli/internal/core/scheduler.go | 41 +- cli/internal/core/scheduler_test.go | 58 +-- cli/internal/run/run.go | 640 +++++++++++++++------------- 3 files changed, 371 insertions(+), 368 deletions(-) diff --git a/cli/internal/core/scheduler.go b/cli/internal/core/scheduler.go index a2506932823b5..30c96675a9a5c 100644 --- a/cli/internal/core/scheduler.go +++ b/cli/internal/core/scheduler.go @@ -17,10 +17,10 @@ type Task struct { Deps util.Set // TopoDeps are dependencies across packages within the same topological graph (e.g. parent `build` -> child `build`) */ TopoDeps util.Set - Cache *bool - Run func(cwd string) error } +type Visitor = func(taskID string) error + type scheduler struct { // TopologicGraph is a graph of workspaces TopologicGraph *dag.AcyclicGraph @@ -30,10 +30,6 @@ type scheduler struct { Tasks map[string]*Task taskDeps [][]string PackageTaskDeps [][]string - // Concurrency is the number of concurrent tasks that can be executed - Concurrency int - // Parallel is whether to run tasks in parallel - Parallel bool } // NewScheduler creates a new scheduler given a topologic graph of workspace package names @@ -53,10 +49,6 @@ type SchedulerExecutionOptions struct { Packages []string // TaskNames in the execution scope, if nil, all tasks will be executed TaskNames []string - // Concurrency is the number of concurrent tasks that can be executed - Concurrency int - // Parallel is whether to run tasks in parallel - Parallel bool // Restrict execution to only the listed task names TasksOnly bool } @@ -77,10 +69,6 @@ func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error { } } - p.Concurrency = options.Concurrency - - p.Parallel = options.Parallel - if err := p.generateTaskGraph(pkgs, tasks, options.TasksOnly); err != nil { return err } @@ -88,29 +76,28 @@ func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error { return nil } +// ExecOpts controls a single walk of the task graph +type ExecOpts struct { + // Parallel is whether to run tasks in parallel + Parallel bool + // Concurrency is the number of concurrent tasks that can be executed + Concurrency int +} + // Execute executes the pipeline, constructing an internal task graph and walking it accordingly. 
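With this change a Task no longer carries its own Run closure; callers hand Execute a Visitor plus per-walk ExecOpts, so one prepared graph can be walked several times with different visitors. Below is a minimal sketch of the resulting call pattern, adapted from the updated scheduler tests; exampleReuse and its task wiring are invented for illustration and are not part of this patch.

package core

import (
	"fmt"

	"github.com/pyr-sh/dag"
	"github.com/vercel/turborepo/cli/internal/util"
)

// exampleReuse prepares the task graph once, then walks it twice with
// per-walk options. Illustrative sketch only.
func exampleReuse() {
	var g dag.AcyclicGraph
	g.Add("a")
	g.Add("b")
	g.Connect(dag.BasicEdge("b", "a"))

	p := NewScheduler(&g)
	p.AddTask(&Task{Name: "build", TopoDeps: make(util.Set), Deps: make(util.Set)})
	p.AddTask(&Task{Name: "test", TopoDeps: make(util.Set), Deps: make(util.Set)})
	if err := p.Prepare(&SchedulerExecutionOptions{
		Packages:  []string{"a", "b"},
		TaskNames: []string{"build", "test"},
	}); err != nil {
		panic(err)
	}

	// First walk: just list the package-task IDs in dependency order.
	list := func(taskID string) error {
		fmt.Println("would run", taskID)
		return nil
	}
	if errs := p.Execute(list, ExecOpts{Concurrency: 1}); len(errs) > 0 {
		fmt.Println("errors:", errs)
	}

	// Second walk over the same prepared graph, e.g. with a visitor that
	// actually runs the tasks and different concurrency settings.
	if errs := p.Execute(list, ExecOpts{Concurrency: 10, Parallel: false}); len(errs) > 0 {
		fmt.Println("errors:", errs)
	}
}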
-func (p *scheduler) Execute() []error { - var sema = util.NewSemaphore(p.Concurrency) +func (p *scheduler) Execute(visitor Visitor, opts ExecOpts) []error { + var sema = util.NewSemaphore(opts.Concurrency) return p.TaskGraph.Walk(func(v dag.Vertex) error { // Always return if it is the root node if strings.Contains(dag.VertexName(v), ROOT_NODE_NAME) { return nil } // Acquire the semaphore unless parallel - if !p.Parallel { + if !opts.Parallel { sema.Acquire() defer sema.Release() } - // Find and run the task for the current vertex - _, taskName := util.GetPackageTaskFromId(dag.VertexName(v)) - task, ok := p.Tasks[taskName] - if !ok { - return fmt.Errorf("task %s not found", dag.VertexName(v)) - } - if task.Run != nil { - return task.Run(dag.VertexName(v)) - } - return nil + return visitor(dag.VertexName(v)) }) } diff --git a/cli/internal/core/scheduler_test.go b/cli/internal/core/scheduler_test.go index 94e8252f8d158..2fedeeb02123a 100644 --- a/cli/internal/core/scheduler_test.go +++ b/cli/internal/core/scheduler_test.go @@ -4,11 +4,17 @@ import ( "fmt" "strings" "testing" + "github.com/vercel/turborepo/cli/internal/util" "github.com/pyr-sh/dag" ) +func testVisitor(taskID string) error { + fmt.Println(taskID) + return nil +} + func TestSchedulerDefault(t *testing.T) { var g dag.AcyclicGraph g.Add("a") @@ -26,34 +32,18 @@ func TestSchedulerDefault(t *testing.T) { Name: "build", TopoDeps: topoDeps, Deps: deps, - Run: func(cwd string) error { - fmt.Println(cwd) - return nil - }, }) p.AddTask(&Task{ Name: "test", TopoDeps: topoDeps, Deps: deps, - Run: func(cwd string) error { - fmt.Println(cwd) - return nil - }, }) p.AddTask(&Task{ Name: "prepare", - Run: func(cwd string) error { - fmt.Println(cwd) - return nil - }, }) p.AddTask(&Task{ Name: "side-quest", // not in the build/test tree Deps: deps, - Run: func(cwd string) error { - fmt.Println(cwd) - return nil - }, }) if _, ok := p.Tasks["build"]; !ok { @@ -65,18 +55,18 @@ func TestSchedulerDefault(t *testing.T) { } err := p.Prepare(&SchedulerExecutionOptions{ - Packages: nil, - TaskNames: []string{"test"}, - Concurrency: 10, - Parallel: false, - TasksOnly: false, + Packages: nil, + TaskNames: []string{"test"}, + TasksOnly: false, }) if err != nil { t.Fatalf("%v", err) } - errs := p.Execute() + errs := p.Execute(testVisitor, ExecOpts{ + Concurrency: 10, + }) for _, err := range errs { t.Fatalf("%v", err) @@ -106,26 +96,14 @@ func TestSchedulerTasksOnly(t *testing.T) { Name: "build", TopoDeps: topoDeps, Deps: deps, - Run: func(cwd string) error { - fmt.Println(cwd) - return nil - }, }) p.AddTask(&Task{ Name: "test", TopoDeps: topoDeps, Deps: deps, - Run: func(cwd string) error { - fmt.Println(cwd) - return nil - }, }) p.AddTask(&Task{ Name: "prepare", - Run: func(cwd string) error { - fmt.Println(cwd) - return nil - }, }) if _, ok := p.Tasks["build"]; !ok { @@ -137,18 +115,18 @@ func TestSchedulerTasksOnly(t *testing.T) { } err := p.Prepare(&SchedulerExecutionOptions{ - Packages: nil, - TaskNames: []string{"test"}, - Concurrency: 10, - Parallel: false, - TasksOnly: true, + Packages: nil, + TaskNames: []string{"test"}, + TasksOnly: true, }) if err != nil { t.Fatalf("%v", err) } - errs := p.Execute() + errs := p.Execute(testVisitor, ExecOpts{ + Concurrency: 10, + }) for _, err := range errs { t.Fatalf("%v", err) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index 3bb44dceb7561..c69a7ecaba66b 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -193,18 +193,6 @@ func (c *RunCommand) Run(args []string) int { } func 
(c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.LanguageBackend, startAt time.Time) int { - goctx := gocontext.Background() - var analyticsSink analytics.Sink - if c.Config.IsLoggedIn() { - analyticsSink = c.Config.ApiClient - } else { - analyticsSink = analytics.NullSink - } - analyticsClient := analytics.NewClient(goctx, analyticsSink, c.Config.Logger.Named("analytics")) - defer analyticsClient.CloseWithTimeout(50 * time.Millisecond) - turboCache := cache.New(c.Config, analyticsClient) - defer turboCache.Shutdown() - var topoVisit []interface{} for _, node := range g.SCC { v := node[0] @@ -266,13 +254,8 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La if rs.Opts.stream { c.Ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len())))) } - // TODO(gsoltis): I think this should be passed in, and close called from the calling function - // however, need to handle the graph case, which early-returns - runState := NewRunState(rs.Opts, startAt) - runState.Listen(c.Ui, time.Now()) + engine := core.NewScheduler(&g.TopologicalGraph) - colorCache := NewColorCache() - var logReplayWaitGroup sync.WaitGroup for taskName, value := range g.Pipeline { topoDeps := make(util.Set) deps := make(util.Set) @@ -305,307 +288,61 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La } } - targetBaseUI := &cli.ConcurrentUi{Ui: c.Ui} engine.AddTask(&core.Task{ Name: taskName, TopoDeps: topoDeps, Deps: deps, - Cache: value.Cache, - Run: func(id string) error { - cmdTime := time.Now() - name, task := util.GetPackageTaskFromId(id) - pack := g.PackageInfos[name] - targetLogger := c.Config.Logger.Named(fmt.Sprintf("%v:%v", pack.Name, task)) - defer targetLogger.ResetNamed(pack.Name) - targetLogger.Debug("start") - - // bail if the script doesn't exist - if _, ok := pack.Scripts[task]; !ok { - targetLogger.Debug("no task in package, skipping") - targetLogger.Debug("done", "status", "skipped", "duration", time.Since(cmdTime)) - return nil - } - - // Setup tracer - tracer := runState.Run(util.GetTaskId(pack.Name, task)) - - // Create a logger - pref := colorCache.PrefixColor(pack.Name) - actualPrefix := pref("%s:%s: ", pack.Name, task) - targetUi := &cli.PrefixedUi{ - Ui: targetBaseUI, - OutputPrefix: actualPrefix, - InfoPrefix: actualPrefix, - ErrorPrefix: actualPrefix, - WarnPrefix: actualPrefix, - } - // Hash --------------------------------------------- - // first check for package-tasks - pipeline, ok := g.Pipeline[fmt.Sprintf("%v", id)] - if !ok { - // then check for regular tasks - altpipe, notcool := g.Pipeline[task] - // if neither, then bail - if !notcool && !ok { - return nil - } - // override if we need to... - pipeline = altpipe - } - - outputs := []string{fmt.Sprintf(".turbo/turbo-%v.log", task)} - if pipeline.Outputs == nil { - outputs = append(outputs, "dist/**/*", "build/**/*") - } else { - outputs = append(outputs, pipeline.Outputs...) - } - targetLogger.Debug("task output globs", "outputs", outputs) - - passThroughArgs := make([]string, 0, len(rs.Opts.passThroughArgs)) - for _, target := range rs.Targets { - if target == task { - passThroughArgs = append(passThroughArgs, rs.Opts.passThroughArgs...) 
- } - } - - // Hash the task-specific environment variables found in the dependsOnKey in the pipeline - var hashableEnvVars []string - var hashableEnvPairs []string - if len(pipeline.DependsOn) > 0 { - for _, v := range pipeline.DependsOn { - if strings.Contains(v, ENV_PIPELINE_DELIMITER) { - trimmed := strings.TrimPrefix(v, ENV_PIPELINE_DELIMITER) - hashableEnvPairs = append(hashableEnvPairs, fmt.Sprintf("%v=%v", trimmed, os.Getenv(trimmed))) - hashableEnvVars = append(hashableEnvVars, trimmed) - } - } - sort.Strings(hashableEnvVars) // always sort them - } - targetLogger.Debug("hashable env vars", "vars", hashableEnvVars) - hashable := struct { - Hash string - Task string - Outputs []string - PassThruArgs []string - HashableEnvPairs []string - }{ - Hash: pack.Hash, - Task: task, - Outputs: outputs, - PassThruArgs: passThroughArgs, - HashableEnvPairs: hashableEnvPairs, - } - hash, err := fs.HashObject(hashable) - targetLogger.Debug("task hash", "value", hash) - if err != nil { - targetUi.Error(fmt.Sprintf("Hashing error: %v", err)) - // @TODO probably should abort fatally??? - } - logFileName := filepath.Join(pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task)) - targetLogger.Debug("log file", "path", filepath.Join(rs.Opts.cwd, logFileName)) - - // Cache --------------------------------------------- - var hit bool - if !rs.Opts.forceExecution { - hit, _, _, err = turboCache.Fetch(rs.Opts.cwd, hash, nil) - if err != nil { - targetUi.Error(fmt.Sprintf("error fetching from cache: %s", err)) - } else if hit { - if rs.Opts.stream && fs.FileExists(filepath.Join(rs.Opts.cwd, logFileName)) { - logReplayWaitGroup.Add(1) - go replayLogs(targetLogger, targetBaseUI, rs.Opts, logFileName, hash, &logReplayWaitGroup, false) - } - targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime)) - tracer(TargetCached, nil) - - return nil - } - if rs.Opts.stream { - targetUi.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(hash))) - } - } else { - if rs.Opts.stream { - targetUi.Output(fmt.Sprintf("cache bypass, force executing %s", ui.Dim(hash))) - } - } - - // Setup command execution - argsactual := append([]string{"run"}, task) - argsactual = append(argsactual, passThroughArgs...) - // @TODO: @jaredpalmer fix this hack to get the package manager's name - var cmd *exec.Cmd - if backend.Name == "nodejs-berry" { - cmd = exec.Command("yarn", argsactual...) - } else { - cmd = exec.Command(strings.TrimPrefix(backend.Name, "nodejs-"), argsactual...) 
- } - cmd.Dir = pack.Dir - envs := fmt.Sprintf("TURBO_HASH=%v", hash) - cmd.Env = append(os.Environ(), envs) - - // Setup stdout/stderr - // If we are not caching anything, then we don't need to write logs to disk - // be careful about this conditional given the default of cache = true - var writer io.Writer - if !rs.Opts.cache || (pipeline.Cache != nil && !*pipeline.Cache) { - writer = os.Stdout - } else { - // Setup log file - if err := fs.EnsureDir(logFileName); err != nil { - tracer(TargetBuildFailed, err) - c.logError(targetLogger, actualPrefix, err) - if rs.Opts.bail { - os.Exit(1) - } - } - output, err := os.Create(logFileName) - if err != nil { - tracer(TargetBuildFailed, err) - c.logError(targetLogger, actualPrefix, err) - if rs.Opts.bail { - os.Exit(1) - } - } - defer output.Close() - bufWriter := bufio.NewWriter(output) - bufWriter.WriteString(fmt.Sprintf("%scache hit, replaying output %s\n", actualPrefix, ui.Dim(hash))) - defer bufWriter.Flush() - writer = io.MultiWriter(os.Stdout, bufWriter) - } - - logger := log.New(writer, "", 0) - // Setup a streamer that we'll pipe cmd.Stdout to - logStreamerOut := logstreamer.NewLogstreamer(logger, actualPrefix, false) - // Setup a streamer that we'll pipe cmd.Stderr to. - logStreamerErr := logstreamer.NewLogstreamer(logger, actualPrefix, false) - cmd.Stderr = logStreamerErr - cmd.Stdout = logStreamerOut - // Flush/Reset any error we recorded - logStreamerErr.FlushRecord() - logStreamerOut.FlushRecord() - - // Run the command - if err := c.Processes.Exec(cmd); err != nil { - // if we already know we're in the process of exiting, - // we don't need to record an error to that effect. - if errors.Is(err, process.ErrClosing) { - return nil - } - tracer(TargetBuildFailed, err) - targetLogger.Error("Error: command finished with error: %w", err) - if rs.Opts.bail { - if rs.Opts.stream { - targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err)) - } else { - f, err := os.Open(filepath.Join(rs.Opts.cwd, logFileName)) - if err != nil { - targetUi.Warn(fmt.Sprintf("failed reading logs: %v", err)) - } - defer f.Close() - scan := bufio.NewScanner(f) - targetBaseUI.Error("") - targetBaseUI.Error(util.Sprintf("%s ${RED}%s finished with error${RESET}", ui.ERROR_PREFIX, util.GetTaskId(pack.Name, task))) - targetBaseUI.Error("") - for scan.Scan() { - targetBaseUI.Output(util.Sprintf("${RED}%s:%s: ${RESET}%s", pack.Name, task, scan.Bytes())) //Writing to Stdout - } - } - c.Processes.Close() - } else { - if rs.Opts.stream { - targetUi.Warn("command finished with error, but continuing...") - } - } - return err - } - - // Cache command outputs - if rs.Opts.cache && (pipeline.Cache == nil || *pipeline.Cache) { - targetLogger.Debug("caching output", "outputs", outputs) - ignore := []string{} - filesToBeCached := globby.GlobFiles(pack.Dir, outputs, ignore) - if err := turboCache.Put(pack.Dir, hash, int(time.Since(cmdTime).Milliseconds()), filesToBeCached); err != nil { - c.logError(targetLogger, "", fmt.Errorf("error caching output: %w", err)) - } - } - - // Clean up tracing - tracer(TargetBuilt, nil) - targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime)) - return nil - }, }) } if err := engine.Prepare(&core.SchedulerExecutionOptions{ - Packages: rs.FilteredPkgs.UnsafeListOfStrings(), - TaskNames: rs.Targets, - Concurrency: rs.Opts.concurrency, - Parallel: rs.Opts.parallel, - TasksOnly: rs.Opts.only, + Packages: rs.FilteredPkgs.UnsafeListOfStrings(), + TaskNames: rs.Targets, + TasksOnly: rs.Opts.only, }); err != nil 
{ c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err)) return 1 } if rs.Opts.dotGraph != "" { - graphString := string(engine.TaskGraph.Dot(&dag.DotOpts{ - Verbose: true, - DrawCycles: true, - })) - ext := filepath.Ext(rs.Opts.dotGraph) - if ext == ".html" { - f, err := os.Create(filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph)) - if err != nil { - c.logError(c.Config.Logger, "", fmt.Errorf("error writing graph: %w", err)) - return 1 - } - defer f.Close() - f.WriteString(` - - - - Graph - - - - - - - `) - c.Ui.Output("") - c.Ui.Output(fmt.Sprintf("✔ Generated task graph in %s", ui.Bold(rs.Opts.dotGraph))) - if ui.IsTTY { - browser.OpenBrowser(filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph)) - } - return 0 - } - hasDot := hasGraphViz() - if hasDot { - dotArgs := []string{"-T" + ext[1:], "-o", rs.Opts.dotGraph} - cmd := exec.Command("dot", dotArgs...) - cmd.Stdin = strings.NewReader(graphString) - if err := cmd.Run(); err != nil { - c.logError(c.Config.Logger, "", fmt.Errorf("could not generate task graphfile %v: %w", rs.Opts.dotGraph, err)) - return 1 - } else { - c.Ui.Output("") - c.Ui.Output(fmt.Sprintf("✔ Generated task graph in %s", ui.Bold(rs.Opts.dotGraph))) - } - } else { - c.Ui.Output("") - c.Ui.Warn(color.New(color.FgYellow, color.Bold, color.ReverseVideo).Sprint(" WARNING ") + color.YellowString(" `turbo` uses Graphviz to generate an image of your\ngraph, but Graphviz isn't installed on this machine.\n\nYou can download Graphviz from https://graphviz.org/download.\n\nIn the meantime, you can use this string output with an\nonline Dot graph viewer.")) - c.Ui.Output("") - c.Ui.Output(graphString) + err := c.generateDotGraph(engine.TaskGraph, filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph)) + if err != nil { + c.logError(c.Config.Logger, "", err) + return 1 } return 0 } + goctx := gocontext.Background() + var analyticsSink analytics.Sink + if c.Config.IsLoggedIn() { + analyticsSink = c.Config.ApiClient + } else { + analyticsSink = analytics.NullSink + } + analyticsClient := analytics.NewClient(goctx, analyticsSink, c.Config.Logger.Named("analytics")) + defer analyticsClient.CloseWithTimeout(50 * time.Millisecond) + turboCache := cache.New(c.Config, analyticsClient) + defer turboCache.Shutdown() + runState := NewRunState(rs.Opts, startAt) + runState.Listen(c.Ui, time.Now()) + ec := &execContext{ + g: g, + colorCache: NewColorCache(), + runState: runState, + rs: rs, + ui: &cli.ConcurrentUi{Ui: c.Ui}, + turboCache: turboCache, + logger: c.Config.Logger, + backend: backend, + processes: c.Processes, + } + // run the thing - errs := engine.Execute() + errs := engine.Execute(ec.exec, core.ExecOpts{ + Parallel: rs.Opts.parallel, + Concurrency: rs.Opts.concurrency, + }) // Track if we saw any child with a non-zero exit code exitCode := 0 @@ -619,7 +356,7 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La c.Ui.Error(err.Error()) } - logReplayWaitGroup.Wait() + ec.logReplayWaitGroup.Wait() if err := runState.Close(c.Ui, rs.Opts.profile); err != nil { c.Ui.Error(fmt.Sprintf("Error with profiler: %s", err.Error())) @@ -893,3 +630,304 @@ func getTargetsFromArguments(arguments []string, configJson *fs.TurboConfigJSON) sort.Strings(stringTargets) return stringTargets, nil } + +type execContext struct { + g *completeGraph + colorCache *ColorCache + runState *RunState + rs *runSpec + logReplayWaitGroup sync.WaitGroup + ui cli.Ui + turboCache cache.Cache + logger hclog.Logger + backend *api.LanguageBackend + processes *process.Manager +} + +func (e *execContext) 
logError(log hclog.Logger, prefix string, err error) { + e.logger.Error(prefix, "error", err) + + if prefix != "" { + prefix += ": " + } + + e.ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err))) +} + +func (e *execContext) exec(id string) error { + cmdTime := time.Now() + name, task := util.GetPackageTaskFromId(id) + pack := e.g.PackageInfos[name] + targetLogger := e.logger.Named(fmt.Sprintf("%v:%v", pack.Name, task)) + defer targetLogger.ResetNamed(pack.Name) + targetLogger.Debug("start") + + // bail if the script doesn't exist + if _, ok := pack.Scripts[task]; !ok { + targetLogger.Debug("no task in package, skipping") + targetLogger.Debug("done", "status", "skipped", "duration", time.Since(cmdTime)) + return nil + } + + // Setup tracer + tracer := e.runState.Run(util.GetTaskId(pack.Name, task)) + + // Create a logger + pref := e.colorCache.PrefixColor(pack.Name) + actualPrefix := pref("%s:%s: ", pack.Name, task) + targetUi := &cli.PrefixedUi{ + Ui: e.ui, + OutputPrefix: actualPrefix, + InfoPrefix: actualPrefix, + ErrorPrefix: actualPrefix, + WarnPrefix: actualPrefix, + } + // Hash --------------------------------------------- + // first check for package-tasks + pipeline, ok := e.g.Pipeline[fmt.Sprintf("%v", id)] + if !ok { + // then check for regular tasks + altpipe, notcool := e.g.Pipeline[task] + // if neither, then bail + if !notcool && !ok { + return nil + } + // override if we need to... + pipeline = altpipe + } + + outputs := []string{fmt.Sprintf(".turbo/turbo-%v.log", task)} + if pipeline.Outputs == nil { + outputs = append(outputs, "dist/**/*", "build/**/*") + } else { + outputs = append(outputs, pipeline.Outputs...) + } + targetLogger.Debug("task output globs", "outputs", outputs) + + passThroughArgs := make([]string, 0, len(e.rs.Opts.passThroughArgs)) + for _, target := range e.rs.Targets { + if target == task { + passThroughArgs = append(passThroughArgs, e.rs.Opts.passThroughArgs...) + } + } + + // Hash the task-specific environment variables found in the dependsOnKey in the pipeline + var hashableEnvVars []string + var hashableEnvPairs []string + if len(pipeline.DependsOn) > 0 { + for _, v := range pipeline.DependsOn { + if strings.Contains(v, ENV_PIPELINE_DELIMITER) { + trimmed := strings.TrimPrefix(v, ENV_PIPELINE_DELIMITER) + hashableEnvPairs = append(hashableEnvPairs, fmt.Sprintf("%v=%v", trimmed, os.Getenv(trimmed))) + hashableEnvVars = append(hashableEnvVars, trimmed) + } + } + sort.Strings(hashableEnvVars) // always sort them + } + targetLogger.Debug("hashable env vars", "vars", hashableEnvVars) + hashable := struct { + Hash string + Task string + Outputs []string + PassThruArgs []string + HashableEnvPairs []string + }{ + Hash: pack.Hash, + Task: task, + Outputs: outputs, + PassThruArgs: passThroughArgs, + HashableEnvPairs: hashableEnvPairs, + } + hash, err := fs.HashObject(hashable) + targetLogger.Debug("task hash", "value", hash) + if err != nil { + targetUi.Error(fmt.Sprintf("Hashing error: %v", err)) + // @TODO probably should abort fatally??? 
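The struct passed to fs.HashObject above is the entire cache key for a task. As a rough illustration (the values in the sketch below are invented; only the field layout mirrors this patch), any change to the package hash, task name, output globs, pass-through args, or resolved dependsOn env vars yields a new hash and therefore a cache miss:

package run

import (
	"github.com/vercel/turborepo/cli/internal/fs"
)

// hashInputsExample is illustrative only: the values are invented, and only
// the field layout mirrors the hashable struct above.
func hashInputsExample() (string, error) {
	hashable := struct {
		Hash             string
		Task             string
		Outputs          []string
		PassThruArgs     []string
		HashableEnvPairs []string
	}{
		Hash:             "aabbccdd", // hypothetical package content hash
		Task:             "build",
		Outputs:          []string{".turbo/turbo-build.log", "dist/**/*"},
		PassThruArgs:     []string{"--minify"},
		HashableEnvPairs: []string{"API_URL=https://api.example.com"},
	}
	// Any change to the fields above produces a different task hash,
	// and therefore a cache miss on the next run.
	return fs.HashObject(hashable)
}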
+ } + logFileName := filepath.Join(pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task)) + targetLogger.Debug("log file", "path", filepath.Join(e.rs.Opts.cwd, logFileName)) + + // Cache --------------------------------------------- + var hit bool + if !e.rs.Opts.forceExecution { + hit, _, _, err = turboCache.Fetch(rs.Opts.cwd, hash, nil) + if err != nil { + targetUi.Error(fmt.Sprintf("error fetching from cache: %s", err)) + } else if hit { + if e.rs.Opts.stream && fs.FileExists(filepath.Join(e.rs.Opts.cwd, logFileName)) { + e.logReplayWaitGroup.Add(1) + go replayLogs(targetLogger, e.ui, e.rs.Opts, logFileName, hash, &e.logReplayWaitGroup, false) + } + targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime)) + tracer(TargetCached, nil) + + return nil + } + if e.rs.Opts.stream { + targetUi.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(hash))) + } + } else { + if e.rs.Opts.stream { + targetUi.Output(fmt.Sprintf("cache bypass, force executing %s", ui.Dim(hash))) + } + } + + // Setup command execution + argsactual := append([]string{"run"}, task) + argsactual = append(argsactual, passThroughArgs...) + // @TODO: @jaredpalmer fix this hack to get the package manager's name + var cmd *exec.Cmd + if e.backend.Name == "nodejs-berry" { + cmd = exec.Command("yarn", argsactual...) + } else { + cmd = exec.Command(strings.TrimPrefix(e.backend.Name, "nodejs-"), argsactual...) + } + cmd.Dir = pack.Dir + envs := fmt.Sprintf("TURBO_HASH=%v", hash) + cmd.Env = append(os.Environ(), envs) + + // Setup stdout/stderr + // If we are not caching anything, then we don't need to write logs to disk + // be careful about this conditional given the default of cache = true + var writer io.Writer + if !e.rs.Opts.cache || (pipeline.Cache != nil && !*pipeline.Cache) { + writer = os.Stdout + } else { + // Setup log file + if err := fs.EnsureDir(logFileName); err != nil { + tracer(TargetBuildFailed, err) + e.logError(targetLogger, actualPrefix, err) + if e.rs.Opts.bail { + os.Exit(1) + } + } + output, err := os.Create(logFileName) + if err != nil { + tracer(TargetBuildFailed, err) + e.logError(targetLogger, actualPrefix, err) + if e.rs.Opts.bail { + os.Exit(1) + } + } + defer output.Close() + bufWriter := bufio.NewWriter(output) + bufWriter.WriteString(fmt.Sprintf("%scache hit, replaying output %s\n", actualPrefix, ui.Dim(hash))) + defer bufWriter.Flush() + writer = io.MultiWriter(os.Stdout, bufWriter) + } + + logger := log.New(writer, "", 0) + // Setup a streamer that we'll pipe cmd.Stdout to + logStreamerOut := logstreamer.NewLogstreamer(logger, actualPrefix, false) + // Setup a streamer that we'll pipe cmd.Stderr to. + logStreamerErr := logstreamer.NewLogstreamer(logger, actualPrefix, false) + cmd.Stderr = logStreamerErr + cmd.Stdout = logStreamerOut + // Flush/Reset any error we recorded + logStreamerErr.FlushRecord() + logStreamerOut.FlushRecord() + + // Run the command + if err := e.processes.Exec(cmd); err != nil { + // if we already know we're in the process of exiting, + // we don't need to record an error to that effect. 
+ if errors.Is(err, process.ErrClosing) { + return nil + } + tracer(TargetBuildFailed, err) + targetLogger.Error("Error: command finished with error: %w", err) + if e.rs.Opts.bail { + if e.rs.Opts.stream { + targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err)) + } else { + f, err := os.Open(filepath.Join(e.rs.Opts.cwd, logFileName)) + if err != nil { + targetUi.Warn(fmt.Sprintf("failed reading logs: %v", err)) + } + defer f.Close() + scan := bufio.NewScanner(f) + e.ui.Error("") + e.ui.Error(util.Sprintf("%s ${RED}%s finished with error${RESET}", ui.ERROR_PREFIX, util.GetTaskId(pack.Name, task))) + e.ui.Error("") + for scan.Scan() { + e.ui.Output(util.Sprintf("${RED}%s:%s: ${RESET}%s", pack.Name, task, scan.Bytes())) //Writing to Stdout + } + } + e.processes.Close() + } else { + if e.rs.Opts.stream { + targetUi.Warn("command finished with error, but continuing...") + } + } + return err + } + + // Cache command outputs + if e.rs.Opts.cache && (pipeline.Cache == nil || *pipeline.Cache) { + targetLogger.Debug("caching output", "outputs", outputs) + ignore := []string{} + filesToBeCached := globby.GlobFiles(pack.Dir, outputs, ignore) + if err := e.turboCache.Put(pack.Dir, hash, int(time.Since(cmdTime).Milliseconds()), filesToBeCached); err != nil { + e.logError(targetLogger, "", fmt.Errorf("error caching output: %w", err)) + } + } + + // Clean up tracing + tracer(TargetBuilt, nil) + targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime)) + return nil +} + +func (c *RunCommand) generateDotGraph(taskGraph *dag.AcyclicGraph, outputFilename string) error { + graphString := string(taskGraph.Dot(&dag.DotOpts{ + Verbose: true, + DrawCycles: true, + })) + ext := filepath.Ext(outputFilename) + if ext == ".html" { + f, err := os.Create(outputFilename) + if err != nil { + return fmt.Errorf("error writing graph: %w", err) + } + defer f.Close() + f.WriteString(` + + + + Graph + + + + + + + `) + c.Ui.Output("") + c.Ui.Output(fmt.Sprintf("✔ Generated task graph in %s", ui.Bold(outputFilename))) + if ui.IsTTY { + browser.OpenBrowser(outputFilename) + } + return nil + } + hasDot := hasGraphViz() + if hasDot { + dotArgs := []string{"-T" + ext[1:], "-o", outputFilename} + cmd := exec.Command("dot", dotArgs...) 
+ cmd.Stdin = strings.NewReader(graphString) + if err := cmd.Run(); err != nil { + return fmt.Errorf("could not generate task graphfile %v: %w", outputFilename, err) + } else { + c.Ui.Output("") + c.Ui.Output(fmt.Sprintf("✔ Generated task graph in %s", ui.Bold(outputFilename))) + } + } else { + c.Ui.Output("") + c.Ui.Warn(color.New(color.FgYellow, color.Bold, color.ReverseVideo).Sprint(" WARNING ") + color.YellowString(" `turbo` uses Graphviz to generate an image of your\ngraph, but Graphviz isn't installed on this machine.\n\nYou can download Graphviz from https://graphviz.org/download.\n\nIn the meantime, you can use this string output with an\nonline Dot graph viewer.")) + c.Ui.Output("") + c.Ui.Output(graphString) + } + return nil +} From 475a12abce9e960f8d6ab6d2611f67fed1a55ddf Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 10 Mar 2022 11:11:36 -0800 Subject: [PATCH 2/9] Extract building the engine --- cli/internal/core/scheduler.go | 17 ++--- cli/internal/core/scheduler_test.go | 4 +- cli/internal/run/run.go | 99 +++++++++++++++-------------- 3 files changed, 64 insertions(+), 56 deletions(-) diff --git a/cli/internal/core/scheduler.go b/cli/internal/core/scheduler.go index 30c96675a9a5c..ad4458707749b 100644 --- a/cli/internal/core/scheduler.go +++ b/cli/internal/core/scheduler.go @@ -21,7 +21,7 @@ type Task struct { type Visitor = func(taskID string) error -type scheduler struct { +type Scheduler struct { // TopologicGraph is a graph of workspaces TopologicGraph *dag.AcyclicGraph // TaskGraph is a graph of package-tasks @@ -33,8 +33,8 @@ type scheduler struct { } // NewScheduler creates a new scheduler given a topologic graph of workspace package names -func NewScheduler(topologicalGraph *dag.AcyclicGraph) *scheduler { - return &scheduler{ +func NewScheduler(topologicalGraph *dag.AcyclicGraph) *Scheduler { + return &Scheduler{ Tasks: make(map[string]*Task), TopologicGraph: topologicalGraph, TaskGraph: &dag.AcyclicGraph{}, @@ -53,7 +53,7 @@ type SchedulerExecutionOptions struct { TasksOnly bool } -func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error { +func (p *Scheduler) Prepare(options *SchedulerExecutionOptions) error { pkgs := options.Packages if len(pkgs) == 0 { // TODO(gsoltis): Is this behavior only used in tests? @@ -64,6 +64,7 @@ func (p *scheduler) Prepare(options *SchedulerExecutionOptions) error { tasks := options.TaskNames if len(tasks) == 0 { + // TODO(gsoltis): Is this behavior used? for key := range p.Tasks { tasks = append(tasks, key) } @@ -85,7 +86,7 @@ type ExecOpts struct { } // Execute executes the pipeline, constructing an internal task graph and walking it accordingly. 
-func (p *scheduler) Execute(visitor Visitor, opts ExecOpts) []error { +func (p *Scheduler) Execute(visitor Visitor, opts ExecOpts) []error { var sema = util.NewSemaphore(opts.Concurrency) return p.TaskGraph.Walk(func(v dag.Vertex) error { // Always return if it is the root node @@ -101,7 +102,7 @@ func (p *scheduler) Execute(visitor Visitor, opts ExecOpts) []error { }) } -func (p *scheduler) generateTaskGraph(scope []string, taskNames []string, tasksOnly bool) error { +func (p *Scheduler) generateTaskGraph(scope []string, taskNames []string, tasksOnly bool) error { if p.PackageTaskDeps == nil { p.PackageTaskDeps = [][]string{} } @@ -225,12 +226,12 @@ func getPackageTaskDepsMap(packageTaskDeps [][]string) map[string][]string { return depMap } -func (p *scheduler) AddTask(task *Task) *scheduler { +func (p *Scheduler) AddTask(task *Task) *Scheduler { p.Tasks[task.Name] = task return p } -func (p *scheduler) AddDep(fromTaskId string, toTaskId string) *scheduler { +func (p *Scheduler) AddDep(fromTaskId string, toTaskId string) *Scheduler { p.PackageTaskDeps = append(p.PackageTaskDeps, []string{fromTaskId, toTaskId}) return p } diff --git a/cli/internal/core/scheduler_test.go b/cli/internal/core/scheduler_test.go index 2fedeeb02123a..0245667c4fa5c 100644 --- a/cli/internal/core/scheduler_test.go +++ b/cli/internal/core/scheduler_test.go @@ -55,7 +55,7 @@ func TestSchedulerDefault(t *testing.T) { } err := p.Prepare(&SchedulerExecutionOptions{ - Packages: nil, + Packages: []string{"a", "b", "c"}, TaskNames: []string{"test"}, TasksOnly: false, }) @@ -115,7 +115,7 @@ func TestSchedulerTasksOnly(t *testing.T) { } err := p.Prepare(&SchedulerExecutionOptions{ - Packages: nil, + Packages: []string{"a", "b", "c"}, TaskNames: []string{"test"}, TasksOnly: true, }) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index c69a7ecaba66b..e365d15f45624 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -255,55 +255,11 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La c.Ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len())))) } - engine := core.NewScheduler(&g.TopologicalGraph) - for taskName, value := range g.Pipeline { - topoDeps := make(util.Set) - deps := make(util.Set) - if util.IsPackageTask(taskName) { - for _, from := range value.DependsOn { - if strings.HasPrefix(from, ENV_PIPELINE_DELIMITER) { - continue - } - if util.IsPackageTask(from) { - engine.AddDep(from, taskName) - continue - } else if strings.Contains(from, TOPOLOGICAL_PIPELINE_DELIMITER) { - topoDeps.Add(from[1:]) - } else { - deps.Add(from) - } - } - _, id := util.GetPackageTaskFromId(taskName) - taskName = id - } else { - for _, from := range value.DependsOn { - if strings.HasPrefix(from, ENV_PIPELINE_DELIMITER) { - continue - } - if strings.Contains(from, TOPOLOGICAL_PIPELINE_DELIMITER) { - topoDeps.Add(from[1:]) - } else { - deps.Add(from) - } - } - } - - engine.AddTask(&core.Task{ - Name: taskName, - TopoDeps: topoDeps, - Deps: deps, - }) - } - - if err := engine.Prepare(&core.SchedulerExecutionOptions{ - Packages: rs.FilteredPkgs.UnsafeListOfStrings(), - TaskNames: rs.Targets, - TasksOnly: rs.Opts.only, - }); err != nil { + engine, err := buildTaskGraph(&g.TopologicalGraph, g.Pipeline, rs) + if err != nil { c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err)) return 1 } - if rs.Opts.dotGraph != "" { err := c.generateDotGraph(engine.TaskGraph, 
filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph)) if err != nil { @@ -366,6 +322,57 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La return exitCode } +func buildTaskGraph(topoGraph *dag.AcyclicGraph, pipeline map[string]fs.Pipeline, rs *runSpec) (*core.Scheduler, error) { + engine := core.NewScheduler(topoGraph) + for taskName, value := range pipeline { + topoDeps := make(util.Set) + deps := make(util.Set) + if util.IsPackageTask(taskName) { + for _, from := range value.DependsOn { + if strings.HasPrefix(from, ENV_PIPELINE_DELIMITER) { + continue + } + if util.IsPackageTask(from) { + engine.AddDep(from, taskName) + continue + } else if strings.Contains(from, TOPOLOGICAL_PIPELINE_DELIMITER) { + topoDeps.Add(from[1:]) + } else { + deps.Add(from) + } + } + _, id := util.GetPackageTaskFromId(taskName) + taskName = id + } else { + for _, from := range value.DependsOn { + if strings.HasPrefix(from, ENV_PIPELINE_DELIMITER) { + continue + } + if strings.Contains(from, TOPOLOGICAL_PIPELINE_DELIMITER) { + topoDeps.Add(from[1:]) + } else { + deps.Add(from) + } + } + } + + engine.AddTask(&core.Task{ + Name: taskName, + TopoDeps: topoDeps, + Deps: deps, + }) + } + + if err := engine.Prepare(&core.SchedulerExecutionOptions{ + Packages: rs.FilteredPkgs.UnsafeListOfStrings(), + TaskNames: rs.Targets, + TasksOnly: rs.Opts.only, + }); err != nil { + return nil, err + } + return engine, nil +} + // RunOptions holds the current run operations configuration type RunOptions struct { From 6f9b13c793292c20117f11845eca61ae90405461 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 10 Mar 2022 13:31:17 -0800 Subject: [PATCH 3/9] Add JSON functionality --- cli/internal/run/run.go | 175 +++++++++++++++++++++++++++------------- 1 file changed, 118 insertions(+), 57 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index e365d15f45624..a15c42eef018b 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -3,6 +3,7 @@ package run import ( "bufio" gocontext "context" + "encoding/json" "flag" "fmt" "io" @@ -121,6 +122,9 @@ Options: (default false) --no-cache Avoid saving task results to the cache. Useful for development/watch tasks. (default false) + --dry-run[=json] List the packages in scope and the tasks that would be run, + but don't actually run them. + Passing --dry-run=json will render the output in JSON format. 
`) return strings.TrimSpace(helpText) } @@ -168,9 +172,6 @@ func (c *RunCommand) Run(args []string) int { c.logError(c.Config.Logger, "", fmt.Errorf("failed resolve packages to run %v", err)) } c.Config.Logger.Debug("global hash", "value", ctx.GlobalHash) - packagesInScope := filteredPkgs.UnsafeListOfStrings() - sort.Strings(packagesInScope) - c.Ui.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), strings.Join(packagesInScope, ", "))) c.Config.Logger.Debug("local cache folder", "path", runOptions.cacheFolder) fs.EnsureDir(runOptions.cacheFolder) @@ -251,72 +252,54 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La } } - if rs.Opts.stream { - c.Ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len())))) - } - engine, err := buildTaskGraph(&g.TopologicalGraph, g.Pipeline, rs) if err != nil { c.Ui.Error(fmt.Sprintf("Error preparing engine: %s", err)) return 1 } + exitCode := 0 if rs.Opts.dotGraph != "" { err := c.generateDotGraph(engine.TaskGraph, filepath.Join(rs.Opts.cwd, rs.Opts.dotGraph)) if err != nil { c.logError(c.Config.Logger, "", err) return 1 } - return 0 - } - - goctx := gocontext.Background() - var analyticsSink analytics.Sink - if c.Config.IsLoggedIn() { - analyticsSink = c.Config.ApiClient - } else { - analyticsSink = analytics.NullSink - } - analyticsClient := analytics.NewClient(goctx, analyticsSink, c.Config.Logger.Named("analytics")) - defer analyticsClient.CloseWithTimeout(50 * time.Millisecond) - turboCache := cache.New(c.Config, analyticsClient) - defer turboCache.Shutdown() - runState := NewRunState(rs.Opts, startAt) - runState.Listen(c.Ui, time.Now()) - ec := &execContext{ - g: g, - colorCache: NewColorCache(), - runState: runState, - rs: rs, - ui: &cli.ConcurrentUi{Ui: c.Ui}, - turboCache: turboCache, - logger: c.Config.Logger, - backend: backend, - processes: c.Processes, - } - - // run the thing - errs := engine.Execute(ec.exec, core.ExecOpts{ - Parallel: rs.Opts.parallel, - Concurrency: rs.Opts.concurrency, - }) - - // Track if we saw any child with a non-zero exit code - exitCode := 0 - exitCodeErr := &process.ChildExit{} - for _, err := range errs { - if errors.As(err, &exitCodeErr) { - if exitCodeErr.ExitCode > exitCode { - exitCode = exitCodeErr.ExitCode + } else if rs.Opts.dryRun { + tasksRun, err := c.executeDryRun(engine) + if err != nil { + c.logError(c.Config.Logger, "", err) + return 1 + } + packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings() + sort.Strings(packagesInScope) + if rs.Opts.dryRunJson { + dryRun := make(map[string][]string) + dryRun["packages"] = packagesInScope + dryRun["tasks"] = tasksRun + bytes, err := json.MarshalIndent(dryRun, "", " ") + if err != nil { + c.logError(c.Config.Logger, "", errors.Wrap(err, "failed to render to JSON")) + return 1 + } + c.Ui.Output(string(bytes)) + } else { + c.Ui.Info("Packages in scope:") + for _, pkg := range packagesInScope { + c.Ui.Output(fmt.Sprintf(ui.Bold("• %v"), pkg)) + } + c.Ui.Info("Tasks to run:") + for _, task := range tasksRun { + c.Ui.Output(fmt.Sprintf(ui.Bold("• %v"), task)) } } - c.Ui.Error(err.Error()) - } - - ec.logReplayWaitGroup.Wait() - - if err := runState.Close(c.Ui, rs.Opts.profile); err != nil { - c.Ui.Error(fmt.Sprintf("Error with profiler: %s", err.Error())) - return 1 + } else { + packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings() + sort.Strings(packagesInScope) + c.Ui.Output(fmt.Sprintf(ui.Dim("• Packages in scope: %v"), 
strings.Join(packagesInScope, ", "))) + if rs.Opts.stream { + c.Ui.Output(fmt.Sprintf("%s %s %s", ui.Dim("• Running"), ui.Dim(ui.Bold(strings.Join(rs.Targets, ", "))), ui.Dim(fmt.Sprintf("in %v packages", rs.FilteredPkgs.Len())))) + } + exitCode = c.executeTasks(g, rs, engine, backend, startAt) } return exitCode @@ -410,7 +393,9 @@ type RunOptions struct { bail bool passThroughArgs []string // Restrict execution to only the listed task names. Default false - only bool + only bool + dryRun bool + dryRunJson bool } func (ro *RunOptions) ScopeOpts() *scope.Opts { @@ -540,6 +525,11 @@ func parseRunArgs(args []string, output cli.Ui) (*RunOptions, error) { includDepsSet = true case strings.HasPrefix(arg, "--only"): runOptions.only = true + case strings.HasPrefix(arg, "--dry-run"): + runOptions.dryRun = true + if strings.HasPrefix(arg, "--dry-run=json") { + runOptions.dryRunJson = true + } case strings.HasPrefix(arg, "--team"): case strings.HasPrefix(arg, "--token"): case strings.HasPrefix(arg, "--api"): @@ -595,6 +585,77 @@ func hasGraphViz() bool { return err == nil } +func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Scheduler, backend *api.LanguageBackend, startAt time.Time) int { + goctx := gocontext.Background() + var analyticsSink analytics.Sink + if c.Config.IsLoggedIn() { + analyticsSink = c.Config.ApiClient + } else { + analyticsSink = analytics.NullSink + } + analyticsClient := analytics.NewClient(goctx, analyticsSink, c.Config.Logger.Named("analytics")) + defer analyticsClient.CloseWithTimeout(50 * time.Millisecond) + turboCache := cache.New(c.Config, analyticsClient) + defer turboCache.Shutdown() + runState := NewRunState(rs.Opts, startAt) + runState.Listen(c.Ui, time.Now()) + ec := &execContext{ + g: g, + colorCache: NewColorCache(), + runState: runState, + rs: rs, + ui: &cli.ConcurrentUi{Ui: c.Ui}, + turboCache: turboCache, + logger: c.Config.Logger, + backend: backend, + processes: c.Processes, + } + + // run the thing + errs := engine.Execute(ec.exec, core.ExecOpts{ + Parallel: rs.Opts.parallel, + Concurrency: rs.Opts.concurrency, + }) + + // Track if we saw any child with a non-zero exit code + exitCode := 0 + exitCodeErr := &process.ChildExit{} + for _, err := range errs { + if errors.As(err, &exitCodeErr) { + if exitCodeErr.ExitCode > exitCode { + exitCode = exitCodeErr.ExitCode + } + } + c.Ui.Error(err.Error()) + } + + ec.logReplayWaitGroup.Wait() + + if err := runState.Close(c.Ui, rs.Opts.profile); err != nil { + c.Ui.Error(fmt.Sprintf("Error with profiler: %s", err.Error())) + return 1 + } + return 0 +} + +func (c *RunCommand) executeDryRun(engine *core.Scheduler) ([]string, error) { + taskIDs := []string{} + errs := engine.Execute(func(taskID string) error { + taskIDs = append(taskIDs, taskID) + return nil + }, core.ExecOpts{ + Concurrency: 1, + Parallel: false, + }) + if len(errs) > 0 { + for _, err := range errs { + c.Ui.Error(err.Error()) + } + return nil, errors.New("errors occurred during dry-run graph traversal") + } + return taskIDs, nil +} + // Replay logs will try to replay logs back to the stdout func replayLogs(logger hclog.Logger, prefixUi cli.Ui, runOptions *RunOptions, logFileName, hash string, wg *sync.WaitGroup, silent bool) { defer wg.Done() From 59b882ca4a3e30aa584202289285f5ff32da6fa4 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 10 Mar 2022 13:35:34 -0800 Subject: [PATCH 4/9] Fix merge issue --- cli/internal/run/run.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/internal/run/run.go 
b/cli/internal/run/run.go index a15c42eef018b..ba538424a6dfe 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -818,7 +818,7 @@ func (e *execContext) exec(id string) error { // Cache --------------------------------------------- var hit bool if !e.rs.Opts.forceExecution { - hit, _, _, err = turboCache.Fetch(rs.Opts.cwd, hash, nil) + hit, _, _, err = e.turboCache.Fetch(e.rs.Opts.cwd, hash, nil) if err != nil { targetUi.Error(fmt.Sprintf("error fetching from cache: %s", err)) } else if hit { From 5abb8132ea64896d23dfaf6fbe1e6c6184af4e40 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Thu, 10 Mar 2022 16:18:22 -0800 Subject: [PATCH 5/9] Pull out hashing code, calculate hashes in dry run --- cli/internal/run/run.go | 228 ++++++++++++++++++++++++---------------- 1 file changed, 139 insertions(+), 89 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index ba538424a6dfe..6d7518012b041 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -70,6 +70,16 @@ type runSpec struct { Opts *RunOptions } +func (rs *runSpec) ArgsForTask(task string) []string { + passThroughArgs := make([]string, 0, len(rs.Opts.passThroughArgs)) + for _, target := range rs.Targets { + if target == task { + passThroughArgs = append(passThroughArgs, rs.Opts.passThroughArgs...) + } + } + return passThroughArgs +} + // Synopsis of run command func (c *RunCommand) Synopsis() string { return "Run a task" @@ -265,7 +275,7 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La return 1 } } else if rs.Opts.dryRun { - tasksRun, err := c.executeDryRun(engine) + tasksRun, err := c.executeDryRun(engine, g, rs, c.Config.Logger) if err != nil { c.logError(c.Config.Logger, "", err) return 1 @@ -273,9 +283,13 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings() sort.Strings(packagesInScope) if rs.Opts.dryRunJson { - dryRun := make(map[string][]string) - dryRun["packages"] = packagesInScope - dryRun["tasks"] = tasksRun + dryRun := &struct { + Packages []string `json:"packages"` + Tasks []hashedTask `json:"tasks"` + }{ + Packages: packagesInScope, + Tasks: tasksRun, + } bytes, err := json.MarshalIndent(dryRun, "", " ") if err != nil { c.logError(c.Config.Logger, "", errors.Wrap(err, "failed to render to JSON")) @@ -600,7 +614,6 @@ func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Sc runState := NewRunState(rs.Opts, startAt) runState.Listen(c.Ui, time.Now()) ec := &execContext{ - g: g, colorCache: NewColorCache(), runState: runState, rs: rs, @@ -612,7 +625,7 @@ func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Sc } // run the thing - errs := engine.Execute(ec.exec, core.ExecOpts{ + errs := engine.Execute(g.getPackageTaskVisitor(ec.exec), core.ExecOpts{ Parallel: rs.Opts.parallel, Concurrency: rs.Opts.concurrency, }) @@ -638,12 +651,26 @@ func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Sc return 0 } -func (c *RunCommand) executeDryRun(engine *core.Scheduler) ([]string, error) { - taskIDs := []string{} - errs := engine.Execute(func(taskID string) error { - taskIDs = append(taskIDs, taskID) +type hashedTask struct { + TaskID string `json:"taskId"` + Hash string `json:"hash"` + // TODO(gsoltis): other data we might want here? inputs, args, outputs, etc. 
+} + +func (c *RunCommand) executeDryRun(engine *core.Scheduler, g *completeGraph, rs *runSpec, logger hclog.Logger) ([]hashedTask, error) { + taskIDs := []hashedTask{} + errs := engine.Execute(g.getPackageTaskVisitor(func(pt *packageTask) error { + passThroughArgs := rs.ArgsForTask(pt.task) + hash, err := pt.hash(passThroughArgs, logger) + if err != nil { + return err + } + taskIDs = append(taskIDs, hashedTask{ + TaskID: pt.taskID, + Hash: hash, + }) return nil - }, core.ExecOpts{ + }), core.ExecOpts{ Concurrency: 1, Parallel: false, }) @@ -700,7 +727,6 @@ func getTargetsFromArguments(arguments []string, configJson *fs.TurboConfigJSON) } type execContext struct { - g *completeGraph colorCache *ColorCache runState *RunState rs *runSpec @@ -722,27 +748,25 @@ func (e *execContext) logError(log hclog.Logger, prefix string, err error) { e.ui.Error(fmt.Sprintf("%s%s%s", ui.ERROR_PREFIX, prefix, color.RedString(" %v", err))) } -func (e *execContext) exec(id string) error { +func (e *execContext) exec(pt *packageTask) error { cmdTime := time.Now() - name, task := util.GetPackageTaskFromId(id) - pack := e.g.PackageInfos[name] - targetLogger := e.logger.Named(fmt.Sprintf("%v:%v", pack.Name, task)) - defer targetLogger.ResetNamed(pack.Name) + + targetLogger := e.logger.Named(fmt.Sprintf("%v:%v", pt.pkg.Name, pt.task)) targetLogger.Debug("start") // bail if the script doesn't exist - if _, ok := pack.Scripts[task]; !ok { + if _, ok := pt.pkg.Scripts[pt.task]; !ok { targetLogger.Debug("no task in package, skipping") targetLogger.Debug("done", "status", "skipped", "duration", time.Since(cmdTime)) return nil } // Setup tracer - tracer := e.runState.Run(util.GetTaskId(pack.Name, task)) + tracer := e.runState.Run(util.GetTaskId(pt.pkg.Name, pt.task)) // Create a logger - pref := e.colorCache.PrefixColor(pack.Name) - actualPrefix := pref("%s:%s: ", pack.Name, task) + pref := e.colorCache.PrefixColor(pt.pkg.Name) + actualPrefix := pref("%s:%s: ", pt.pkg.Name, pt.task) targetUi := &cli.PrefixedUi{ Ui: e.ui, OutputPrefix: actualPrefix, @@ -750,71 +774,17 @@ func (e *execContext) exec(id string) error { ErrorPrefix: actualPrefix, WarnPrefix: actualPrefix, } - // Hash --------------------------------------------- - // first check for package-tasks - pipeline, ok := e.g.Pipeline[fmt.Sprintf("%v", id)] - if !ok { - // then check for regular tasks - altpipe, notcool := e.g.Pipeline[task] - // if neither, then bail - if !notcool && !ok { - return nil - } - // override if we need to... - pipeline = altpipe - } - outputs := []string{fmt.Sprintf(".turbo/turbo-%v.log", task)} - if pipeline.Outputs == nil { - outputs = append(outputs, "dist/**/*", "build/**/*") - } else { - outputs = append(outputs, pipeline.Outputs...) - } - targetLogger.Debug("task output globs", "outputs", outputs) - - passThroughArgs := make([]string, 0, len(e.rs.Opts.passThroughArgs)) - for _, target := range e.rs.Targets { - if target == task { - passThroughArgs = append(passThroughArgs, e.rs.Opts.passThroughArgs...) 
- } - } + logFileName := filepath.Join(pt.pkg.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", pt.task)) + targetLogger.Debug("log file", "path", filepath.Join(e.rs.Opts.cwd, logFileName)) - // Hash the task-specific environment variables found in the dependsOnKey in the pipeline - var hashableEnvVars []string - var hashableEnvPairs []string - if len(pipeline.DependsOn) > 0 { - for _, v := range pipeline.DependsOn { - if strings.Contains(v, ENV_PIPELINE_DELIMITER) { - trimmed := strings.TrimPrefix(v, ENV_PIPELINE_DELIMITER) - hashableEnvPairs = append(hashableEnvPairs, fmt.Sprintf("%v=%v", trimmed, os.Getenv(trimmed))) - hashableEnvVars = append(hashableEnvVars, trimmed) - } - } - sort.Strings(hashableEnvVars) // always sort them - } - targetLogger.Debug("hashable env vars", "vars", hashableEnvVars) - hashable := struct { - Hash string - Task string - Outputs []string - PassThruArgs []string - HashableEnvPairs []string - }{ - Hash: pack.Hash, - Task: task, - Outputs: outputs, - PassThruArgs: passThroughArgs, - HashableEnvPairs: hashableEnvPairs, - } - hash, err := fs.HashObject(hashable) - targetLogger.Debug("task hash", "value", hash) + passThroughArgs := e.rs.ArgsForTask(pt.task) + hash, err := pt.hash(passThroughArgs, e.logger) + e.logger.Debug("task hash", "value", hash) if err != nil { - targetUi.Error(fmt.Sprintf("Hashing error: %v", err)) + e.ui.Error(fmt.Sprintf("Hashing error: %v", err)) // @TODO probably should abort fatally??? } - logFileName := filepath.Join(pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task)) - targetLogger.Debug("log file", "path", filepath.Join(e.rs.Opts.cwd, logFileName)) - // Cache --------------------------------------------- var hit bool if !e.rs.Opts.forceExecution { @@ -841,7 +811,7 @@ func (e *execContext) exec(id string) error { } // Setup command execution - argsactual := append([]string{"run"}, task) + argsactual := append([]string{"run"}, pt.task) argsactual = append(argsactual, passThroughArgs...) // @TODO: @jaredpalmer fix this hack to get the package manager's name var cmd *exec.Cmd @@ -850,7 +820,7 @@ func (e *execContext) exec(id string) error { } else { cmd = exec.Command(strings.TrimPrefix(e.backend.Name, "nodejs-"), argsactual...) 
} - cmd.Dir = pack.Dir + cmd.Dir = pt.pkg.Dir envs := fmt.Sprintf("TURBO_HASH=%v", hash) cmd.Env = append(os.Environ(), envs) @@ -858,7 +828,7 @@ func (e *execContext) exec(id string) error { // If we are not caching anything, then we don't need to write logs to disk // be careful about this conditional given the default of cache = true var writer io.Writer - if !e.rs.Opts.cache || (pipeline.Cache != nil && !*pipeline.Cache) { + if !e.rs.Opts.cache || (pt.pipeline.Cache != nil && !*pt.pipeline.Cache) { writer = os.Stdout } else { // Setup log file @@ -915,10 +885,10 @@ func (e *execContext) exec(id string) error { defer f.Close() scan := bufio.NewScanner(f) e.ui.Error("") - e.ui.Error(util.Sprintf("%s ${RED}%s finished with error${RESET}", ui.ERROR_PREFIX, util.GetTaskId(pack.Name, task))) + e.ui.Error(util.Sprintf("%s ${RED}%s finished with error${RESET}", ui.ERROR_PREFIX, util.GetTaskId(pt.pkg.Name, pt.task))) e.ui.Error("") for scan.Scan() { - e.ui.Output(util.Sprintf("${RED}%s:%s: ${RESET}%s", pack.Name, task, scan.Bytes())) //Writing to Stdout + e.ui.Output(util.Sprintf("${RED}%s:%s: ${RESET}%s", pt.pkg.Name, pt.task, scan.Bytes())) //Writing to Stdout } } e.processes.Close() @@ -931,11 +901,12 @@ func (e *execContext) exec(id string) error { } // Cache command outputs - if e.rs.Opts.cache && (pipeline.Cache == nil || *pipeline.Cache) { + if e.rs.Opts.cache && (pt.pipeline.Cache == nil || *pt.pipeline.Cache) { + outputs := pt.Outputs() targetLogger.Debug("caching output", "outputs", outputs) ignore := []string{} - filesToBeCached := globby.GlobFiles(pack.Dir, outputs, ignore) - if err := e.turboCache.Put(pack.Dir, hash, int(time.Since(cmdTime).Milliseconds()), filesToBeCached); err != nil { + filesToBeCached := globby.GlobFiles(pt.pkg.Dir, outputs, ignore) + if err := e.turboCache.Put(pt.pkg.Dir, hash, int(time.Since(cmdTime).Milliseconds()), filesToBeCached); err != nil { e.logError(targetLogger, "", fmt.Errorf("error caching output: %w", err)) } } @@ -999,3 +970,82 @@ func (c *RunCommand) generateDotGraph(taskGraph *dag.AcyclicGraph, outputFilenam } return nil } + +type packageTask struct { + taskID string + task string + packageName string + pkg *fs.PackageJSON + pipeline *fs.Pipeline +} + +func (pt *packageTask) Outputs() []string { + outputs := []string{fmt.Sprintf(".turbo/turbo-%v.log", pt.task)} + if pt.pipeline.Outputs == nil { + outputs = append(outputs, "dist/**/*", "build/**/*") + } else { + outputs = append(outputs, pt.pipeline.Outputs...) 
+ } + return outputs +} + +func (pt *packageTask) hash(args []string, logger hclog.Logger) (string, error) { + // Hash --------------------------------------------- + outputs := pt.Outputs() + logger.Debug("task output globs", "outputs", outputs) + + // Hash the task-specific environment variables found in the dependsOnKey in the pipeline + var hashableEnvVars []string + var hashableEnvPairs []string + if len(pt.pipeline.DependsOn) > 0 { + for _, v := range pt.pipeline.DependsOn { + if strings.Contains(v, ENV_PIPELINE_DELIMITER) { + trimmed := strings.TrimPrefix(v, ENV_PIPELINE_DELIMITER) + hashableEnvPairs = append(hashableEnvPairs, fmt.Sprintf("%v=%v", trimmed, os.Getenv(trimmed))) + hashableEnvVars = append(hashableEnvVars, trimmed) + } + } + sort.Strings(hashableEnvVars) // always sort them + } + logger.Debug("hashable env vars", "vars", hashableEnvVars) + hashable := struct { + Hash string + Task string + Outputs []string + PassThruArgs []string + HashableEnvPairs []string + }{ + Hash: pt.pkg.Hash, + Task: pt.task, + Outputs: outputs, + PassThruArgs: args, + HashableEnvPairs: hashableEnvPairs, + } + return fs.HashObject(hashable) +} + +func (g *completeGraph) getPackageTaskVisitor(visitor func(pt *packageTask) error) func(taskID string) error { + return func(taskID string) error { + name, task := util.GetPackageTaskFromId(taskID) + pkg := g.PackageInfos[name] + // first check for package-tasks + pipeline, ok := g.Pipeline[fmt.Sprintf("%v", taskID)] + if !ok { + // then check for regular tasks + altpipe, notcool := g.Pipeline[task] + // if neither, then bail + if !notcool && !ok { + return nil + } + // override if we need to... + pipeline = altpipe + } + return visitor(&packageTask{ + taskID: taskID, + task: task, + packageName: name, + pkg: pkg, + pipeline: &pipeline, + }) + } +} From f70bf38e757c84415533b210e28cacb32f5cfe09 Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Fri, 11 Mar 2022 12:02:59 -0500 Subject: [PATCH 6/9] Add more details to dry run outputs --- cli/internal/run/run.go | 74 ++++++++++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 8 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index 6d7518012b041..c8c40cbd7e2cb 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -15,6 +15,7 @@ import ( "strconv" "strings" "sync" + "text/tabwriter" "time" "github.com/vercel/turborepo/cli/internal/analytics" @@ -297,14 +298,30 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La } c.Ui.Output(string(bytes)) } else { - c.Ui.Info("Packages in scope:") + c.Ui.Output("") + c.Ui.Info(util.Sprintf("${CYAN}${BOLD}Packages in Scope${RESET}")) + p := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) + fmt.Fprintln(p, "Name\tPath\t") for _, pkg := range packagesInScope { - c.Ui.Output(fmt.Sprintf(ui.Bold("• %v"), pkg)) + fmt.Fprintln(p, fmt.Sprintf("%s\t%s\t", pkg, g.PackageInfos[pkg].Dir)) } - c.Ui.Info("Tasks to run:") + p.Flush() + + c.Ui.Output("") + c.Ui.Info(util.Sprintf("${CYAN}${BOLD}Tasks to Run${RESET}")) + for _, task := range tasksRun { - c.Ui.Output(fmt.Sprintf(ui.Bold("• %v"), task)) + c.Ui.Info(util.Sprintf("${BOLD}%s${RESET}", task.TaskID)) + w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Hash\t=\t%s\t${RESET}", task.Hash)) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Directory\t=\t%s\t${RESET}", task.Dir)) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Command\t=\t%s\t${RESET}", task.Command)) + fmt.Fprintln(w, util.Sprintf(" 
${GREY}Outputs\t=\t%s\t${RESET}", strings.Join(task.Outputs[1:], ", "))) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Dependencies\t=\t%s\t${RESET}", strings.Join(task.Dependencies, ", "))) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Dependendents\t=\t%s\t${RESET}", strings.Join(task.Dependents, ", "))) + w.Flush() } + } } else { packagesInScope := rs.FilteredPkgs.UnsafeListOfStrings() @@ -652,22 +669,63 @@ func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Sc } type hashedTask struct { - TaskID string `json:"taskId"` - Hash string `json:"hash"` + TaskID string `json:"taskId"` + Hash string `json:"hash"` + Command string `json:"command"` + Outputs []string `json:"outputs"` + Dir string `json:"directory"` + Dependencies []string `json:"dependencies"` + Dependents []string `json:"dependents"` + // TaskDeps []string `json:"taskDependendents"` // TODO(gsoltis): other data we might want here? inputs, args, outputs, etc. } func (c *RunCommand) executeDryRun(engine *core.Scheduler, g *completeGraph, rs *runSpec, logger hclog.Logger) ([]hashedTask, error) { taskIDs := []hashedTask{} errs := engine.Execute(g.getPackageTaskVisitor(func(pt *packageTask) error { + command, ok := pt.pkg.Scripts[pt.task] + if !ok { + logger.Debug("no task in package, skipping") + logger.Debug("done", "status", "skipped") + return nil + } passThroughArgs := rs.ArgsForTask(pt.task) hash, err := pt.hash(passThroughArgs, logger) if err != nil { return err } + ancestors, err := engine.TaskGraph.Ancestors(pt.taskID) + if err != nil { + return err + } + stringAncestors := []string{} + for _, dep := range ancestors { + // Don't leak out internal ROOT_NODE_NAME nodes, which are just placeholders + if !strings.Contains(dep.(string), core.ROOT_NODE_NAME) { + stringAncestors = append(stringAncestors, dep.(string)) + } + } + descendents, err := engine.TaskGraph.Descendents(pt.taskID) + if err != nil { + return err + } + stringDescendents := []string{} + for _, dep := range descendents { + // Don't leak out internal ROOT_NODE_NAME nodes, which are just placeholders + if !strings.Contains(dep.(string), core.ROOT_NODE_NAME) { + stringDescendents = append(stringDescendents, dep.(string)) + } + } + sort.Strings(stringDescendents) + taskIDs = append(taskIDs, hashedTask{ - TaskID: pt.taskID, - Hash: hash, + TaskID: pt.taskID, + Hash: hash, + Command: command, + Dir: pt.pkg.Dir, + Outputs: pt.Outputs(), + Dependencies: stringAncestors, + Dependents: stringDescendents, }) return nil }), core.ExecOpts{ From 1de3314f0145227ceab748d1d2ee7d795f64dfd8 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 11 Mar 2022 12:30:52 -0800 Subject: [PATCH 7/9] Allow --dry in addition to --dry-run (#867) --- cli/internal/run/run.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index c8c40cbd7e2cb..be00019ab5cea 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -133,9 +133,9 @@ Options: (default false) --no-cache Avoid saving task results to the cache. Useful for development/watch tasks. (default false) - --dry-run[=json] List the packages in scope and the tasks that would be run, - but don't actually run them. - Passing --dry-run=json will render the output in JSON format. + --dry/--dry-run[=json] List the packages in scope and the tasks that would be run, + but don't actually run them. Passing --dry=json or + --dry-run=json will render the output in JSON format. 
`) return strings.TrimSpace(helpText) } @@ -561,6 +561,11 @@ func parseRunArgs(args []string, output cli.Ui) (*RunOptions, error) { if strings.HasPrefix(arg, "--dry-run=json") { runOptions.dryRunJson = true } + case strings.HasPrefix(arg, "--dry"): + runOptions.dryRun = true + if strings.HasPrefix(arg, "--dry=json") { + runOptions.dryRunJson = true + } case strings.HasPrefix(arg, "--team"): case strings.HasPrefix(arg, "--token"): case strings.HasPrefix(arg, "--api"): From 8ec37cfb4598cf4ab561c30a9c63dc35bd0230af Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 11 Mar 2022 13:35:22 -0800 Subject: [PATCH 8/9] Add package name and task to dry run output --- cli/internal/run/run.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index be00019ab5cea..70a9ad8bb9cf3 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -313,6 +313,8 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La for _, task := range tasksRun { c.Ui.Info(util.Sprintf("${BOLD}%s${RESET}", task.TaskID)) w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Task\t=\t%s\t${RESET}", task.Task)) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Package\t=\t%s\t${RESET}", task.Package)) fmt.Fprintln(w, util.Sprintf(" ${GREY}Hash\t=\t%s\t${RESET}", task.Hash)) fmt.Fprintln(w, util.Sprintf(" ${GREY}Directory\t=\t%s\t${RESET}", task.Dir)) fmt.Fprintln(w, util.Sprintf(" ${GREY}Command\t=\t%s\t${RESET}", task.Command)) @@ -675,14 +677,14 @@ func (c *RunCommand) executeTasks(g *completeGraph, rs *runSpec, engine *core.Sc type hashedTask struct { TaskID string `json:"taskId"` + Task string `json:"task"` + Package string `json:"package"` Hash string `json:"hash"` Command string `json:"command"` Outputs []string `json:"outputs"` Dir string `json:"directory"` Dependencies []string `json:"dependencies"` Dependents []string `json:"dependents"` - // TaskDeps []string `json:"taskDependendents"` - // TODO(gsoltis): other data we might want here? inputs, args, outputs, etc. 
} func (c *RunCommand) executeDryRun(engine *core.Scheduler, g *completeGraph, rs *runSpec, logger hclog.Logger) ([]hashedTask, error) { @@ -725,6 +727,8 @@ func (c *RunCommand) executeDryRun(engine *core.Scheduler, g *completeGraph, rs taskIDs = append(taskIDs, hashedTask{ TaskID: pt.taskID, + Task: pt.task, + Package: pt.packageName, Hash: hash, Command: command, Dir: pt.pkg.Dir, From ab966a8ee7c6bc06c6e3b18516cad53ffdb8c512 Mon Sep 17 00:00:00 2001 From: Greg Soltis Date: Fri, 11 Mar 2022 15:10:07 -0800 Subject: [PATCH 9/9] Add log file to dry run output --- cli/internal/run/run.go | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index 70a9ad8bb9cf3..1f0f06c2fe243 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -319,6 +319,7 @@ func (c *RunCommand) runOperation(g *completeGraph, rs *runSpec, backend *api.La fmt.Fprintln(w, util.Sprintf(" ${GREY}Directory\t=\t%s\t${RESET}", task.Dir)) fmt.Fprintln(w, util.Sprintf(" ${GREY}Command\t=\t%s\t${RESET}", task.Command)) fmt.Fprintln(w, util.Sprintf(" ${GREY}Outputs\t=\t%s\t${RESET}", strings.Join(task.Outputs[1:], ", "))) + fmt.Fprintln(w, util.Sprintf(" ${GREY}Log File\t=\t%s\t${RESET}", task.LogFile)) fmt.Fprintln(w, util.Sprintf(" ${GREY}Dependencies\t=\t%s\t${RESET}", strings.Join(task.Dependencies, ", "))) fmt.Fprintln(w, util.Sprintf(" ${GREY}Dependendents\t=\t%s\t${RESET}", strings.Join(task.Dependents, ", "))) w.Flush() @@ -682,6 +683,7 @@ type hashedTask struct { Hash string `json:"hash"` Command string `json:"command"` Outputs []string `json:"outputs"` + LogFile string `json:"logFile"` Dir string `json:"directory"` Dependencies []string `json:"dependencies"` Dependents []string `json:"dependents"` @@ -732,7 +734,8 @@ func (c *RunCommand) executeDryRun(engine *core.Scheduler, g *completeGraph, rs Hash: hash, Command: command, Dir: pt.pkg.Dir, - Outputs: pt.Outputs(), + Outputs: pt.ExternalOutputs(), + LogFile: pt.RepoRelativeLogFile(), Dependencies: stringAncestors, Dependents: stringDescendents, }) @@ -969,7 +972,7 @@ func (e *execContext) exec(pt *packageTask) error { // Cache command outputs if e.rs.Opts.cache && (pt.pipeline.Cache == nil || *pt.pipeline.Cache) { - outputs := pt.Outputs() + outputs := pt.HashableOutputs() targetLogger.Debug("caching output", "outputs", outputs) ignore := []string{} filesToBeCached := globby.GlobFiles(pt.pkg.Dir, outputs, ignore) @@ -1046,19 +1049,26 @@ type packageTask struct { pipeline *fs.Pipeline } -func (pt *packageTask) Outputs() []string { - outputs := []string{fmt.Sprintf(".turbo/turbo-%v.log", pt.task)} +func (pt *packageTask) ExternalOutputs() []string { if pt.pipeline.Outputs == nil { - outputs = append(outputs, "dist/**/*", "build/**/*") - } else { - outputs = append(outputs, pt.pipeline.Outputs...) + return []string{"dist/**/*", "build/**/*"} } + return pt.pipeline.Outputs +} + +func (pt *packageTask) RepoRelativeLogFile() string { + return filepath.Join(pt.pkg.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", pt.task)) +} + +func (pt *packageTask) HashableOutputs() []string { + outputs := []string{fmt.Sprintf(".turbo/turbo-%v.log", pt.task)} + outputs = append(outputs, pt.ExternalOutputs()...) 
return outputs } func (pt *packageTask) hash(args []string, logger hclog.Logger) (string, error) { // Hash --------------------------------------------- - outputs := pt.Outputs() + outputs := pt.HashableOutputs() logger.Debug("task output globs", "outputs", outputs) // Hash the task-specific environment variables found in the dependsOnKey in the pipeline
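// NOTE (editorial sketch, not part of the patch): packageTask.hash above folds the
// package hash, the task name, the output globs, any pass-through args, and the
// environment variables named in the pipeline's dependsOn entries into one struct
// and hashes it with the internal fs.HashObject helper. The standalone sketch below
// shows the same idea using encoding/json plus sha256 as a stand-in hasher; the
// "$" delimiter, hashTask, and taskHashInputs names are assumptions for
// illustration, not turbo's actual API.
package main

import (
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"strings"
)

const envDelimiter = "$" // assumed stand-in for ENV_PIPELINE_DELIMITER

type taskHashInputs struct {
	PackageHash      string
	Task             string
	Outputs          []string
	PassThruArgs     []string
	HashableEnvPairs []string
}

// hashTask collects the env vars referenced in dependsOn, sorts them for a
// reproducible hash (the patch sorts the variable names; sorting the pairs has
// the same effect in this simplified version), and hashes the combined inputs.
func hashTask(pkgHash, task string, outputs, args, dependsOn []string) (string, error) {
	var envPairs []string
	for _, dep := range dependsOn {
		if strings.HasPrefix(dep, envDelimiter) {
			name := strings.TrimPrefix(dep, envDelimiter)
			envPairs = append(envPairs, fmt.Sprintf("%v=%v", name, os.Getenv(name)))
		}
	}
	sort.Strings(envPairs)
	payload, err := json.Marshal(taskHashInputs{
		PackageHash:      pkgHash,
		Task:             task,
		Outputs:          outputs,
		PassThruArgs:     args,
		HashableEnvPairs: envPairs,
	})
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%x", sha256.Sum256(payload)), nil
}

func main() {
	hash, err := hashTask("pkg-hash", "build", []string{"dist/**/*"}, nil, []string{"$API_URL", "lint"})
	if err != nil {
		panic(err)
	}
	fmt.Println(hash)
}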
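// NOTE (editorial sketch, not part of the patch): the dry-run text output added in
// patches 6-9 renders one "key = value" table per task via text/tabwriter, using
// the expanded hashedTask fields (task, package, hash, directory, command, outputs,
// log file, dependencies, dependents). The self-contained example below mirrors
// that layout with plain fmt formatting instead of the repo's util.Sprintf colour
// helper; dryRunTask and printDryRunTask are illustrative names, not turbo APIs.
package main

import (
	"fmt"
	"os"
	"strings"
	"text/tabwriter"
)

type dryRunTask struct {
	TaskID       string
	Task         string
	Package      string
	Hash         string
	Command      string
	Dir          string
	Outputs      []string
	LogFile      string
	Dependencies []string
	Dependents   []string
}

// printDryRunTask writes one task in the same aligned table shape used by the
// dry-run output; cells are tab-terminated so tabwriter can size the columns.
func printDryRunTask(t dryRunTask) {
	fmt.Println(t.TaskID)
	w := tabwriter.NewWriter(os.Stdout, 0, 0, 1, ' ', 0)
	fmt.Fprintf(w, "  Task\t=\t%s\t\n", t.Task)
	fmt.Fprintf(w, "  Package\t=\t%s\t\n", t.Package)
	fmt.Fprintf(w, "  Hash\t=\t%s\t\n", t.Hash)
	fmt.Fprintf(w, "  Directory\t=\t%s\t\n", t.Dir)
	fmt.Fprintf(w, "  Command\t=\t%s\t\n", t.Command)
	fmt.Fprintf(w, "  Outputs\t=\t%s\t\n", strings.Join(t.Outputs, ", "))
	fmt.Fprintf(w, "  Log File\t=\t%s\t\n", t.LogFile)
	fmt.Fprintf(w, "  Dependencies\t=\t%s\t\n", strings.Join(t.Dependencies, ", "))
	fmt.Fprintf(w, "  Dependents\t=\t%s\t\n", strings.Join(t.Dependents, ", "))
	w.Flush()
}

func main() {
	printDryRunTask(dryRunTask{
		TaskID:       "web#build",
		Task:         "build",
		Package:      "web",
		Hash:         "deadbeef",
		Command:      "next build",
		Dir:          "apps/web",
		Outputs:      []string{"dist/**/*", "build/**/*"},
		LogFile:      "apps/web/.turbo/turbo-build.log",
		Dependencies: []string{"ui#build"},
		Dependents:   nil,
	})
}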