From 50edd765b6cdb269d5930900db2f8a83a76656ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Wed, 27 Dec 2023 13:34:57 +0100 Subject: [PATCH 1/2] Refactor scenario extraction --- collector/src/compile/execute/bencher.rs | 33 +++++------------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/collector/src/compile/execute/bencher.rs b/collector/src/compile/execute/bencher.rs index 67d88df7b..9bb06df64 100644 --- a/collector/src/compile/execute/bencher.rs +++ b/collector/src/compile/execute/bencher.rs @@ -196,36 +196,17 @@ impl<'a> Processor for BenchProcessor<'a> { } } - let fut = match data.scenario { - Scenario::Full => self.insert_stats( - database::Scenario::Empty, - data.profile, - data.backend, - res, - ), - Scenario::IncrFull => self.insert_stats( - database::Scenario::IncrementalEmpty, - data.profile, - data.backend, - res, - ), - Scenario::IncrUnchanged => self.insert_stats( - database::Scenario::IncrementalFresh, - data.profile, - data.backend, - res, - ), + let scenario = match data.scenario { + Scenario::Full => database::Scenario::Empty, + Scenario::IncrFull => database::Scenario::IncrementalEmpty, + Scenario::IncrUnchanged => database::Scenario::IncrementalFresh, Scenario::IncrPatched => { let patch = data.patch.unwrap(); - self.insert_stats( - database::Scenario::IncrementalPatch(patch.name), - data.profile, - data.backend, - res, - ) + database::Scenario::IncrementalPatch(patch.name) } }; - fut.await; + self.insert_stats(scenario, data.profile, data.backend, res) + .await; Ok(Retry::No) } Err(DeserializeStatError::NoOutput(output)) => { From cd54cc4f1e61019c89dd17fccbee613e75d8cea6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ber=C3=A1nek?= Date: Wed, 27 Dec 2023 14:22:32 +0100 Subject: [PATCH 2/2] Upload self-profile files to S3 after each collection --- collector/src/compile/benchmark/mod.rs | 2 +- collector/src/compile/execute/bencher.rs | 211 ++++++++++++++++++----- collector/src/compile/execute/mod.rs | 94 +--------- 3 files changed, 174 insertions(+), 133 deletions(-) diff --git a/collector/src/compile/benchmark/mod.rs b/collector/src/compile/benchmark/mod.rs index c74bc6655..1a88ff63d 100644 --- a/collector/src/compile/benchmark/mod.rs +++ b/collector/src/compile/benchmark/mod.rs @@ -416,7 +416,7 @@ impl Benchmark { self.name, benchmark_start.elapsed().as_secs_f64() ); - + processor.postprocess_results().await; Ok(()) } } diff --git a/collector/src/compile/execute/bencher.rs b/collector/src/compile/execute/bencher.rs index 9bb06df64..623d7c749 100644 --- a/collector/src/compile/execute/bencher.rs +++ b/collector/src/compile/execute/bencher.rs @@ -4,19 +4,30 @@ use crate::compile::benchmark::scenario::Scenario; use crate::compile::benchmark::BenchmarkName; use crate::compile::execute; use crate::compile::execute::{ - rustc, DeserializeStatError, PerfTool, ProcessOutputData, Processor, Retry, SelfProfile, - SelfProfileFiles, Stats, Upload, + rustc, DeserializeStatError, PerfTool, ProcessOutputData, Processor, Retry, SelfProfileFiles, + Stats, }; use crate::toolchain::Toolchain; use crate::utils::git::get_rustc_perf_commit; +use anyhow::Context; +use database::CollectionId; use futures::stream::FuturesUnordered; -use futures::StreamExt; +use futures::{future, StreamExt}; +use std::collections::VecDeque; use std::future::Future; -use std::path::PathBuf; +use std::io::Read; +use std::path::{Path, PathBuf}; use std::pin::Pin; use std::process::Command; use std::{env, process}; +pub struct RecordedSelfProfile { + collection: CollectionId, + scenario: database::Scenario, + profile: database::Profile, + files: SelfProfileFiles, +} + // Tools usable with the benchmarking subcommands. #[derive(Clone, Copy, Debug, PartialEq)] pub enum Bencher { @@ -31,10 +42,10 @@ pub struct BenchProcessor<'a> { conn: &'a mut dyn database::Connection, artifact: &'a database::ArtifactId, artifact_row_id: database::ArtifactIdNumber, - upload: Option, is_first_collection: bool, is_self_profile: bool, tries: u8, + self_profiles: Vec, } impl<'a> BenchProcessor<'a> { @@ -63,7 +74,6 @@ impl<'a> BenchProcessor<'a> { } BenchProcessor { - upload: None, conn, benchmark, artifact, @@ -71,6 +81,7 @@ impl<'a> BenchProcessor<'a> { is_first_collection: true, is_self_profile, tries: 0, + self_profiles: vec![], } } @@ -79,8 +90,8 @@ impl<'a> BenchProcessor<'a> { scenario: database::Scenario, profile: Profile, backend: CodegenBackend, - stats: (Stats, Option, Option), - ) { + stats: Stats, + ) -> (CollectionId, database::Profile) { let version = get_rustc_perf_commit(); let collection = self.conn.collection_id(&version).await; @@ -97,41 +108,8 @@ impl<'a> BenchProcessor<'a> { CodegenBackend::Cranelift => database::CodegenBackend::Cranelift, }; - if let Some(files) = stats.2 { - if env::var_os("RUSTC_PERF_UPLOAD_TO_S3").is_some() { - // FIXME: Record codegen backend in the self profile name - - // We can afford to have the uploads run concurrently with - // rustc. Generally speaking, they take up almost no CPU time - // (just copying data into the network). Plus, during - // self-profile data timing noise doesn't matter as much. (We'll - // be migrating to instructions soon, hopefully, where the - // upload will cause even less noise). We may also opt at some - // point to defer these uploads entirely to the *end* or - // something like that. For now though this works quite well. - if let Some(u) = self.upload.take() { - u.wait(); - } - let prefix = PathBuf::from("self-profile") - .join(self.artifact_row_id.0.to_string()) - .join(self.benchmark.0.as_str()) - .join(profile.to_string()) - .join(scenario.to_id()); - self.upload = Some(Upload::new(prefix, collection, files)); - self.conn - .record_raw_self_profile( - collection, - self.artifact_row_id, - self.benchmark.0.as_str(), - profile, - scenario, - ) - .await; - } - } - let mut buf = FuturesUnordered::new(); - for (stat, value) in stats.0.iter() { + for (stat, value) in stats.iter() { buf.push(self.conn.record_statistic( collection, self.artifact_row_id, @@ -145,6 +123,7 @@ impl<'a> BenchProcessor<'a> { } while let Some(()) = buf.next().await {} + (collection, profile) } pub async fn measure_rustc(&mut self, toolchain: &Toolchain) -> anyhow::Result<()> { @@ -205,8 +184,19 @@ impl<'a> Processor for BenchProcessor<'a> { database::Scenario::IncrementalPatch(patch.name) } }; - self.insert_stats(scenario, data.profile, data.backend, res) + let (collection_id, profile) = self + .insert_stats(scenario, data.profile, data.backend, res.0) .await; + + if let Some(files) = res.2 { + self.self_profiles.push(RecordedSelfProfile { + collection: collection_id, + scenario, + profile, + files, + }); + } + Ok(Retry::No) } Err(DeserializeStatError::NoOutput(output)) => { @@ -231,4 +221,139 @@ impl<'a> Processor for BenchProcessor<'a> { } }) } + + fn postprocess_results<'b>(&'b mut self) -> Pin + 'b>> { + Box::pin(async move { + if env::var_os("RUSTC_PERF_UPLOAD_TO_S3").is_some() { + let futs = self + .self_profiles + .iter() + .map(|profile| { + self.conn.record_raw_self_profile( + profile.collection, + self.artifact_row_id, + self.benchmark.0.as_str(), + profile.profile, + profile.scenario, + ) + }) + .collect::>(); + future::join_all(futs).await; + + // Upload profiles to S3. Buffer up to 10 uploads at a time. + let mut uploads: VecDeque = VecDeque::new(); + for profile in self.self_profiles.drain(..) { + if uploads.len() == 10 { + uploads.pop_front().unwrap().wait(); + } + + // FIXME: Record codegen backend in the self profile name + let prefix = PathBuf::from("self-profile") + .join(self.artifact_row_id.0.to_string()) + .join(self.benchmark.0.as_str()) + .join(profile.profile.to_string()) + .join(profile.scenario.to_id()); + let upload = + SelfProfileS3Upload::new(prefix, profile.collection, profile.files); + uploads.push_back(upload); + } + for upload in uploads { + upload.wait(); + } + } + }) + } +} + +/// Uploads self-profile results to S3 +struct SelfProfileS3Upload(std::process::Child, tempfile::NamedTempFile); + +impl SelfProfileS3Upload { + fn new( + prefix: PathBuf, + collection: database::CollectionId, + files: SelfProfileFiles, + ) -> SelfProfileS3Upload { + // Files are placed at + // * self-profile//// + // /self-profile-.{extension} + let upload = tempfile::NamedTempFile::new() + .context("create temporary file") + .unwrap(); + let filename = match files { + SelfProfileFiles::Seven { + string_index, + string_data, + events, + } => { + let tarball = snap::write::FrameEncoder::new(Vec::new()); + let mut builder = tar::Builder::new(tarball); + builder.mode(tar::HeaderMode::Deterministic); + + let append_file = |builder: &mut tar::Builder<_>, + file: &Path, + name: &str| + -> anyhow::Result<()> { + if file.exists() { + // Silently ignore missing files, the new self-profile + // experiment with one file has a different structure. + builder.append_path_with_name(file, name)?; + } + Ok(()) + }; + + append_file(&mut builder, &string_index, "self-profile.string_index") + .expect("append string index"); + append_file(&mut builder, &string_data, "self-profile.string_data") + .expect("append string data"); + append_file(&mut builder, &events, "self-profile.events").expect("append events"); + builder.finish().expect("complete tarball"); + std::fs::write( + upload.path(), + builder + .into_inner() + .expect("get") + .into_inner() + .expect("snap success"), + ) + .expect("wrote tarball"); + format!("self-profile-{}.tar.sz", collection) + } + SelfProfileFiles::Eight { file } => { + let data = std::fs::read(file).expect("read profile data"); + let mut data = snap::read::FrameEncoder::new(&data[..]); + let mut compressed = Vec::new(); + data.read_to_end(&mut compressed).expect("compressed"); + std::fs::write(upload.path(), &compressed).expect("write compressed profile data"); + + format!("self-profile-{}.mm_profdata.sz", collection) + } + }; + + let child = Command::new("aws") + .arg("s3") + .arg("cp") + .arg("--storage-class") + .arg("INTELLIGENT_TIERING") + .arg("--only-show-errors") + .arg(upload.path()) + .arg(&format!( + "s3://rustc-perf/{}", + &prefix.join(filename).to_str().unwrap() + )) + .spawn() + .expect("spawn aws"); + + SelfProfileS3Upload(child, upload) + } + + fn wait(mut self) { + let start = std::time::Instant::now(); + let status = self.0.wait().expect("waiting for child"); + if !status.success() { + panic!("S3 upload failed: {:?}", status); + } + + log::trace!("uploaded to S3, additional wait: {:?}", start.elapsed()); + } } diff --git a/collector/src/compile/execute/mod.rs b/collector/src/compile/execute/mod.rs index 2965dac4d..47781a66d 100644 --- a/collector/src/compile/execute/mod.rs +++ b/collector/src/compile/execute/mod.rs @@ -15,7 +15,6 @@ use std::collections::HashMap; use std::env; use std::fs; use std::future::Future; -use std::io::Read; use std::path::{Path, PathBuf}; use std::pin::Pin; use std::process::{self, Command}; @@ -431,6 +430,11 @@ pub trait Processor { output: process::Output, ) -> Pin> + 'a>>; + /// Postprocess results gathered during previous collection(s). + fn postprocess_results<'a>(&'a mut self) -> Pin + 'a>> { + Box::pin(async {}) + } + /// Provided to permit switching on more expensive profiling if it's needed /// for the "first" run for any given benchmark (we reuse the processor), /// e.g. disabling -Zself-profile. @@ -446,94 +450,6 @@ pub trait Processor { } } -struct Upload(std::process::Child, tempfile::NamedTempFile); - -impl Upload { - fn new(prefix: PathBuf, collection: database::CollectionId, files: SelfProfileFiles) -> Upload { - // Files are placed at - // * self-profile//// - // /self-profile-.{extension} - let upload = tempfile::NamedTempFile::new() - .context("create temporary file") - .unwrap(); - let filename = match files { - SelfProfileFiles::Seven { - string_index, - string_data, - events, - } => { - let tarball = snap::write::FrameEncoder::new(Vec::new()); - let mut builder = tar::Builder::new(tarball); - builder.mode(tar::HeaderMode::Deterministic); - - let append_file = |builder: &mut tar::Builder<_>, - file: &Path, - name: &str| - -> anyhow::Result<()> { - if file.exists() { - // Silently ignore missing files, the new self-profile - // experiment with one file has a different structure. - builder.append_path_with_name(file, name)?; - } - Ok(()) - }; - - append_file(&mut builder, &string_index, "self-profile.string_index") - .expect("append string index"); - append_file(&mut builder, &string_data, "self-profile.string_data") - .expect("append string data"); - append_file(&mut builder, &events, "self-profile.events").expect("append events"); - builder.finish().expect("complete tarball"); - std::fs::write( - upload.path(), - builder - .into_inner() - .expect("get") - .into_inner() - .expect("snap success"), - ) - .expect("wrote tarball"); - format!("self-profile-{}.tar.sz", collection) - } - SelfProfileFiles::Eight { file } => { - let data = std::fs::read(file).expect("read profile data"); - let mut data = snap::read::FrameEncoder::new(&data[..]); - let mut compressed = Vec::new(); - data.read_to_end(&mut compressed).expect("compressed"); - std::fs::write(upload.path(), &compressed).expect("write compressed profile data"); - - format!("self-profile-{}.mm_profdata.sz", collection) - } - }; - - let child = Command::new("aws") - .arg("s3") - .arg("cp") - .arg("--storage-class") - .arg("INTELLIGENT_TIERING") - .arg("--only-show-errors") - .arg(upload.path()) - .arg(&format!( - "s3://rustc-perf/{}", - &prefix.join(filename).to_str().unwrap() - )) - .spawn() - .expect("spawn aws"); - - Upload(child, upload) - } - - fn wait(mut self) { - let start = std::time::Instant::now(); - let status = self.0.wait().expect("waiting for child"); - if !status.success() { - panic!("S3 upload failed: {:?}", status); - } - - log::trace!("uploaded to S3, additional wait: {:?}", start.elapsed()); - } -} - fn store_documentation_size_into_stats(stats: &mut Stats, doc_dir: &Path) { match utils::fs::get_file_count_and_size(doc_dir) { Ok((count, size)) => {