Skip to content

Commit

Permalink
Add retry support to sparse registries
Browse files Browse the repository at this point in the history
  • Loading branch information
arlosi committed Sep 9, 2022
1 parent 9467f81 commit c1da457
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 83 deletions.
9 changes: 5 additions & 4 deletions src/cargo/core/package.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::core::{Dependency, Manifest, PackageId, SourceId, Target};
use crate::core::{SourceMap, Summary, Workspace};
use crate::ops;
use crate::util::config::PackageCacheLock;
use crate::util::errors::{CargoResult, HttpNot200};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::interning::InternedString;
use crate::util::network::Retry;
use crate::util::{self, internal, Config, Progress, ProgressStyle};
Expand Down Expand Up @@ -868,18 +868,19 @@ impl<'a, 'cfg> Downloads<'a, 'cfg> {
let code = handle.response_code()?;
if code != 200 && code != 0 {
let url = handle.effective_url()?.unwrap_or(url);
return Err(HttpNot200 {
return Err(HttpNotSuccessful {
code,
url: url.to_string(),
body: data,
}
.into());
}
Ok(())
Ok(data)
})
.with_context(|| format!("failed to download from `{}`", dl.url))?
};
match ret {
Some(()) => break (dl, data),
Some(data) => break (dl, data),
None => {
self.pending_ids.insert(dl.id);
self.enqueue(dl, handle)?
Expand Down
14 changes: 1 addition & 13 deletions src/cargo/ops/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::sources::{RegistrySource, SourceConfigMap, CRATES_IO_DOMAIN, CRATES_I
use crate::util::config::{self, Config, SslVersionConfig, SslVersionConfigRange};
use crate::util::errors::CargoResult;
use crate::util::important_paths::find_root_manifest_for_wd;
use crate::util::IntoUrl;
use crate::util::{truncate_with_ellipsis, IntoUrl};
use crate::{drop_print, drop_println, version};

mod auth;
Expand Down Expand Up @@ -963,18 +963,6 @@ pub fn search(
limit: u32,
reg: Option<String>,
) -> CargoResult<()> {
/// Shortens `s` to fit in `max_width` characters, ending the result with `…`
/// when anything had to be cut off.
///
/// Width is counted in `char`s. At most `max_width - 1` characters of `s` are
/// kept so that the ellipsis still fits in the budget; as a consequence a
/// string of exactly `max_width` characters is shortened as well.
///
/// A `max_width` of zero yields an empty string.
fn truncate_with_ellipsis(s: &str, max_width: usize) -> String {
    // We should truncate at grapheme-boundary and compute character-widths,
    // yet the dependencies on unicode-segmentation and unicode-width are
    // not worth it.

    // `max_width - 1` underflows for a zero width: that panics when overflow
    // checks are enabled and otherwise wraps around, which would disable
    // truncation entirely. Nothing fits in zero columns, so return nothing.
    let keep = match max_width.checked_sub(1) {
        Some(keep) => keep,
        None => return String::new(),
    };

    let mut chars = s.chars();
    let mut prefix = (&mut chars).take(keep).collect::<String>();
    // Anything left in the iterator means the input did not fit.
    if chars.next().is_some() {
        prefix.push('…');
    }
    prefix
}

let (mut registry, _, source_id) =
registry(config, None, index.as_deref(), reg.as_deref(), false, false)?;
let (crates, total_crates) = registry.search(query, limit).with_context(|| {
Expand Down
125 changes: 69 additions & 56 deletions src/cargo/sources/registry/http_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use crate::ops;
use crate::sources::registry::download;
use crate::sources::registry::MaybeLock;
use crate::sources::registry::{LoadResponse, RegistryConfig, RegistryData};
use crate::util::errors::CargoResult;
use crate::util::{Config, Filesystem, IntoUrl, Progress, ProgressStyle};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::network::Retry;
use crate::util::{internal, Config, Filesystem, IntoUrl, Progress, ProgressStyle};
use anyhow::Context;
use cargo_util::paths;
use curl::easy::{HttpVersion, List};
Expand Down Expand Up @@ -83,15 +84,12 @@ pub struct Downloads<'cfg> {
/// When a download is started, it is added to this map. The key is a
/// "token" (see `Download::token`). It is removed once the download is
/// finished.
pending: HashMap<usize, (Download, EasyHandle)>,
/// Set of paths currently being downloaded, mapped to their tokens.
pending: HashMap<usize, (Download<'cfg>, EasyHandle)>,
/// Set of paths currently being downloaded.
/// This should stay in sync with `pending`.
pending_ids: HashMap<PathBuf, usize>,
/// The final result of each download. A pair `(token, result)`. This is a
/// temporary holding area, needed because curl can report multiple
/// downloads at once, but the main loop (`wait`) is written to only
/// handle one at a time.
results: HashMap<PathBuf, Result<CompletedDownload, curl::Error>>,
pending_ids: HashSet<PathBuf>,
/// The final result of each download.
results: HashMap<PathBuf, CargoResult<CompletedDownload>>,
/// The next ID to use for creating a token (see `Download::token`).
next: usize,
/// Progress bar.
Expand All @@ -100,7 +98,7 @@ pub struct Downloads<'cfg> {
downloads_finished: usize,
}

struct Download {
struct Download<'cfg> {
/// The token for this download, used as the key of the `Downloads::pending` map
/// and stored in `EasyHandle` as well.
token: usize,
Expand All @@ -117,6 +115,9 @@ struct Download {
/// Statistics updated from the progress callback in libcurl.
total: Cell<u64>,
current: Cell<u64>,

/// Logic used to track retrying this download if it's a spurious failure.
retry: Retry<'cfg>,
}

struct CompletedDownload {
Expand Down Expand Up @@ -155,7 +156,7 @@ impl<'cfg> HttpRegistry<'cfg> {
downloads: Downloads {
next: 0,
pending: HashMap::new(),
pending_ids: HashMap::new(),
pending_ids: HashSet::new(),
results: HashMap::new(),
progress: RefCell::new(Some(Progress::with_style(
"Fetch",
Expand Down Expand Up @@ -217,37 +218,60 @@ impl<'cfg> HttpRegistry<'cfg> {
);

// Collect the results from the Multi handle.
let pending = &mut self.downloads.pending;
self.multi.messages(|msg| {
let token = msg.token().expect("failed to read token");
let (_, handle) = &pending[&token];
let result = match msg.result_for(handle) {
Some(result) => result,
None => return, // transfer is not yet complete.
};

let (download, mut handle) = pending.remove(&token).unwrap();
self.downloads.pending_ids.remove(&download.path).unwrap();

let result = match result {
Ok(()) => {
self.downloads.downloads_finished += 1;
match handle.response_code() {
Ok(code) => Ok(CompletedDownload {
response_code: code,
data: download.data.take(),
index_version: download
.index_version
.take()
.unwrap_or_else(|| UNKNOWN.to_string()),
}),
Err(e) => Err(e),
let results = {
let mut results = Vec::new();
let pending = &mut self.downloads.pending;
self.multi.messages(|msg| {
let token = msg.token().expect("failed to read token");
let (_, handle) = &pending[&token];
if let Some(result) = msg.result_for(handle) {
results.push((token, result));
};
});
results
};
for (token, result) in results {
let (mut download, handle) = self.downloads.pending.remove(&token).unwrap();
let mut handle = self.multi.remove(handle)?;
let data = download.data.take();
let url = self.full_url(&download.path);
let result = match download.retry.r#try(|| {
result.with_context(|| format!("failed to download from `{}`", url))?;
let code = handle.response_code()?;
// Keep this list of expected status codes in sync with the codes handled in `load`
if !matches!(code, 200 | 304 | 401 | 404 | 451) {
let url = handle.effective_url()?.unwrap_or(&url);
return Err(HttpNotSuccessful {
code,
url: url.to_owned(),
body: data,
}
.into());
}
Ok(data)
}) {
Ok(Some(data)) => Ok(CompletedDownload {
response_code: handle.response_code()?,
data,
index_version: download
.index_version
.take()
.unwrap_or_else(|| UNKNOWN.to_string()),
}),
Ok(None) => {
// retry the operation
let handle = self.multi.add(handle)?;
self.downloads.pending.insert(token, (download, handle));
continue;
}
Err(e) => Err(e),
};

assert!(self.downloads.pending_ids.remove(&download.path));
self.downloads.results.insert(download.path, result);
});
self.downloads.downloads_finished += 1;
}

self.downloads.tick()?;

Ok(())
Expand Down Expand Up @@ -339,6 +363,8 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
debug!("downloaded the index file `{}` twice", path.display())
}

// The status codes handled here need to be kept in sync with the codes
// handled in `handle_completed_downloads`
match result.response_code {
200 => {}
304 => {
Expand All @@ -355,13 +381,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
return Poll::Ready(Ok(LoadResponse::NotFound));
}
code => {
return Err(anyhow::anyhow!(
"server returned unexpected HTTP status code {} for {}\nbody: {}",
code,
self.full_url(path),
str::from_utf8(&result.data).unwrap_or("<invalid utf8>"),
))
.into();
return Err(internal(format!("unexpected HTTP status code {code}"))).into();
}
}

Expand All @@ -371,13 +391,6 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
}));
}

if self.config.offline() {
return Poll::Ready(Err(anyhow::anyhow!(
"can't download index file from '{}': you are in offline mode (--offline)",
self.url
)));
}

// Looks like we're going to have to do a network request.
self.start_fetch()?;

Expand Down Expand Up @@ -433,9 +446,8 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
let token = self.downloads.next;
self.downloads.next += 1;
debug!("downloading {} as {}", path.display(), token);
assert_eq!(
self.downloads.pending_ids.insert(path.to_path_buf(), token),
None,
assert!(
self.downloads.pending_ids.insert(path.to_path_buf()),
"path queued for download more than once"
);

Expand Down Expand Up @@ -496,6 +508,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
index_version: RefCell::new(None),
total: Cell::new(0),
current: Cell::new(0),
retry: Retry::new(self.config)?,
};

// Finally add the request we've lined up to the pool of requests that cURL manages.
Expand Down Expand Up @@ -613,7 +626,7 @@ impl<'cfg> RegistryData for HttpRegistry<'cfg> {
let timeout = self
.multi
.get_timeout()?
.unwrap_or_else(|| Duration::new(5, 0));
.unwrap_or_else(|| Duration::new(1, 0));
self.multi
.wait(&mut [], timeout)
.with_context(|| "failed to wait on curl `Multi`")?;
Expand Down
15 changes: 11 additions & 4 deletions src/cargo/util/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,32 @@ use anyhow::Error;
use std::fmt;
use std::path::PathBuf;

use super::truncate_with_ellipsis;

pub type CargoResult<T> = anyhow::Result<T>;

#[derive(Debug)]
pub struct HttpNot200 {
pub struct HttpNotSuccessful {
pub code: u32,
pub url: String,
pub body: Vec<u8>,
}

impl fmt::Display for HttpNot200 {
impl fmt::Display for HttpNotSuccessful {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let body = std::str::from_utf8(&self.body)
.map(|s| truncate_with_ellipsis(s, 512))
.unwrap_or_else(|_| format!("[{} non-utf8 bytes]", self.body.len()));

write!(
f,
"failed to get 200 response from `{}`, got {}",
"failed to get successful HTTP response from `{}`, got {}\nbody:\n{body}",
self.url, self.code
)
}
}

impl std::error::Error for HttpNot200 {}
impl std::error::Error for HttpNotSuccessful {}

// =============================================================================
// Verbose error
Expand Down
12 changes: 12 additions & 0 deletions src/cargo/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,15 @@ pub fn indented_lines(text: &str) -> String {
})
.collect()
}

/// Shortens `s` to fit in `max_width` characters, ending the result with `…`
/// when anything had to be cut off.
///
/// Width is counted in `char`s. At most `max_width - 1` characters of `s` are
/// kept so that the ellipsis still fits in the budget; as a consequence a
/// string of exactly `max_width` characters is shortened as well.
///
/// A `max_width` of zero yields an empty string.
pub fn truncate_with_ellipsis(s: &str, max_width: usize) -> String {
    // We should truncate at grapheme-boundary and compute character-widths,
    // yet the dependencies on unicode-segmentation and unicode-width are
    // not worth it.

    // `max_width - 1` underflows for a zero width: that panics when overflow
    // checks are enabled and otherwise wraps around, which would disable
    // truncation entirely. Nothing fits in zero columns, so return nothing.
    let keep = match max_width.checked_sub(1) {
        Some(keep) => keep,
        None => return String::new(),
    };

    let mut chars = s.chars();
    let mut prefix = (&mut chars).take(keep).collect::<String>();
    // Anything left in the iterator means the input did not fit.
    if chars.next().is_some() {
        prefix.push('…');
    }
    prefix
}
17 changes: 11 additions & 6 deletions src/cargo/util/network.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Error;

use crate::util::errors::{CargoResult, HttpNot200};
use crate::util::errors::{CargoResult, HttpNotSuccessful};
use crate::util::Config;
use std::task::Poll;

Expand Down Expand Up @@ -31,6 +31,7 @@ impl<'a> Retry<'a> {
})
}

/// Returns `Ok(None)` for operations that should be re-tried.
pub fn r#try<T>(&mut self, f: impl FnOnce() -> CargoResult<T>) -> CargoResult<Option<T>> {
match f() {
Err(ref e) if maybe_spurious(e) && self.remaining > 0 => {
Expand Down Expand Up @@ -73,7 +74,7 @@ fn maybe_spurious(err: &Error) -> bool {
return true;
}
}
if let Some(not_200) = err.downcast_ref::<HttpNot200>() {
if let Some(not_200) = err.downcast_ref::<HttpNotSuccessful>() {
if 500 <= not_200.code && not_200.code < 600 {
return true;
}
Expand Down Expand Up @@ -114,14 +115,16 @@ fn with_retry_repeats_the_call_then_works() {
use crate::core::Shell;

//Error HTTP codes (5xx) are considered maybe_spurious and will prompt retry
let error1 = HttpNot200 {
let error1 = HttpNotSuccessful {
code: 501,
url: "Uri".to_string(),
body: Vec::new(),
}
.into();
let error2 = HttpNot200 {
let error2 = HttpNotSuccessful {
code: 502,
url: "Uri".to_string(),
body: Vec::new(),
}
.into();
let mut results: Vec<CargoResult<()>> = vec![Ok(()), Err(error1), Err(error2)];
Expand All @@ -137,14 +140,16 @@ fn with_retry_finds_nested_spurious_errors() {

//Error HTTP codes (5xx) are considered maybe_spurious and will prompt retry
//String error messages are not considered spurious
let error1 = anyhow::Error::from(HttpNot200 {
let error1 = anyhow::Error::from(HttpNotSuccessful {
code: 501,
url: "Uri".to_string(),
body: Vec::new(),
});
let error1 = anyhow::Error::from(error1.context("A non-spurious wrapping err"));
let error2 = anyhow::Error::from(HttpNot200 {
let error2 = anyhow::Error::from(HttpNotSuccessful {
code: 502,
url: "Uri".to_string(),
body: Vec::new(),
});
let error2 = anyhow::Error::from(error2.context("A second chained error"));
let mut results: Vec<CargoResult<()>> = vec![Ok(()), Err(error1), Err(error2)];
Expand Down

0 comments on commit c1da457

Please sign in to comment.