Skip to content

Commit

Permalink
chore: avoid allocations during log capture and replay (#6189)
Browse files Browse the repository at this point in the history
### Description

The use of `mpsc` to capture output from child processes really bothered
me. I figured out how to use `select!` to avoid it and I kept pulling
that thread and ended up with this PR.

This PR:
- Drops usage of `mpsc` to combine both stdout/stderr to a single writer
- Removes usage of `bytelines`, which allows us to avoid temporary
allocations (except for a single buffer) when capturing child process
output
- Folds the two child process output capture methods into a single method

I'd suggest reviewing each commit on its own.

### Testing Instructions

Existing unit tests and testing log replay in a test repo:
```
[0 olszewski@chriss-mbp] /tmp/testing $ turbo_dev build --experimental-rust-codepath 
docs:build: cache hit (outputs already on disk), suppressing logs 865e3d756acfe0ef
web:build: cache hit (outputs already on disk), suppressing logs a97ae6e5ab4613c0
docs:build: 
docs:build: > docs@1.0.0 build /private/tmp/testing/apps/docs
docs:build: > next build
docs:build: 
docs:build: - info Creating an optimized production build...
docs:build: - info Compiled successfully
docs:build: - info Linting and checking validity of types...
docs:build: - info Collecting page data...
docs:build: - info Generating static pages (0/4)
docs:build: - info Generating static pages (1/4)
docs:build: - info Generating static pages (2/4)
docs:build: - info Generating static pages (3/4)
docs:build: - info Generating static pages (4/4)
docs:build: - info Finalizing page optimization...
docs:build: 
docs:build: Route (app)                                Size     First Load JS
docs:build: ┌ ○ /                                      5.52 kB          84 kB
docs:build: └ ○ /favicon.ico                           0 B                0 B
docs:build: + First Load JS shared by all              78.5 kB
docs:build:   ├ chunks/934-196dcc5a61008b80.js         26.1 kB
web:build: 
web:build: > web@1.0.0 build /private/tmp/testing/apps/web
web:build: > next build
web:build: 
docs:build:   ├ chunks/c260e7fb-1dc88cd74c938f5d.js    50.5 kB
docs:build:   ├ chunks/main-app-2d713702b8a6a8c1.js    220 B
docs:build:   └ chunks/webpack-46498be4babc7638.js     1.68 kB
docs:build: 
docs:build: Route (pages)                              Size     First Load JS
docs:build: ─ ○ /404                                   182 B          76.5 kB
docs:build: + First Load JS shared by all              76.3 kB
docs:build:   ├ chunks/framework-eb124dc7acb3bb04.js   45.1 kB
web:build: - info Creating an optimized production build...
docs:build:   ├ chunks/main-15364e85b6f1124e.js        29.4 kB
docs:build:   ├ chunks/pages/_app-82ff52170628f1f6.js  191 B
docs:build:   └ chunks/webpack-46498be4babc7638.js     1.68 kB
web:build: - info Compiled successfully
web:build: - info Linting and checking validity of types...
web:build: - info Collecting page data...
web:build: - info Generating static pages (0/4)
web:build: - info Generating static pages (1/4)
web:build: - info Generating static pages (2/4)
web:build: - info Generating static pages (3/4)
docs:build: 
docs:build: ○  (Static)  automatically rendered as static HTML (uses no initial props)
docs:build: 
web:build: - info Generating static pages (4/4)
web:build: - info Finalizing page optimization...
web:build: 
web:build: Route (app)                                Size     First Load JS
web:build: ┌ ○ /                                      5.52 kB          84 kB
web:build: └ ○ /favicon.ico                           0 B                0 B
web:build: + First Load JS shared by all              78.5 kB
web:build:   ├ chunks/934-196dcc5a61008b80.js         26.1 kB
web:build:   ├ chunks/c260e7fb-1dc88cd74c938f5d.js    50.5 kB
web:build:   ├ chunks/main-app-dc37cd09df02200e.js    219 B
web:build:   └ chunks/webpack-46498be4babc7638.js     1.68 kB
web:build: 
web:build: Route (pages)                              Size     First Load JS
web:build: ─ ○ /404                                   182 B          76.5 kB
web:build: + First Load JS shared by all              76.3 kB
web:build:   ├ chunks/framework-eb124dc7acb3bb04.js   45.1 kB
web:build:   ├ chunks/main-15364e85b6f1124e.js        29.4 kB
web:build:   ├ chunks/pages/_app-82ff52170628f1f6.js  191 B
web:build:   └ chunks/webpack-46498be4babc7638.js     1.68 kB
web:build: 
web:build: ○  (Static)  automatically rendered as static HTML (uses no initial props)
web:build:
```


Closes TURBO-1476

---------

Co-authored-by: Chris Olszewski <Chris Olszewski>
  • Loading branch information
chris-olszewski authored Oct 18, 2023
1 parent cb70622 commit 0aedfc7
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 106 deletions.
12 changes: 0 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/turborepo-lib/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ anyhow = { workspace = true, features = ["backtrace"] }
atty = { workspace = true }
axum = { workspace = true }
axum-server = { workspace = true }
bytelines = "2.4.0"
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true, features = ["derive", "env"] }
clap_complete = { workspace = true }
Expand Down
135 changes: 52 additions & 83 deletions crates/turborepo-lib/src/process/child.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,11 @@ use std::{
time::Duration,
};

use bytelines::AsyncByteLines;
use command_group::AsyncCommandGroup;
use futures::future::{try_join3, try_join4};
use itertools::Itertools;
pub use tokio::process::Command;
use tokio::{
io::{AsyncRead, BufReader},
io::{AsyncBufRead, AsyncBufReadExt, BufReader},
join,
sync::{mpsc, watch, RwLock},
};
Expand Down Expand Up @@ -364,95 +362,66 @@ impl Child {

/// Wait for the `Child` to exit and pipe any stdout and stderr to the
/// provided writers.
/// If `None` is passed for stderr then all output produced will be piped
/// to stdout
pub async fn wait_with_piped_outputs<W: Write>(
&mut self,
stdout_pipe: W,
stderr_pipe: W,
mut stdout_pipe: W,
mut stderr_pipe: Option<W>,
) -> Result<Option<ChildExit>, std::io::Error> {
// Note this is similar to tokio::process::Command::wait_with_outputs
// but allows us to provide our own sinks instead of just writing to a buffers.
async fn pipe_lines<R: AsyncRead + Unpin, W: Write>(
stream: Option<R>,
mut sink: W,
) -> std::io::Result<()> {
let Some(stream) = stream else { return Ok(()) };
let stream = BufReader::new(stream);
let mut lines = AsyncByteLines::new(stream);
while let Some(line) = lines.next().await? {
sink.write_all(line)?;
// Line iterator doesn't return newline delimiter so we must add it here
sink.write_all(b"\n")?;
async fn next_line<R: AsyncBufRead + Unpin>(
stream: &mut Option<R>,
buffer: &mut Vec<u8>,
) -> Option<Result<(), io::Error>> {
match stream {
Some(stream) => match stream.read_until(b'\n', buffer).await {
Ok(0) => None,
Ok(_) => Some(Ok(())),
Err(e) => Some(Err(e)),
},
None => None,
}
Ok(())
}

let stdout_fut = pipe_lines(self.stdout(), stdout_pipe);
let stderr_fut = pipe_lines(self.stderr(), stderr_pipe);
let mut stdout_lines = self.stdout().map(BufReader::new);
let mut stderr_lines = self.stderr().map(BufReader::new);

let (exit, _stdout, _stderr) =
try_join3(async { Ok(self.wait().await) }, stdout_fut, stderr_fut).await?;
let mut stdout_buffer = Vec::new();
let mut stderr_buffer = Vec::new();

Ok(exit)
}

/// Wait for the `Child` to exit and pipe any stdout and stderr to a
/// single writers.
pub async fn wait_with_single_piped_output<W: Write>(
&mut self,
pipe: W,
) -> Result<Option<ChildExit>, std::io::Error> {
// Note this is similar to tokio::process::Command::wait_with_outputs
// but allows us to provide our own sinks instead of just writing to a buffers.
async fn pipe_lines<R: AsyncRead + Unpin>(
stream: Option<R>,
tx: mpsc::Sender<Vec<u8>>,
) -> std::io::Result<()> {
let Some(stream) = stream else { return Ok(()) };
let stream = BufReader::new(stream);
let mut lines = AsyncByteLines::new(stream);
while let Some(line) = lines.next().await? {
let line = {
// Allocate vector with enough capacity for trailing newline
let mut buffer = Vec::with_capacity(line.len() + 1);
buffer.extend_from_slice(line);
// Line iterator doesn't return newline delimiter so we must add it here
buffer.push(b'\n');
buffer
};

if tx.send(line).await.is_err() {
// If the receiver is dropped then we have nothing to do with these bytes
loop {
tokio::select! {
Some(result) = next_line(&mut stdout_lines, &mut stdout_buffer) => {
result?;
stdout_pipe.write_all(&stdout_buffer)?;
stdout_buffer.clear();
}
Some(result) = next_line(&mut stderr_lines, &mut stderr_buffer) => {
result?;
stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?;
stderr_buffer.clear();
}
else => {
// In the case that both futures read a complete line
// the future not chosen in the select will return None if it's at EOF
// as the number of bytes read will be 0.
// We check and flush the buffers to avoid missing the last line of output.
if !stdout_buffer.is_empty() {
stdout_pipe.write_all(&stdout_buffer)?;
stdout_buffer.clear();
}
if !stderr_buffer.is_empty() {
stderr_pipe.as_mut().unwrap_or(&mut stdout_pipe).write_all(&stderr_buffer)?;
stderr_buffer.clear();
}
break;
}
}
Ok(())
}

async fn write_lines<W: Write>(
mut rx: mpsc::Receiver<Vec<u8>>,
mut writer: W,
) -> std::io::Result<()> {
while let Some(buffer) = rx.recv().await {
writer.write_all(&buffer)?;
}
Ok(())
}
debug_assert!(stdout_buffer.is_empty(), "buffer should be empty");
debug_assert!(stderr_buffer.is_empty(), "buffer should be empty");

let (tx, rx) = mpsc::channel(16);

let stdout_fut = pipe_lines(self.stdout(), tx.clone());
let stderr_fut = pipe_lines(self.stderr(), tx);
let write_fut = write_lines(rx, pipe);

let (exit, _stdout, _stderr, _write) = try_join4(
async { Ok(self.wait().await) },
stdout_fut,
stderr_fut,
write_fut,
)
.await?;

Ok(exit)
Ok(self.wait().await)
}

pub fn label(&self) -> &str {
Expand Down Expand Up @@ -707,7 +676,7 @@ mod test {
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, &mut err)
.wait_with_piped_outputs(&mut out, Some(&mut err))
.await
.unwrap();

Expand All @@ -728,7 +697,7 @@ mod test {
let mut buffer = Vec::new();

let exit = child
.wait_with_single_piped_output(&mut buffer)
.wait_with_piped_outputs(&mut buffer, None)
.await
.unwrap();

Expand All @@ -750,7 +719,7 @@ mod test {
let mut err = Vec::new();

let exit = child
.wait_with_piped_outputs(&mut out, &mut err)
.wait_with_piped_outputs(&mut out, Some(&mut err))
.await
.unwrap();

Expand All @@ -771,7 +740,7 @@ mod test {
let mut buffer = Vec::new();

let exit = child
.wait_with_single_piped_output(&mut buffer)
.wait_with_piped_outputs(&mut buffer, None)
.await
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion crates/turborepo-lib/src/task_graph/visitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ impl<'a> Visitor<'a> {
};

let exit_status = match process
.wait_with_single_piped_output(&mut stdout_writer)
.wait_with_piped_outputs(&mut stdout_writer, None)
.await
{
Ok(Some(exit_status)) => exit_status,
Expand Down
1 change: 0 additions & 1 deletion crates/turborepo-ui/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ test-case = { workspace = true }

[dependencies]
atty = { workspace = true }
bytelines = "2.4.0"
console = { workspace = true }
indicatif = { workspace = true }
lazy_static = { workspace = true }
Expand Down
27 changes: 19 additions & 8 deletions crates/turborepo-ui/src/logs.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use std::{
fs::File,
io::{BufReader, BufWriter, Write},
io::{BufRead, BufReader, BufWriter, Write},
};

use bytelines::ByteLines;
use tracing::{debug, warn};
use turbopath::AbsoluteSystemPath;

Expand Down Expand Up @@ -92,15 +91,27 @@ pub fn replay_logs<W: Write>(
// Construct a PrefixedWriter which allows for non UTF-8 bytes to be written to
// it.
let mut prefixed_writer = output.output_prefixed_writer();
let log_reader = BufReader::new(log_file);
let lines = ByteLines::new(log_reader);
let mut log_reader = BufReader::new(log_file);

for line in lines.into_iter() {
let mut line = line.map_err(Error::CannotReadLogs)?;
line.push(b'\n');
let mut buffer = Vec::new();
loop {
let num_bytes = log_reader
.read_until(b'\n', &mut buffer)
.map_err(Error::CannotReadLogs)?;
if num_bytes == 0 {
break;
}

// If the log file doesn't end with a newline, then we add one to ensure the
// underlying writer receives a full line.
if !buffer.ends_with(b"\n") {
buffer.push(b'\n');
}
prefixed_writer
.write_all(&line)
.write_all(&buffer)
.map_err(Error::CannotReadLogs)?;

buffer.clear();
}

debug!("finish replaying logs");
Expand Down

0 comments on commit 0aedfc7

Please sign in to comment.