From f2775138fb9578511509e76e1936811fd3b391e1 Mon Sep 17 00:00:00 2001 From: Benno van den Berg Date: Wed, 14 Jun 2023 15:14:50 +0200 Subject: [PATCH 1/4] Add support for pushgateway - Add writer that will suspend progressbars when writing to the it --- src/bin/am/commands.rs | 7 +- src/bin/am/commands/start.rs | 353 +++++++++++++++++++++++++--- src/bin/am/commands/system.rs | 5 +- src/bin/am/commands/system/prune.rs | 3 +- src/bin/am/interactive.rs | 65 ++++- src/bin/am/main.rs | 28 ++- 6 files changed, 404 insertions(+), 57 deletions(-) diff --git a/src/bin/am/commands.rs b/src/bin/am/commands.rs index 9313c13..1536646 100644 --- a/src/bin/am/commands.rs +++ b/src/bin/am/commands.rs @@ -1,5 +1,6 @@ use anyhow::Result; use clap::{Parser, Subcommand}; +use indicatif::MultiProgress; pub mod start; pub mod system; @@ -24,10 +25,10 @@ pub enum SubCommands { MarkdownHelp, } -pub async fn handle_command(app: Application) -> Result<()> { +pub async fn handle_command(app: Application, mp: MultiProgress) -> Result<()> { match app.command { - SubCommands::Start(args) => start::handle_command(args).await, - SubCommands::System(args) => system::handle_command(args).await, + SubCommands::Start(args) => start::handle_command(args, mp).await, + SubCommands::System(args) => system::handle_command(args, mp).await, SubCommands::MarkdownHelp => { clap_markdown::print_help_markdown::(); Ok(()) diff --git a/src/bin/am/commands/start.rs b/src/bin/am/commands/start.rs index 8eb4a64..4cbd646 100644 --- a/src/bin/am/commands/start.rs +++ b/src/bin/am/commands/start.rs @@ -9,15 +9,17 @@ use axum::Router; use clap::Parser; use directories::ProjectDirs; use flate2::read::GzDecoder; +use futures_util::FutureExt; use http::{StatusCode, Uri}; use include_dir::{include_dir, Dir}; -use indicatif::{ProgressBar, ProgressState, ProgressStyle}; +use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use once_cell::sync::Lazy; use sha2::{Digest, Sha256}; use std::fs::File; use std::io::{BufWriter, Seek, SeekFrom, Write}; use std::net::SocketAddr; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; use std::vec; use tempfile::NamedTempFile; @@ -30,6 +32,7 @@ use url::Url; static CLIENT: Lazy = Lazy::new(|| { reqwest::Client::builder() .user_agent(concat!("am/", env!("CARGO_PKG_VERSION"))) + .connect_timeout(Duration::from_secs(5)) .build() .expect("Unable to create reqwest client") }); @@ -50,16 +53,19 @@ pub struct Arguments { #[clap(short, long, env, default_value = "127.0.0.1:6789")] listen_address: SocketAddr, - /// Startup the gateway as well. - // TODO: Actually implement that we use this + /// Startup the pushgateway as well. #[clap(short, long, env)] - enable_gateway: bool, + enable_pushgateway: bool, + + /// The pushgateway version to use. Leave empty to use the latest version. + #[clap(long, env, default_value = "v1.6.0")] + pushgateway_version: String, } -pub async fn handle_command(mut args: Arguments) -> Result<()> { - if args.metrics_endpoints.is_empty() && args.enable_gateway { +pub async fn handle_command(mut args: Arguments, mp: MultiProgress) -> Result<()> { + if args.metrics_endpoints.is_empty() && args.enable_pushgateway { let endpoint = interactive::user_input("Endpoint")?; - args.metrics_endpoints.push(Url::parse(&endpoint)?); + args.metrics_endpoints.push(endpoint_parser(&endpoint)?); } // First let's retrieve the directory for our application to store data in. @@ -80,20 +86,32 @@ pub async fn handle_command(mut args: Arguments) -> Result<()> { } } + if args.enable_pushgateway { + args.metrics_endpoints + .push(Url::parse("http://localhost:9091/pushgateway/metrics").unwrap()); + } + // Start Prometheus server let prometheus_args = args.clone(); let prometheus_local_data = local_data.clone(); + let prometheus_multi_progress = mp.clone(); let prometheus_task = async move { - let prometheus_version = args.prometheus_version.trim_start_matches('v'); + let prometheus_version = prometheus_args.prometheus_version.trim_start_matches('v'); info!("Using Prometheus version: {}", prometheus_version); let prometheus_path = - prometheus_local_data.join(format!("prometheus-{}", prometheus_version)); + prometheus_local_data.join(format!("prometheus-{prometheus_version}")); + // Check if prometheus is available if !prometheus_path.exists() { - info!("Downloading Prometheus"); - install_prometheus(&prometheus_path, prometheus_version).await?; + info!("Cached version of Prometheus not found, downloading Prometheus"); + install_prometheus( + &prometheus_path, + prometheus_version, + prometheus_multi_progress, + ) + .await?; debug!("Downloaded Prometheus to: {:?}", &prometheus_path); } else { debug!("Found prometheus in: {}", prometheus_path.display()); @@ -103,22 +121,62 @@ pub async fn handle_command(mut args: Arguments) -> Result<()> { start_prometheus(&prometheus_path, &prometheus_config).await }; + let pushgateway_task = if args.enable_pushgateway { + let pushgateway_args = args.clone(); + let pushgateway_local_data = local_data.clone(); + let pushgateway_multi_progress = mp.clone(); + async move { + let pushgateway_version = pushgateway_args.pushgateway_version.trim_start_matches('v'); + + info!("Using pushgateway version: {}", pushgateway_version); + + let pushgateway_path = + pushgateway_local_data.join(format!("pushgateway-{pushgateway_version}")); + + // Check if pushgateway is available + if !pushgateway_path.exists() { + info!("Cached version of pushgateway not found, downloading pushgateway"); + install_pushgateway( + &pushgateway_path, + pushgateway_version, + pushgateway_multi_progress, + ) + .await?; + info!("Downloaded to: {:?}", &pushgateway_path); + } + + let pushgateway_config = generate_prom_config(pushgateway_args.metrics_endpoints)?; + start_pushgateway(&pushgateway_path, &pushgateway_config).await + } + .boxed() + } else { + async move { anyhow::Ok(()) }.boxed() + }; + // Start web server for hosting the explorer, am api and proxies to the enabled services. let listen_address = args.listen_address; - let web_server_task = async move { start_web_server(&listen_address).await }; + let web_server_task = async move { start_web_server(&listen_address, args).await }; select! { biased; _ = tokio::signal::ctrl_c() => { - bail!("CTRL-C invoked by user, exiting..."); + debug!("sigint received by user, exiting..."); + return Ok(()) } + Err(err) = prometheus_task => { bail!("Prometheus exited with an error: {err:?}"); } + + Err(err) = pushgateway_task => { + bail!("Pushgateway exited with an error: {err:?}"); + } + Err(err) = web_server_task => { bail!("Web server exited with an error: {err:?}"); } + else => { return Ok(()); } @@ -131,16 +189,25 @@ pub async fn handle_command(mut args: Arguments) -> Result<()> { /// archive into. Then it will verify the downloaded archive against the /// downloaded checksum. Finally it will unpack the archive into /// `prometheus_path`. -async fn install_prometheus(prometheus_path: &PathBuf, prometheus_version: &str) -> Result<()> { +async fn install_prometheus( + prometheus_path: &PathBuf, + prometheus_version: &str, + multi_progress: MultiProgress, +) -> Result<()> { let (os, arch) = determine_os_and_arch()?; let package = format!("prometheus-{prometheus_version}.{os}-{arch}.tar.gz"); let mut prometheus_archive = NamedTempFile::new()?; - let calculated_checksum = - download_prometheus(prometheus_archive.as_file(), prometheus_version, &package).await?; + let calculated_checksum = download_prometheus( + prometheus_archive.as_file(), + prometheus_version, + &package, + &multi_progress, + ) + .await?; - verify_checksum(calculated_checksum, prometheus_version, &package).await?; + verify_checksum_prometheus(calculated_checksum, prometheus_version, &package).await?; // Make sure we set the position to the beginning of the file so that we can // unpack it. @@ -150,6 +217,7 @@ async fn install_prometheus(prometheus_path: &PathBuf, prometheus_version: &str) prometheus_archive.as_file(), prometheus_path, prometheus_version, + &multi_progress, ) .await } @@ -160,6 +228,7 @@ async fn download_prometheus( destination: &File, prometheus_version: &str, package: &str, + multi_progress: &MultiProgress, ) -> Result { let mut hasher = Sha256::new(); let mut response = CLIENT @@ -173,12 +242,13 @@ async fn download_prometheus( .ok_or_else(|| anyhow!("didn't receive content length"))?; let mut downloaded = 0; - let pb = ProgressBar::new(total_size); + let pb = multi_progress.add(ProgressBar::new(total_size)); // https://github.com/console-rs/indicatif/blob/HEAD/examples/download.rs#L12 - pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? + pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) .progress_chars("=> ")); + pb.set_message("Downloading Prometheus"); let mut buffer = BufWriter::new(destination); @@ -193,6 +263,7 @@ async fn download_prometheus( } pb.finish_and_clear(); + multi_progress.remove(&pb); let checksum = hex::encode(hasher.finalize()); @@ -202,13 +273,15 @@ async fn download_prometheus( /// Verify the checksum of the downloaded Prometheus archive. /// /// This will retrieve the checksum file from the Prometheus GitHub release page. -async fn verify_checksum( +async fn verify_checksum_prometheus( calculated_checksum: String, prometheus_version: &str, package: &str, ) -> Result<()> { let checksums = CLIENT - .get(format!("https://github.com/prometheus/prometheus/releases/download/v{prometheus_version}/sha256sums.txt")) + .get(format!( + "https://github.com/prometheus/prometheus/releases/download/v{prometheus_version}/sha256sums.txt" + )) .send() .await? .error_for_status()? @@ -243,6 +316,7 @@ async fn unpack_prometheus( archive: &File, prometheus_path: &PathBuf, prometheus_version: &str, + multi_progress: &MultiProgress, ) -> Result<()> { let (os, arch) = determine_os_and_arch()?; @@ -252,10 +326,10 @@ async fn unpack_prometheus( // This prefix will be removed from the files in the archive. let prefix = format!("prometheus-{prometheus_version}.{os}-{arch}/"); - let pb = ProgressBar::new_spinner(); + let pb = multi_progress.add(ProgressBar::new_spinner()); pb.set_style(ProgressStyle::default_spinner()); pb.enable_steady_tick(Duration::from_millis(120)); - pb.set_message("Unpacking..."); + pb.set_message("Unpacking Prometheus..."); for entry in ar.entries()? { let mut entry = entry?; @@ -271,6 +345,175 @@ async fn unpack_prometheus( } pb.finish_and_clear(); + multi_progress.remove(&pb); + + Ok(()) +} + +/// Install the specified version of Pushgateway into `pushgateway_path`. +/// +/// This function will first create a temporary file to download the Pushgateway +/// archive into. Then it will verify the downloaded archive against the +/// downloaded checksum. Finally it will unpack the archive into +/// `pushgateway_path`. +async fn install_pushgateway( + pushgateway_path: &PathBuf, + pushgateway_version: &str, + multi_progress: MultiProgress, +) -> Result<()> { + let (os, arch) = determine_os_and_arch()?; + let package = format!("pushgateway-{pushgateway_version}.{os}-{arch}.tar.gz"); + + let mut pushgateway_archive = NamedTempFile::new()?; + + let calculated_checksum = download_pushgateway( + pushgateway_archive.as_file(), + pushgateway_version, + &package, + &multi_progress, + ) + .await?; + + verify_checksum_pushgateway(calculated_checksum, pushgateway_version, &package).await?; + + // Make sure we set the position to the beginning of the file so that we can + // unpack it. + pushgateway_archive.as_file_mut().seek(SeekFrom::Start(0))?; + + unpack_pushgateway( + pushgateway_archive.as_file(), + pushgateway_path, + pushgateway_version, + &multi_progress, + ) + .await +} + +/// Download the specified version of Pushgateway into `destination`. It will +/// also calculate the SHA256 checksum of the downloaded file. +async fn download_pushgateway( + destination: &File, + pushgateway_version: &str, + package: &str, + multi_progress: &MultiProgress, +) -> Result { + let mut hasher = Sha256::new(); + let mut response = CLIENT + .get(format!("https://github.com/prometheus/pushgateway/releases/download/v{pushgateway_version}/{package}")) + .send() + .await? + .error_for_status()?; + + let total_size = response + .content_length() + .ok_or_else(|| anyhow!("didn't receive content length"))?; + let mut downloaded = 0; + + let pb = multi_progress.add(ProgressBar::new(total_size)); + + // https://github.com/console-rs/indicatif/blob/HEAD/examples/download.rs#L12 + pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? + .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) + .progress_chars("=> ")); + pb.set_message("Downloading Pushgateway"); + + let mut buffer = BufWriter::new(destination); + + while let Some(ref chunk) = response.chunk().await? { + buffer.write_all(chunk)?; + hasher.update(chunk); + + let new_size = (downloaded + chunk.len() as u64).min(total_size); + downloaded = new_size; + + pb.set_position(downloaded); + } + + pb.finish_and_clear(); + multi_progress.remove(&pb); + + let checksum = hex::encode(hasher.finalize()); + + Ok(checksum) +} + +/// Verify the checksum of the downloaded Prometheus archive. +/// +/// This will retrieve the checksum file from the Prometheus GitHub release page. +async fn verify_checksum_pushgateway( + calculated_checksum: String, + pushgateway_version: &str, + package: &str, +) -> Result<()> { + let checksums = CLIENT + .get(format!( + "https://github.com/prometheus/pushgateway/releases/download/v{pushgateway_version}/sha256sums.txt" + )) + .send() + .await? + .error_for_status()? + .text() + .await?; + + // Go through all the lines in the checksum file and look for the one that + // we need for our current service/version/os/arch. + let expected_checksum = checksums + .lines() + .find_map(|line| match line.split_once(" ") { + Some((checksum, filename)) if package == filename => Some(checksum), + _ => None, + }) + .ok_or_else(|| anyhow!("unable to find checksum for {package} in checksum list"))?; + + if expected_checksum != calculated_checksum { + error!( + ?expected_checksum, + ?calculated_checksum, + "Calculated checksum for downloaded archive did not match expected checksum", + ); + bail!("checksum did not match"); + } + + Ok(()) +} + +/// Unpack the Pushgateway archive into the `pushgateway_path`. This will remove +/// the prefix that is contained in the tar archive. +async fn unpack_pushgateway( + archive: &File, + pushgateway_path: &PathBuf, + pushgateway_version: &str, + multi_progress: &MultiProgress, +) -> Result<()> { + let (os, arch) = determine_os_and_arch()?; + + let tar_file = GzDecoder::new(archive); + let mut ar = tar::Archive::new(tar_file); + + // This prefix will be removed from the files in the archive. + let prefix = format!("pushgateway-{pushgateway_version}.{os}-{arch}/"); + + let pb = multi_progress.add(ProgressBar::new_spinner()); + pb.set_style(ProgressStyle::default_spinner()); + pb.enable_steady_tick(Duration::from_millis(120)); + pb.set_message("Unpacking Pushgateway..."); + + for entry in ar.entries()? { + let mut entry = entry?; + let path = entry.path()?; + + debug!("Unpacking {}", path.display()); + + // Remove the prefix and join it with the base directory. + let path = path.strip_prefix(&prefix)?.to_owned(); + let path = pushgateway_path.join(path); + + entry.unpack(&path)?; + } + + pb.finish_and_clear(); + multi_progress.remove(&pb); + Ok(()) } @@ -342,8 +585,11 @@ fn to_scrape_config(metric_endpoint: &Url) -> prometheus::ScrapeConfig { None => metric_endpoint.host_str().unwrap().to_string(), }; + static COUNTER: AtomicUsize = AtomicUsize::new(0); + let num = COUNTER.fetch_add(1, Ordering::SeqCst); + prometheus::ScrapeConfig { - job_name: "app".to_string(), + job_name: format!("app_{num}"), static_configs: vec![prometheus::StaticScrapeConfig { targets: vec![host], }], @@ -370,7 +616,7 @@ async fn check_endpoint(url: &Url) -> Result<()> { /// Start a prometheus process. This will block until the Prometheus process /// stops. async fn start_prometheus( - prometheus_binary_path: &PathBuf, + prometheus_path: &PathBuf, prometheus_config: &prometheus::Config, ) -> Result<()> { // First write the config to a temp file @@ -395,7 +641,7 @@ async fn start_prometheus( #[cfg(target_os = "windows")] let program = "prometheus.exe"; - let prometheus_path = prometheus_binary_path.join(program); + let prometheus_path = prometheus_path.join(program); debug!("Invoking prometheus at {}", prometheus_path.display()); @@ -416,11 +662,36 @@ async fn start_prometheus( Ok(()) } -async fn start_web_server(listen_address: &SocketAddr) -> Result<()> { - let app = Router::new() +/// Start a prometheus process. This will block until the Prometheus process +/// stops. +async fn start_pushgateway(pushgateway_path: &PathBuf, _: &prometheus::Config) -> Result<()> { + info!("Starting Pushgateway"); + let mut child = process::Command::new(pushgateway_path.join("pushgateway")) + .arg("--web.listen-address=:9091") + .arg("--web.external-url=http://localhost:6789/pushgateway") // TODO: Make sure this matches with that is actually running. + .spawn() + .context("Unable to start Pushgateway")?; + + let status = child.wait().await?; + if !status.success() { + anyhow::bail!("Pushgateway exited with status {}", status) + } + + Ok(()) +} + +async fn start_web_server(listen_address: &SocketAddr, args: Arguments) -> Result<()> { + let mut app = Router::new() // .route("/api/ ... ") // This can expose endpoints that the ui app can call .route("/explorer/*path", get(explorer_handler)) - .route("/prometheus/*path", any(prometheus_handler)); + .route("/prometheus/*path", any(prometheus_handler)) + .route("/prometheus", any(prometheus_handler)); + + if args.enable_pushgateway { + app = app + .route("/pushgateway/*path", any(pushgateway_handler)) + .route("/pushgateway", any(pushgateway_handler)); + } let server = axum::Server::try_bind(listen_address) .with_context(|| format!("failed to bind to {}", listen_address))? @@ -455,19 +726,23 @@ async fn explorer_handler(Path(path): Path) -> impl IntoResponse { } } -async fn prometheus_handler(mut req: http::Request) -> impl IntoResponse { - let path_query = req - .uri() - .path_and_query() - .map(|v| v.as_str()) - .unwrap_or_else(|| req.uri().path()); +async fn prometheus_handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9090").unwrap(); + proxy_handler(req, upstream_base).await +} - // TODO hardcoded for now - let uri = format!("http://127.0.0.1:9090{}", path_query); +async fn pushgateway_handler(req: http::Request) -> impl IntoResponse { + let upstream_base = Url::parse("http://localhost:9091").unwrap(); + proxy_handler(req, upstream_base).await +} - trace!("Proxying request to {}", uri); +async fn proxy_handler(mut req: http::Request, upstream_base: Url) -> impl IntoResponse { + trace!(req_uri=?req.uri(),method=?req.method(),"Proxying request"); - *req.uri_mut() = Uri::try_from(uri).unwrap(); + // NOTE: The username/password is not forwarded + let mut url = upstream_base.join(req.uri().path()).unwrap(); + url.set_query(req.uri().query()); + *req.uri_mut() = Uri::try_from(url.as_str()).unwrap(); let res = CLIENT.execute(req.try_into().unwrap()).await; diff --git a/src/bin/am/commands/system.rs b/src/bin/am/commands/system.rs index 90bc581..d250cec 100644 --- a/src/bin/am/commands/system.rs +++ b/src/bin/am/commands/system.rs @@ -1,5 +1,6 @@ use anyhow::Result; use clap::{Parser, Subcommand}; +use indicatif::MultiProgress; pub mod prune; @@ -16,8 +17,8 @@ pub enum SubCommands { Prune(prune::Arguments), } -pub async fn handle_command(args: Arguments) -> Result<()> { +pub async fn handle_command(args: Arguments, mp: MultiProgress) -> Result<()> { match args.command { - SubCommands::Prune(args) => prune::handle_command(args).await, + SubCommands::Prune(args) => prune::handle_command(args, mp).await, } } diff --git a/src/bin/am/commands/system/prune.rs b/src/bin/am/commands/system/prune.rs index abb9560..756414e 100644 --- a/src/bin/am/commands/system/prune.rs +++ b/src/bin/am/commands/system/prune.rs @@ -2,6 +2,7 @@ use crate::interactive; use anyhow::{bail, Context, Result}; use clap::Parser; use directories::ProjectDirs; +use indicatif::MultiProgress; use std::io; use tracing::{debug, info}; @@ -13,7 +14,7 @@ pub struct Arguments { force: bool, } -pub async fn handle_command(args: Arguments) -> Result<()> { +pub async fn handle_command(args: Arguments, _: MultiProgress) -> Result<()> { // If the users hasn't specified the `force` argument, then ask the user if // they want to continue. if !args.force && !interactive::confirm("Prune all am program files?")? { diff --git a/src/bin/am/interactive.rs b/src/bin/am/interactive.rs index bf7f333..9e804c9 100644 --- a/src/bin/am/interactive.rs +++ b/src/bin/am/interactive.rs @@ -1,15 +1,74 @@ use dialoguer::theme::SimpleTheme; use dialoguer::{Confirm, Input}; -use std::io; +use indicatif::MultiProgress; +use std::io::{stderr, IoSlice, Result, Write}; +use tracing_subscriber::fmt::MakeWriter; -pub fn user_input(prompt: impl Into) -> io::Result { +pub fn user_input(prompt: impl Into) -> Result { Ok(Input::with_theme(&SimpleTheme) .with_prompt(prompt) .interact()?) } -pub fn confirm(prompt: impl Into) -> io::Result { +pub fn confirm(prompt: impl Into) -> Result { Ok(Confirm::with_theme(&SimpleTheme) .with_prompt(prompt) .interact()?) } + +/// A Writer that will suspend any progress bar during calls to the write trait. +/// This will prevent the output from being mangled. +/// +/// The main use case for this is to use it in conjunction with other components +/// that write to stderr, such as the tracing library. If both indicatif and +/// tracing would be using stderr directly, it would result in progress bars +/// being interrupted by other output. +#[derive(Clone)] +pub struct IndicatifWriter { + multi_progress: indicatif::MultiProgress, +} + +impl IndicatifWriter { + /// Create a new IndicatifWriter. Make sure to use the returned + /// MultiProgress when creating any progress bars. + pub fn new() -> (Self, MultiProgress) { + let multi_progress = MultiProgress::new(); + ( + Self { + multi_progress: multi_progress.clone(), + }, + multi_progress, + ) + } +} + +impl Write for IndicatifWriter { + fn write(&mut self, buf: &[u8]) -> Result { + self.multi_progress.suspend(|| stderr().write(buf)) + } + + fn flush(&mut self) -> Result<()> { + self.multi_progress.suspend(|| stderr().flush()) + } + + fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> Result { + self.multi_progress + .suspend(|| stderr().write_vectored(bufs)) + } + + fn write_all(&mut self, buf: &[u8]) -> Result<()> { + self.multi_progress.suspend(|| stderr().write_all(buf)) + } + + fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> Result<()> { + self.multi_progress.suspend(|| stderr().write_fmt(fmt)) + } +} + +impl<'a> MakeWriter<'a> for IndicatifWriter { + type Writer = IndicatifWriter; + + fn make_writer(&'a self) -> Self::Writer { + self.clone() + } +} diff --git a/src/bin/am/main.rs b/src/bin/am/main.rs index c4a7d1a..42f2461 100644 --- a/src/bin/am/main.rs +++ b/src/bin/am/main.rs @@ -1,8 +1,9 @@ use anyhow::{Context, Result}; use clap::Parser; use commands::{handle_command, Application}; -use std::io; +use interactive::IndicatifWriter; use tracing::level_filters::LevelFilter; +use tracing::{debug, error}; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry}; @@ -11,32 +12,41 @@ mod commands; mod interactive; #[tokio::main] -async fn main() -> Result<()> { +async fn main() { + let (writer, multi_progress) = IndicatifWriter::new(); + let app = Application::parse(); - if let Err(err) = init_logging() { + if let Err(err) = init_logging(writer) { eprintln!("Unable to initialize logging: {:#}", err); std::process::exit(1); } - handle_command(app).await + let result = handle_command(app, multi_progress).await; + match result { + Ok(_) => debug!("Command completed successfully"), + Err(err) => { + error!("Command failed: {:?}", err); + std::process::exit(1); + } + } } /// Initialize logging for the application. /// -/// Currently, we have a straight forward logging setup that will log everything -/// that is level info and higher to stderr. Users are able to influence this by -/// exporting the `RUST_LOG` environment variable. +/// Currently, we check for the `RUST_LOG` environment variable and use that for +/// logging. If it isn't set or contains a invalid directive, we will show _all_ +/// logs from INFO level. /// /// For example: for local development it is convenient to set the environment /// variable to `RUST_LOG=am=trace,info`. This will display all log messages /// within the `am` module, but will only show info for other modules. -fn init_logging() -> Result<()> { +fn init_logging(writer: IndicatifWriter) -> Result<()> { // The filter layer controls which log levels to display. let filter = EnvFilter::try_from_default_env() .unwrap_or_else(|_| EnvFilter::default().add_directive(LevelFilter::INFO.into())); - let log_layer = tracing_subscriber::fmt::layer().with_writer(io::stderr); + let log_layer = tracing_subscriber::fmt::layer().with_writer(writer); Registry::default() .with(filter) From 03175802e6552c2cd94daffcac5bb2172612bb91 Mon Sep 17 00:00:00 2001 From: Mari Date: Thu, 22 Jun 2023 16:28:21 +0200 Subject: [PATCH 2/4] get rid of first code duplication --- src/bin/am/commands/start.rs | 112 ++++------------------------------- src/bin/am/downloader.rs | 72 ++++++++++++++++++++++ src/bin/am/main.rs | 1 + 3 files changed, 83 insertions(+), 102 deletions(-) create mode 100644 src/bin/am/downloader.rs diff --git a/src/bin/am/commands/start.rs b/src/bin/am/commands/start.rs index 4cbd646..473ca2d 100644 --- a/src/bin/am/commands/start.rs +++ b/src/bin/am/commands/start.rs @@ -1,3 +1,4 @@ +use crate::downloader::download_github_release; use crate::interactive; use anyhow::{anyhow, bail, Context, Result}; use autometrics_am::prometheus; @@ -12,11 +13,10 @@ use flate2::read::GzDecoder; use futures_util::FutureExt; use http::{StatusCode, Uri}; use include_dir::{include_dir, Dir}; -use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use once_cell::sync::Lazy; -use sha2::{Digest, Sha256}; use std::fs::File; -use std::io::{BufWriter, Seek, SeekFrom, Write}; +use std::io::{Seek, SeekFrom}; use std::net::SocketAddr; use std::path::PathBuf; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -29,7 +29,7 @@ use url::Url; // Create a reqwest client that will be used to make HTTP requests. This allows // for keep-alives if we are making multiple requests to the same host. -static CLIENT: Lazy = Lazy::new(|| { +pub static CLIENT: Lazy = Lazy::new(|| { reqwest::Client::builder() .user_agent(concat!("am/", env!("CARGO_PKG_VERSION"))) .connect_timeout(Duration::from_secs(5)) @@ -199,8 +199,10 @@ async fn install_prometheus( let mut prometheus_archive = NamedTempFile::new()?; - let calculated_checksum = download_prometheus( + let calculated_checksum = download_github_release( prometheus_archive.as_file(), + "prometheus", + "prometheus", prometheus_version, &package, &multi_progress, @@ -222,54 +224,6 @@ async fn install_prometheus( .await } -/// Download the specified version of Prometheus into `destination`. It will -/// also calculate the SHA256 checksum of the downloaded file. -async fn download_prometheus( - destination: &File, - prometheus_version: &str, - package: &str, - multi_progress: &MultiProgress, -) -> Result { - let mut hasher = Sha256::new(); - let mut response = CLIENT - .get(format!("https://github.com/prometheus/prometheus/releases/download/v{prometheus_version}/{package}")) - .send() - .await? - .error_for_status()?; - - let total_size = response - .content_length() - .ok_or_else(|| anyhow!("didn't receive content length"))?; - let mut downloaded = 0; - - let pb = multi_progress.add(ProgressBar::new(total_size)); - - // https://github.com/console-rs/indicatif/blob/HEAD/examples/download.rs#L12 - pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? - .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) - .progress_chars("=> ")); - pb.set_message("Downloading Prometheus"); - - let mut buffer = BufWriter::new(destination); - - while let Some(ref chunk) = response.chunk().await? { - buffer.write_all(chunk)?; - hasher.update(chunk); - - let new_size = (downloaded + chunk.len() as u64).min(total_size); - downloaded = new_size; - - pb.set_position(downloaded); - } - - pb.finish_and_clear(); - multi_progress.remove(&pb); - - let checksum = hex::encode(hasher.finalize()); - - Ok(checksum) -} - /// Verify the checksum of the downloaded Prometheus archive. /// /// This will retrieve the checksum file from the Prometheus GitHub release page. @@ -366,8 +320,10 @@ async fn install_pushgateway( let mut pushgateway_archive = NamedTempFile::new()?; - let calculated_checksum = download_pushgateway( + let calculated_checksum = download_github_release( pushgateway_archive.as_file(), + "prometheus", + "pushgateway", pushgateway_version, &package, &multi_progress, @@ -389,54 +345,6 @@ async fn install_pushgateway( .await } -/// Download the specified version of Pushgateway into `destination`. It will -/// also calculate the SHA256 checksum of the downloaded file. -async fn download_pushgateway( - destination: &File, - pushgateway_version: &str, - package: &str, - multi_progress: &MultiProgress, -) -> Result { - let mut hasher = Sha256::new(); - let mut response = CLIENT - .get(format!("https://github.com/prometheus/pushgateway/releases/download/v{pushgateway_version}/{package}")) - .send() - .await? - .error_for_status()?; - - let total_size = response - .content_length() - .ok_or_else(|| anyhow!("didn't receive content length"))?; - let mut downloaded = 0; - - let pb = multi_progress.add(ProgressBar::new(total_size)); - - // https://github.com/console-rs/indicatif/blob/HEAD/examples/download.rs#L12 - pb.set_style(ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? - .with_key("eta", |state: &ProgressState, w: &mut dyn std::fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) - .progress_chars("=> ")); - pb.set_message("Downloading Pushgateway"); - - let mut buffer = BufWriter::new(destination); - - while let Some(ref chunk) = response.chunk().await? { - buffer.write_all(chunk)?; - hasher.update(chunk); - - let new_size = (downloaded + chunk.len() as u64).min(total_size); - downloaded = new_size; - - pb.set_position(downloaded); - } - - pb.finish_and_clear(); - multi_progress.remove(&pb); - - let checksum = hex::encode(hasher.finalize()); - - Ok(checksum) -} - /// Verify the checksum of the downloaded Prometheus archive. /// /// This will retrieve the checksum file from the Prometheus GitHub release page. diff --git a/src/bin/am/downloader.rs b/src/bin/am/downloader.rs new file mode 100644 index 0000000..fd3bb63 --- /dev/null +++ b/src/bin/am/downloader.rs @@ -0,0 +1,72 @@ +use crate::commands::start::CLIENT; +use anyhow::{anyhow, Result}; +use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; +use sha2::{Digest, Sha256}; +use std::fmt; +use std::fs::File; +use std::io::{BufWriter, Write}; + +/// downloads `package` into `destination`, returning the sha256sum hex-digest of the downloaded file +pub async fn download_github_release( + destination: &File, + org: &str, + repo: &str, + version: &str, + package: &str, + multi_progress: &MultiProgress, +) -> Result { + let mut hasher = Sha256::new(); + let mut response = CLIENT + .get(format!( + "https://github.com/{org}/{repo}/releases/download/v{version}/{package}" + )) + .send() + .await? + .error_for_status()?; + + let total_size = response + .content_length() + .ok_or_else(|| anyhow!("didn't receive content length"))?; + let mut downloaded = 0; + + let pb = multi_progress.add(ProgressBar::new(total_size)); + + // https://github.com/console-rs/indicatif/blob/HEAD/examples/download.rs#L12 + pb.set_style( + ProgressStyle::with_template("{spinner:.green} [{elapsed_precise}] {msg} [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({eta})")? + .with_key("eta", |state: &ProgressState, w: &mut dyn fmt::Write| write!(w, "{:.1}s", state.eta().as_secs_f64()).unwrap()) + .progress_chars("=> ") + ); + + pb.set_message(format!( + "Downloading {package} from github.com/{org}/{repo}" + )); + + let mut buffer = BufWriter::new(destination); + + while let Some(ref chunk) = response.chunk().await? { + buffer.write_all(chunk)?; + hasher.update(chunk); + + let new_size = (downloaded + chunk.len() as u64).min(total_size); + downloaded = new_size; + + pb.set_position(downloaded); + } + + pb.finish_and_clear(); + multi_progress.remove(&pb); + + let checksum = hex::encode(hasher.finalize()); + Ok(checksum) +} + +pub async fn verify_checksum( + _sha256sum: &str, + _org: &str, + _repo: &str, + _version: &str, + _package: &str, +) -> Result<()> { + todo!() +} diff --git a/src/bin/am/main.rs b/src/bin/am/main.rs index 42f2461..46cd6db 100644 --- a/src/bin/am/main.rs +++ b/src/bin/am/main.rs @@ -9,6 +9,7 @@ use tracing_subscriber::util::SubscriberInitExt; use tracing_subscriber::{EnvFilter, Registry}; mod commands; +mod downloader; mod interactive; #[tokio::main] From 468cf03ee88c24759cf248f6286b756649d1f1e1 Mon Sep 17 00:00:00 2001 From: Mari Date: Fri, 23 Jun 2023 16:33:26 +0200 Subject: [PATCH 3/4] de-duplicate sha256sum checker --- src/bin/am/commands/start.rs | 102 +++++++---------------------------- src/bin/am/downloader.rs | 44 ++++++++++++--- 2 files changed, 55 insertions(+), 91 deletions(-) diff --git a/src/bin/am/commands/start.rs b/src/bin/am/commands/start.rs index 473ca2d..ef61e89 100644 --- a/src/bin/am/commands/start.rs +++ b/src/bin/am/commands/start.rs @@ -1,6 +1,6 @@ -use crate::downloader::download_github_release; +use crate::downloader::{download_github_release, verify_checksum}; use crate::interactive; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use autometrics_am::prometheus; use axum::body::{self, Body}; use axum::extract::Path; @@ -209,7 +209,14 @@ async fn install_prometheus( ) .await?; - verify_checksum_prometheus(calculated_checksum, prometheus_version, &package).await?; + verify_checksum( + &calculated_checksum, + "prometheus", + "prometheus", + prometheus_version, + &package, + ) + .await?; // Make sure we set the position to the beginning of the file so that we can // unpack it. @@ -224,46 +231,6 @@ async fn install_prometheus( .await } -/// Verify the checksum of the downloaded Prometheus archive. -/// -/// This will retrieve the checksum file from the Prometheus GitHub release page. -async fn verify_checksum_prometheus( - calculated_checksum: String, - prometheus_version: &str, - package: &str, -) -> Result<()> { - let checksums = CLIENT - .get(format!( - "https://github.com/prometheus/prometheus/releases/download/v{prometheus_version}/sha256sums.txt" - )) - .send() - .await? - .error_for_status()? - .text() - .await?; - - // Go through all the lines in the checksum file and look for the one that - // we need for our current service/version/os/arch. - let expected_checksum = checksums - .lines() - .find_map(|line| match line.split_once(" ") { - Some((checksum, filename)) if package == filename => Some(checksum), - _ => None, - }) - .ok_or_else(|| anyhow!("unable to find checksum for {package} in checksum list"))?; - - if expected_checksum != calculated_checksum { - error!( - ?expected_checksum, - ?calculated_checksum, - "Calculated checksum for downloaded archive did not match expected checksum", - ); - bail!("checksum did not match"); - } - - Ok(()) -} - /// Unpack the Prometheus archive into the `prometheus_path`. This will remove /// the prefix that is contained in the tar archive. async fn unpack_prometheus( @@ -330,7 +297,14 @@ async fn install_pushgateway( ) .await?; - verify_checksum_pushgateway(calculated_checksum, pushgateway_version, &package).await?; + verify_checksum( + &calculated_checksum, + "prometheus", + "pushgateway", + pushgateway_version, + &package, + ) + .await?; // Make sure we set the position to the beginning of the file so that we can // unpack it. @@ -345,46 +319,6 @@ async fn install_pushgateway( .await } -/// Verify the checksum of the downloaded Prometheus archive. -/// -/// This will retrieve the checksum file from the Prometheus GitHub release page. -async fn verify_checksum_pushgateway( - calculated_checksum: String, - pushgateway_version: &str, - package: &str, -) -> Result<()> { - let checksums = CLIENT - .get(format!( - "https://github.com/prometheus/pushgateway/releases/download/v{pushgateway_version}/sha256sums.txt" - )) - .send() - .await? - .error_for_status()? - .text() - .await?; - - // Go through all the lines in the checksum file and look for the one that - // we need for our current service/version/os/arch. - let expected_checksum = checksums - .lines() - .find_map(|line| match line.split_once(" ") { - Some((checksum, filename)) if package == filename => Some(checksum), - _ => None, - }) - .ok_or_else(|| anyhow!("unable to find checksum for {package} in checksum list"))?; - - if expected_checksum != calculated_checksum { - error!( - ?expected_checksum, - ?calculated_checksum, - "Calculated checksum for downloaded archive did not match expected checksum", - ); - bail!("checksum did not match"); - } - - Ok(()) -} - /// Unpack the Pushgateway archive into the `pushgateway_path`. This will remove /// the prefix that is contained in the tar archive. async fn unpack_pushgateway( diff --git a/src/bin/am/downloader.rs b/src/bin/am/downloader.rs index fd3bb63..4361731 100644 --- a/src/bin/am/downloader.rs +++ b/src/bin/am/downloader.rs @@ -1,10 +1,11 @@ use crate::commands::start::CLIENT; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, bail, Result}; use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use sha2::{Digest, Sha256}; use std::fmt; use std::fs::File; use std::io::{BufWriter, Write}; +use tracing::error; /// downloads `package` into `destination`, returning the sha256sum hex-digest of the downloaded file pub async fn download_github_release( @@ -62,11 +63,40 @@ pub async fn download_github_release( } pub async fn verify_checksum( - _sha256sum: &str, - _org: &str, - _repo: &str, - _version: &str, - _package: &str, + sha256sum: &str, + org: &str, + repo: &str, + version: &str, + package: &str, ) -> Result<()> { - todo!() + let checksums = CLIENT + .get(format!( + "https://github.com/{org}/{repo}/releases/download/v{version}/sha256sums.txt" + )) + .send() + .await? + .error_for_status()? + .text() + .await?; + + // Go through all the lines in the checksum file and look for the one that + // we need for our current service/version/os/arch. + let expected_checksum = checksums + .lines() + .find_map(|line| match line.split_once(" ") { + Some((checksum, filename)) if package == filename => Some(checksum), + _ => None, + }) + .ok_or_else(|| anyhow!("unable to find checksum for {package} in checksum list"))?; + + if expected_checksum != sha256sum { + error!( + ?expected_checksum, + calculated_checksum = ?sha256sum, + "Calculated checksum for downloaded archive did not match expected checksum", + ); + bail!("checksum did not match"); + } + + Ok(()) } From d455e40e7589ce68b6699372217d3efe505b1f08 Mon Sep 17 00:00:00 2001 From: Mari Date: Fri, 23 Jun 2023 16:42:11 +0200 Subject: [PATCH 4/4] de-duplicate archive unpacking --- src/bin/am/commands/start.rs | 104 +++++------------------------------ src/bin/am/downloader.rs | 38 ++++++++++++- 2 files changed, 52 insertions(+), 90 deletions(-) diff --git a/src/bin/am/commands/start.rs b/src/bin/am/commands/start.rs index ef61e89..335f5b6 100644 --- a/src/bin/am/commands/start.rs +++ b/src/bin/am/commands/start.rs @@ -1,4 +1,4 @@ -use crate::downloader::{download_github_release, verify_checksum}; +use crate::downloader::{download_github_release, unpack, verify_checksum}; use crate::interactive; use anyhow::{bail, Context, Result}; use autometrics_am::prometheus; @@ -9,11 +9,10 @@ use axum::routing::{any, get}; use axum::Router; use clap::Parser; use directories::ProjectDirs; -use flate2::read::GzDecoder; use futures_util::FutureExt; use http::{StatusCode, Uri}; use include_dir::{include_dir, Dir}; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use indicatif::MultiProgress; use once_cell::sync::Lazy; use std::fs::File; use std::io::{Seek, SeekFrom}; @@ -195,7 +194,9 @@ async fn install_prometheus( multi_progress: MultiProgress, ) -> Result<()> { let (os, arch) = determine_os_and_arch()?; - let package = format!("prometheus-{prometheus_version}.{os}-{arch}.tar.gz"); + let base = format!("prometheus-{prometheus_version}.{os}-{arch}"); + let package = format!("{base}.tar.gz"); + let prefix = format!("{base}/"); let mut prometheus_archive = NamedTempFile::new()?; @@ -222,55 +223,16 @@ async fn install_prometheus( // unpack it. prometheus_archive.as_file_mut().seek(SeekFrom::Start(0))?; - unpack_prometheus( + unpack( prometheus_archive.as_file(), + "prometheus", prometheus_path, - prometheus_version, + &prefix, &multi_progress, ) .await } -/// Unpack the Prometheus archive into the `prometheus_path`. This will remove -/// the prefix that is contained in the tar archive. -async fn unpack_prometheus( - archive: &File, - prometheus_path: &PathBuf, - prometheus_version: &str, - multi_progress: &MultiProgress, -) -> Result<()> { - let (os, arch) = determine_os_and_arch()?; - - let tar_file = GzDecoder::new(archive); - let mut ar = tar::Archive::new(tar_file); - - // This prefix will be removed from the files in the archive. - let prefix = format!("prometheus-{prometheus_version}.{os}-{arch}/"); - - let pb = multi_progress.add(ProgressBar::new_spinner()); - pb.set_style(ProgressStyle::default_spinner()); - pb.enable_steady_tick(Duration::from_millis(120)); - pb.set_message("Unpacking Prometheus..."); - - for entry in ar.entries()? { - let mut entry = entry?; - let path = entry.path()?; - - debug!("Unpacking {}", path.display()); - - // Remove the prefix and join it with the base directory. - let path = path.strip_prefix(&prefix)?.to_owned(); - let path = prometheus_path.join(path); - - entry.unpack(&path)?; - } - - pb.finish_and_clear(); - multi_progress.remove(&pb); - - Ok(()) -} - /// Install the specified version of Pushgateway into `pushgateway_path`. /// /// This function will first create a temporary file to download the Pushgateway @@ -283,7 +245,10 @@ async fn install_pushgateway( multi_progress: MultiProgress, ) -> Result<()> { let (os, arch) = determine_os_and_arch()?; - let package = format!("pushgateway-{pushgateway_version}.{os}-{arch}.tar.gz"); + + let base = format!("pushgateway-{pushgateway_version}.{os}-{arch}"); + let package = format!("{base}.tar.gz"); + let prefix = format!("{base}/"); let mut pushgateway_archive = NamedTempFile::new()?; @@ -310,55 +275,16 @@ async fn install_pushgateway( // unpack it. pushgateway_archive.as_file_mut().seek(SeekFrom::Start(0))?; - unpack_pushgateway( + unpack( pushgateway_archive.as_file(), + "pushgateway", pushgateway_path, - pushgateway_version, + &prefix, &multi_progress, ) .await } -/// Unpack the Pushgateway archive into the `pushgateway_path`. This will remove -/// the prefix that is contained in the tar archive. -async fn unpack_pushgateway( - archive: &File, - pushgateway_path: &PathBuf, - pushgateway_version: &str, - multi_progress: &MultiProgress, -) -> Result<()> { - let (os, arch) = determine_os_and_arch()?; - - let tar_file = GzDecoder::new(archive); - let mut ar = tar::Archive::new(tar_file); - - // This prefix will be removed from the files in the archive. - let prefix = format!("pushgateway-{pushgateway_version}.{os}-{arch}/"); - - let pb = multi_progress.add(ProgressBar::new_spinner()); - pb.set_style(ProgressStyle::default_spinner()); - pb.enable_steady_tick(Duration::from_millis(120)); - pb.set_message("Unpacking Pushgateway..."); - - for entry in ar.entries()? { - let mut entry = entry?; - let path = entry.path()?; - - debug!("Unpacking {}", path.display()); - - // Remove the prefix and join it with the base directory. - let path = path.strip_prefix(&prefix)?.to_owned(); - let path = pushgateway_path.join(path); - - entry.unpack(&path)?; - } - - pb.finish_and_clear(); - multi_progress.remove(&pb); - - Ok(()) -} - /// Translates the OS and arch provided by Rust to the convention used by /// Prometheus. fn determine_os_and_arch() -> Result<(&'static str, &'static str)> { diff --git a/src/bin/am/downloader.rs b/src/bin/am/downloader.rs index 4361731..1656ec9 100644 --- a/src/bin/am/downloader.rs +++ b/src/bin/am/downloader.rs @@ -1,11 +1,14 @@ use crate::commands::start::CLIENT; use anyhow::{anyhow, bail, Result}; +use flate2::read::GzDecoder; use indicatif::{MultiProgress, ProgressBar, ProgressState, ProgressStyle}; use sha2::{Digest, Sha256}; use std::fmt; use std::fs::File; use std::io::{BufWriter, Write}; -use tracing::error; +use std::path::PathBuf; +use std::time::Duration; +use tracing::{debug, error}; /// downloads `package` into `destination`, returning the sha256sum hex-digest of the downloaded file pub async fn download_github_release( @@ -100,3 +103,36 @@ pub async fn verify_checksum( Ok(()) } + +pub async fn unpack( + archive: &File, + package: &str, + destination_path: &PathBuf, + prefix: &str, + multi_progress: &MultiProgress, +) -> Result<()> { + let tar_file = GzDecoder::new(archive); + let mut ar = tar::Archive::new(tar_file); + + let pb = multi_progress.add(ProgressBar::new_spinner()); + pb.set_style(ProgressStyle::default_spinner()); + pb.enable_steady_tick(Duration::from_millis(120)); + pb.set_message(format!("Unpacking {package}...")); + + for entry in ar.entries()? { + let mut entry = entry?; + let path = entry.path()?; + + debug!("Unpacking {}", path.display()); + + // Remove the prefix and join it with the base directory. + let path = path.strip_prefix(&prefix)?.to_owned(); + let path = destination_path.join(path); + + entry.unpack(&path)?; + } + + pb.finish_and_clear(); + multi_progress.remove(&pb); + Ok(()) +}