Skip to content

Commit

Permalink
Refactor log streaming (#462)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredpalmer authored Dec 29, 2021
1 parent 5d41833 commit f2c12d9
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 77 deletions.
2 changes: 1 addition & 1 deletion cli/internal/context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (

const (
ROOT_NODE_NAME = "___ROOT___"
GLOBAL_CACHE_KEY = "hello"
GLOBAL_CACHE_KEY = "snozzberries"
)

// A BuildResultStatus represents the status of a target when we log a build result.
Expand Down
121 changes: 45 additions & 76 deletions cli/internal/run/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"turbo/internal/core"
"turbo/internal/fs"
"turbo/internal/globby"
"turbo/internal/logstreamer"
"turbo/internal/scm"
"turbo/internal/ui"
"turbo/internal/util"
Expand Down Expand Up @@ -443,9 +444,6 @@ func (c *RunCommand) Run(args []string) int {
targetLogger.Debug("log file", "path", filepath.Join(runOptions.cwd, logFileName))

// Cache ---------------------------------------------
// We create the real task outputs now so we can potentially use them to
// to store artifacts from remote cache to local fs cache

var hit bool
if runOptions.forceExecution {
hit = false
Expand All @@ -456,35 +454,20 @@ func (c *RunCommand) Run(args []string) int {
} else if hit {
if runOptions.stream && fs.FileExists(filepath.Join(runOptions.cwd, logFileName)) {
logReplayWaitGroup.Add(1)
targetUi.Output(fmt.Sprintf("cache hit, replaying output %s", ui.Dim(hash)))
go replayLogs(targetLogger, targetUi, runOptions, logFileName, hash, &logReplayWaitGroup, false)
go replayLogs(targetLogger, c.Ui, runOptions, logFileName, hash, &logReplayWaitGroup, false)
}
targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime))
tracer(TargetCached, nil)

return nil
}
}
// Setup log file
if err := fs.EnsureDir(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task))); err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
output, err := os.Create(filepath.Join(runOptions.cwd, pack.Dir, ".turbo", fmt.Sprintf("turbo-%v.log", task)))
if err != nil {
fmt.Println("here")
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
defer output.Close()

if runOptions.stream {
targetUi.Output(fmt.Sprintf("cache miss, executing %s", ui.Dim(hash)))
}

// Setup command execution
argsactual := append([]string{"run"}, task)
argsactual = append(argsactual, runOptions.passThroughArgs...)
// @TODO: @jaredpalmer fix this hack to get the package manager's name
Expand All @@ -493,64 +476,50 @@ func (c *RunCommand) Run(args []string) int {
envs := fmt.Sprintf("TURBO_HASH=%v", hash)
cmd.Env = append(os.Environ(), envs)

// Get a pipe to read from stdout and stderr
stdout, err := cmd.StdoutPipe()
defer stdout.Close()
if err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
stderr, err := cmd.StderrPipe()
defer stderr.Close()
if err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}

writer := bufio.NewWriter(output)

// Merge the streams together
merged := io.MultiReader(stderr, stdout)

// Create a scanner which scans r in a line-by-line fashion
scanner := bufio.NewScanner(merged)

// Execute command
// Failed to spawn?
if err := cmd.Start(); err != nil {
tracer(TargetBuildFailed, err)
writer.Flush()
if runOptions.bail {
targetLogger.Error("Could not spawn command: %w", err)
targetUi.Error(fmt.Sprintf("Could not spawn command: %v", err))
os.Exit(1)
}
targetUi.Warn("could not spawn command, but continuing...")
}
// Read line by line and process it
if runOptions.stream || runOptions.cache {
for scanner.Scan() {
line := scanner.Text()
if runOptions.stream {
targetUi.Output(string(scanner.Bytes()))
// Setup stdout/stderr
// If we are not caching anything, then we don't need to write logs to disk
// be careful about this conditional given the default of cache = true
var combinedWriter io.Writer
if !runOptions.cache || !(pipeline.Cache == nil || *pipeline.Cache) {
combinedWriter = os.Stdout
} else {
// Setup log file
if err := fs.EnsureDir(logFileName); err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
if runOptions.cache {
writer.WriteString(fmt.Sprintf("%v\n", line))
}
output, err := os.Create(logFileName)
if err != nil {
tracer(TargetBuildFailed, err)
c.logError(targetLogger, actualPrefix, err)
if runOptions.bail {
os.Exit(1)
}
}
defer output.Close()
bufWriter := bufio.NewWriter(output)
bufWriter.WriteString(fmt.Sprintf("%scache hit, replaying output %s\n", actualPrefix, ui.Dim(hash)))
defer bufWriter.Flush()
combinedWriter = io.MultiWriter(os.Stdout, bufWriter)
logger := log.New(combinedWriter, "", 0)
// Setup a streamer that we'll pipe cmd.Stdout to
logStreamerOut := logstreamer.NewLogstreamer(logger, actualPrefix, false)
// Setup a streamer that we'll pipe cmd.Stderr to.
logStreamerErr := logstreamer.NewLogstreamer(logger, actualPrefix, false)
cmd.Stderr = logStreamerErr
cmd.Stdout = logStreamerOut
// Flush/Reset any error we recorded
logStreamerErr.FlushRecord()
logStreamerOut.FlushRecord()
}

// Run the command
if err := cmd.Wait(); err != nil {
if err := cmd.Run(); err != nil {
tracer(TargetBuildFailed, err)
targetLogger.Error("Error: command finished with error: %w", err)
writer.Flush()
if runOptions.bail {
if runOptions.stream {
targetUi.Error(fmt.Sprintf("Error: command finished with error: %s", err))
Expand Down Expand Up @@ -579,8 +548,7 @@ func (c *RunCommand) Run(args []string) int {
return nil
}

writer.Flush()

// Cache command outputs
if runOptions.cache && (pipeline.Cache == nil || *pipeline.Cache) {
targetLogger.Debug("caching output", "outputs", outputs)
ignore := []string{}
Expand All @@ -590,6 +558,7 @@ func (c *RunCommand) Run(args []string) int {
}
}

// Clean up tracing
tracer(TargetBuilt, nil)
targetLogger.Debug("done", "status", "complete", "duration", time.Since(cmdTime))
return nil
Expand Down Expand Up @@ -923,7 +892,7 @@ func replayLogs(logger hclog.Logger, prefixUi cli.Ui, runOptions *RunOptions, lo
defer f.Close()
scan := bufio.NewScanner(f)
for scan.Scan() {
prefixUi.Output(ui.Dim(string(scan.Bytes()))) //Writing to Stdout
prefixUi.Output(ui.StripAnsi(string(scan.Bytes()))) //Writing to Stdout
}
logger.Debug("finish replaying logs")
}
11 changes: 11 additions & 0 deletions cli/internal/ui/ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (
"io"
"math"
"os"
"regexp"
"strings"

"github.com/fatih/color"
"github.com/mattn/go-isatty"
)

const ESC = 27
const ansiEscapeStr = "[\u001B\u009B][[\\]()#;?]*(?:(?:(?:[a-zA-Z\\d]*(?:;[a-zA-Z\\d]*)*)?\u0007)|(?:(?:\\d{1,4}(?:;\\d{0,4})*)?[\\dA-PRZcf-ntqry=><~]))"

var IsTTY = isatty.IsTerminal(os.Stdout.Fd()) || isatty.IsCygwinTerminal(os.Stdout.Fd())
var IsCI = os.Getenv("CI") == "true" || os.Getenv("BUILD_NUMBER") == "true" || os.Getenv("TEAMCITY_VERSION") != ""
Expand All @@ -27,6 +29,15 @@ func ClearLines(writer io.Writer, count int) {
_, _ = fmt.Fprint(writer, strings.Repeat(clear, count))
}

var ansiRegex = regexp.MustCompile(ansiEscapeStr)

func StripAnsi(str string) string {
if !IsTTY {
return ansiRegex.ReplaceAllString(str, "")
}
return str
}

// Dim prints out dimmed text
func Dim(str string) string {
return gray.Sprint(str)
Expand Down

1 comment on commit f2c12d9

@vercel
Copy link

@vercel vercel bot commented on f2c12d9 Dec 29, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.