Skip to content

Commit

Permalink
feat: add Extensions to object store PutMultipartOpts (#7214)
Browse files Browse the repository at this point in the history
  • Loading branch information
crepererum authored Feb 27, 2025
1 parent 9384224 commit b90a261
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 6 deletions.
11 changes: 9 additions & 2 deletions object_store/src/aws/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,12 @@ impl S3Client {
location: &Path,
opts: PutMultipartOpts,
) -> Result<MultipartId> {
let PutMultipartOpts {
tags,
attributes,
extensions,
} = opts;

let mut request = self.request(Method::POST, location);
if let Some(algorithm) = self.config.checksum {
match algorithm {
Expand All @@ -644,8 +650,9 @@ impl S3Client {
let response = request
.query(&[("uploads", "")])
.with_encryption_headers()
.with_attributes(opts.attributes)
.with_tags(opts.tags)
.with_attributes(attributes)
.with_tags(tags)
.with_extensions(extensions)
.idempotent(true)
.send()
.await?
Expand Down
11 changes: 9 additions & 2 deletions object_store/src/azure/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,12 @@ impl AzureClient {
parts: Vec<PartId>,
opts: PutMultipartOpts,
) -> Result<PutResult> {
let PutMultipartOpts {
tags,
attributes,
extensions,
} = opts;

let blocks = parts
.into_iter()
.map(|part| BlockId::from(part.content_id))
Expand All @@ -607,8 +613,9 @@ impl AzureClient {
let payload = BlockList { blocks }.to_xml().into();
let response = self
.put_request(path, payload)
.with_attributes(opts.attributes)
.with_tags(opts.tags)
.with_attributes(attributes)
.with_tags(tags)
.with_extensions(extensions)
.query(&[("comp", "blocklist")])
.idempotent(true)
.send()
Expand Down
17 changes: 17 additions & 0 deletions object_store/src/buffered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ pub struct BufWriter {
max_concurrency: usize,
attributes: Option<Attributes>,
tags: Option<TagSet>,
extensions: Option<::http::Extensions>,
state: BufWriterState,
store: Arc<dyn ObjectStore>,
}
Expand Down Expand Up @@ -259,6 +260,7 @@ impl BufWriter {
max_concurrency: 8,
attributes: None,
tags: None,
extensions: None,
state: BufWriterState::Buffer(path, PutPayloadMut::new()),
}
}
Expand Down Expand Up @@ -289,6 +291,19 @@ impl BufWriter {
}
}

/// Set the extensions of the uploaded object
///
/// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
/// that need to pass context-specific information (like tracing spans) via trait methods.
///
/// These extensions are ignored entirely by backends offered through this crate.
pub fn with_extensions(self, extensions: ::http::Extensions) -> Self {
Self {
extensions: Some(extensions),
..self
}
}

/// Write data to the writer in [`Bytes`].
///
/// Unlike [`AsyncWrite::poll_write`], `put` can write data without extra copying.
Expand Down Expand Up @@ -325,6 +340,7 @@ impl BufWriter {
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
extensions: self.extensions.take().unwrap_or_default(),
};
let upload = self.store.put_multipart_opts(&path, opts).await?;
let mut chunked =
Expand Down Expand Up @@ -384,6 +400,7 @@ impl AsyncWrite for BufWriter {
let opts = PutMultipartOpts {
attributes: self.attributes.take().unwrap_or_default(),
tags: self.tags.take().unwrap_or_default(),
extensions: self.extensions.take().unwrap_or_default(),
};
let store = Arc::clone(&self.store);
self.state = BufWriterState::Prepare(Box::pin(async move {
Expand Down
10 changes: 9 additions & 1 deletion object_store/src/gcp/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,9 +453,17 @@ impl GoogleCloudStorageClient {
path: &Path,
opts: PutMultipartOpts,
) -> Result<MultipartId> {
let PutMultipartOpts {
// not supported by GCP
tags: _,
attributes,
extensions,
} = opts;

let response = self
.request(Method::POST, path)
.with_attributes(opts.attributes)
.with_attributes(attributes)
.with_extensions(extensions)
.header(&CONTENT_LENGTH, "0")
.query(&[("uploads", "")])
.send()
Expand Down
27 changes: 26 additions & 1 deletion object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,7 +1223,7 @@ impl From<Attributes> for PutOptions {
}

/// Options for [`ObjectStore::put_multipart_opts`]
#[derive(Debug, Clone, PartialEq, Eq, Default)]
#[derive(Debug, Clone, Default)]
pub struct PutMultipartOpts {
/// Provide a [`TagSet`] for this object
///
Expand All @@ -1233,8 +1233,33 @@ pub struct PutMultipartOpts {
///
/// Implementations that don't support an attribute should return an error
pub attributes: Attributes,
/// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations
/// that need to pass context-specific information (like tracing spans) via trait methods.
///
/// These extensions are ignored entirely by backends offered through this crate.
///
/// They are also eclused from [`PartialEq`] and [`Eq`].
pub extensions: ::http::Extensions,
}

impl PartialEq<Self> for PutMultipartOpts {
fn eq(&self, other: &Self) -> bool {
let Self {
tags,
attributes,
extensions: _,
} = self;
let Self {
tags: other_tags,
attributes: other_attributes,
extensions: _,
} = other;
(tags == other_tags) && (attributes == other_attributes)
}
}

impl Eq for PutMultipartOpts {}

impl From<TagSet> for PutMultipartOpts {
fn from(tags: TagSet) -> Self {
Self {
Expand Down

0 comments on commit b90a261

Please sign in to comment.