From 3a4dd9518ee679b6656f014d72afe7e1284a76b8 Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Wed, 29 Dec 2021 16:25:41 -0500 Subject: [PATCH 1/3] Refactor log streaming --- cli/internal/run/run.go | 116 +++++++++++++++------------------------- cli/internal/ui/ui.go | 11 ++++ 2 files changed, 54 insertions(+), 73 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index d8847d4837529..17c048f5fb8a3 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -20,6 +20,7 @@ import ( "turbo/internal/core" "turbo/internal/fs" "turbo/internal/globby" + "turbo/internal/logstreamer" "turbo/internal/scm" "turbo/internal/ui" "turbo/internal/util" @@ -456,32 +457,15 @@ func (c *RunCommand) Run(args []string) int { } else if hit { if runOptions.stream && fs.FileExists(filepath.Join(runOptions.cwd, logFileName)) { logReplayWaitGroup.Add(1) - targetUi.Output(fmt.Sprintf("cache hit, replaying output %s", ui.Dim(hash))) - go replayLogs(targetLogger, targetUi, runOptions, logFileName, hash, &logReplayWaitGroup, false) + go replayLogs(targetLogger, c.Ui, runOptions, logFileName, hash, &logReplayWaitGroup, false) } targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime)) tracer(TargetCached, nil) + return nil } } - // Setup log file - if err := fs.EnsureDir(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))); err != nil { - tracer(TargetBuildFailed, err) - c.logError(targetLogger, actualPrefix, err) - if runOptions.bail { - os.Exit(1) - } - } - output, err := os.Create(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))) - if err != nil { - fmt.Println("here") - tracer(TargetBuildFailed, err) - c.logError(targetLogger, actualPrefix, err) - if runOptions.bail { - os.Exit(1) - } - } - defer output.Close() + if runOptions.stream { targetUi.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(hash))) } @@ -492,65 +476,51 @@ func (c *RunCommand) Run(args []string) int { cmd.Dir = pack.Dir envs := fmt.Sprintf("TURBO_HASH=%v", hash) cmd.Env = append(os.Environ(), envs) - - // Get a pipe to read from stdout and stderr - stdout, err := cmd.StdoutPipe() - defer stdout.Close() - if err != nil { - tracer(TargetBuildFailed, err) - c.logError(targetLogger, actualPrefix, err) - if runOptions.bail { - os.Exit(1) - } - } - stderr, err := cmd.StderrPipe() - defer stderr.Close() - if err != nil { - tracer(TargetBuildFailed, err) - c.logError(targetLogger, actualPrefix, err) - if runOptions.bail { - os.Exit(1) - } - } - - writer := bufio.NewWriter(output) - - // Merge the streams together - merged := io.MultiReader(stderr, stdout) - - // Create a scanner which scans r in a line-by-line fashion - scanner := bufio.NewScanner(merged) - - // Execute command - // Failed to spawn? - if err := cmd.Start(); err != nil { - tracer(TargetBuildFailed, err) - writer.Flush() - if runOptions.bail { - targetLogger.Error("Could not spawn command: %w", err) - targetUi.Error(fmt.Sprintf("Could not spawn command: %v", err)) - os.Exit(1) - } - targetUi.Warn("could not spawn command, but continuing...") - } - // Read line by line and process it - if runOptions.stream || runOptions.cache { - for scanner.Scan() { - line := scanner.Text() - if runOptions.stream { - targetUi.Output(string(scanner.Bytes())) + var combinedWriter io.Writer + // be careful about this conditional given the default of cache = true + if !runOptions.cache || !(pipeline.Cache == nil || *pipeline.Cache) { + combinedWriter = os.Stdout + } else { + // Setup log file + if err := fs.EnsureDir(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))); err != nil { + tracer(TargetBuildFailed, err) + c.logError(targetLogger, actualPrefix, err) + if runOptions.bail { + os.Exit(1) } - if runOptions.cache { - writer.WriteString(fmt.Sprintf("%v\n", line)) + } + output, err := os.Create(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))) + if err != nil { + tracer(TargetBuildFailed, err) + c.logError(targetLogger, actualPrefix, err) + if runOptions.bail { + os.Exit(1) } } + defer output.Close() + bufWriter := bufio.NewWriter(output) + bufWriter.WriteString(fmt.Sprintf("%scache hit, replaying output %s\n", actualPrefix, ui.Dim(hash))) + defer bufWriter.Flush() + combinedWriter = io.MultiWriter(os.Stdout, bufWriter) + logger := log.New(combinedWriter, "", 0) + // Setup a streamer that we'll pipe cmd.Stdout to + logStreamerOut := logstreamer.NewLogstreamer(logger, actualPrefix, false) + // Setup a streamer that we'll pipe cmd.Stderr to. + // We want to record/buffer anything that's written to this (3rd argument true) + logStreamerErr := logstreamer.NewLogstreamer(logger, actualPrefix, false) + cmd.Stderr = logStreamerErr + cmd.Stdout = logStreamerOut + + // Reset any error we recorded + logStreamerErr.FlushRecord() + logStreamerOut.FlushRecord() } // Run the command - if err := cmd.Wait(); err != nil { + if err := cmd.Run(); err != nil { tracer(TargetBuildFailed, err) targetLogger.Error("Error: command finished with error: %w", err) - writer.Flush() + // writer.Flush() if runOptions.bail { if runOptions.stream { targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err)) @@ -579,7 +549,7 @@ func (c *RunCommand) Run(args []string) int { return nil } - writer.Flush() + // writer.Flush() if runOptions.cache && (pipeline.Cache == nil || *pipeline.Cache) { targetLogger.Debug("caching output", "outputs", outputs) @@ -923,7 +893,7 @@ func replayLogs(logger hclog.Logger, prefixUi cli.Ui, runOptions *RunOptions, lo defer f.Close() scan := bufio.NewScanner(f) for scan.Scan() { - prefixUi.Output(ui.Dim(string(scan.Bytes()))) //Writing to Stdout + prefixUi.Output(ui.StripAnsi(string(scan.Bytes()))) //Writing to Stdout } logger.Debug("finish replaying logs") } diff --git a/cli/internal/ui/ui.go b/cli/internal/ui/ui.go index 7a33cc9adc1a0..6b6ef4982b525 100644 --- a/cli/internal/ui/ui.go +++ b/cli/internal/ui/ui.go @@ -5,6 +5,7 @@ import ( "io" "math" "os" + "regexp" "strings" "github.com/fatih/color" @@ -12,6 +13,7 @@ import ( ) const ESC = 27 +const ansiEscapeStr = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))" var IsTTY = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsCygwinTerminal(os.Stdout.Fd()) var IsCI = os.Getenv("CI") == "true" || os.Getenv("BUILD_NUMBER") == "true" || os.Getenv("TEAMCITY_VERSION") != "" @@ -27,6 +29,15 @@ func ClearLines(writer io.Writer, count int) { _, _ = fmt.Fprint(writer, strings.Repeat(clear, count)) } +var ansiRegex = regexp.MustCompile(ansiEscapeStr) + +func StripAnsi(str string) string { + if !IsTTY { + return ansiRegex.ReplaceAllString(str, "") + } + return str +} + // Dim prints out dimmed text func Dim(str string) string { return gray.Sprint(str) From cca3f85f97704b9dd129c2341401df1a4e9af098 Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Wed, 29 Dec 2021 17:27:56 -0500 Subject: [PATCH 2/3] Cleanup and add comments --- cli/internal/run/run.go | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/cli/internal/run/run.go b/cli/internal/run/run.go index 17c048f5fb8a3..58b526f641f39 100644 --- a/cli/internal/run/run.go +++ b/cli/internal/run/run.go @@ -444,9 +444,6 @@ func (c *RunCommand) Run(args []string) int { targetLogger.Debug("log file", "path", filepath.Join(runOptions.cwd, logFileName)) // Cache --------------------------------------------- - // We create the real task outputs now so we can potentially use them to - // to store artifacts from remote cache to local fs cache - var hit bool if runOptions.forceExecution { hit = false @@ -469,6 +466,8 @@ func (c *RunCommand) Run(args []string) int { if runOptions.stream { targetUi.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(hash))) } + + // Setup command execution argsactual := append([]string{"run"}, task) argsactual = append(argsactual, runOptions.passThroughArgs...) // @TODO: @jaredpalmer fix this hack to get the package manager's name @@ -476,20 +475,23 @@ func (c *RunCommand) Run(args []string) int { cmd.Dir = pack.Dir envs := fmt.Sprintf("TURBO_HASH=%v", hash) cmd.Env = append(os.Environ(), envs) - var combinedWriter io.Writer + + // Setup stdout/stderr + // If we are not caching anything, then we don't need to write logs to disk // be careful about this conditional given the default of cache = true + var combinedWriter io.Writer if !runOptions.cache || !(pipeline.Cache == nil || *pipeline.Cache) { combinedWriter = os.Stdout } else { // Setup log file - if err := fs.EnsureDir(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))); err != nil { + if err := fs.EnsureDir(logFileName); err != nil { tracer(TargetBuildFailed, err) c.logError(targetLogger, actualPrefix, err) if runOptions.bail { os.Exit(1) } } - output, err := os.Create(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))) + output, err := os.Create(logFileName) if err != nil { tracer(TargetBuildFailed, err) c.logError(targetLogger, actualPrefix, err) @@ -506,12 +508,10 @@ func (c *RunCommand) Run(args []string) int { // Setup a streamer that we'll pipe cmd.Stdout to logStreamerOut := logstreamer.NewLogstreamer(logger, actualPrefix, false) // Setup a streamer that we'll pipe cmd.Stderr to. - // We want to record/buffer anything that's written to this (3rd argument true) logStreamerErr := logstreamer.NewLogstreamer(logger, actualPrefix, false) cmd.Stderr = logStreamerErr cmd.Stdout = logStreamerOut - - // Reset any error we recorded + // Flush/Reset any error we recorded logStreamerErr.FlushRecord() logStreamerOut.FlushRecord() } @@ -520,7 +520,6 @@ func (c *RunCommand) Run(args []string) int { if err := cmd.Run(); err != nil { tracer(TargetBuildFailed, err) targetLogger.Error("Error: command finished with error: %w", err) - // writer.Flush() if runOptions.bail { if runOptions.stream { targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err)) @@ -549,8 +548,7 @@ func (c *RunCommand) Run(args []string) int { return nil } - // writer.Flush() - + // Cache command outputs if runOptions.cache && (pipeline.Cache == nil || *pipeline.Cache) { targetLogger.Debug("caching output", "outputs", outputs) ignore := []string{} @@ -560,6 +558,7 @@ func (c *RunCommand) Run(args []string) int { } } + // Clean up tracing tracer(TargetBuilt, nil) targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime)) return nil From 1e6582e90a602a0733bc5bdfccaf1957fa958a82 Mon Sep 17 00:00:00 2001 From: Jared Palmer Date: Wed, 29 Dec 2021 17:34:07 -0500 Subject: [PATCH 3/3] Change the global cache key since logs are different now --- cli/internal/context/context.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/internal/context/context.go b/cli/internal/context/context.go index e319ed7888477..81af84534fb38 100644 --- a/cli/internal/context/context.go +++ b/cli/internal/context/context.go @@ -23,7 +23,7 @@ import ( const ( ROOT_NODE_NAME = "___ROOT___" - GLOBAL_CACHE_KEY = "hello" + GLOBAL_CACHE_KEY = "snozzberries" ) // A BuildResultStatus represents the status of a target when we log a build result.