From 90d9786b78496d6f0c4f909520e64c7fa645cdda Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Fri, 21 Feb 2025 13:04:58 +0100 Subject: [PATCH 1/3] feat: add `Extensions` to object store `GetOptions` Closes #7155. --- object_store/src/extensions.rs | 243 +++++++++++++++++++++++++++++++++ object_store/src/lib.rs | 6 + 2 files changed, 249 insertions(+) create mode 100644 object_store/src/extensions.rs diff --git a/object_store/src/extensions.rs b/object_store/src/extensions.rs new file mode 100644 index 000000000000..c564082ab980 --- /dev/null +++ b/object_store/src/extensions.rs @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Implementation of [`Extensions`]. +use std::{ + any::{Any, TypeId}, + collections::HashMap, + fmt::Debug, +}; + +/// Holds opaque extensions. +#[derive(Default)] +pub struct Extensions { + inner: HashMap>, +} + +impl Extensions { + /// Create new, empty extensions collection. + pub fn new() -> Self { + Self::default() + } + + /// Set extensions by type. + /// + /// Returns existing extension if there was one. + pub fn set(&mut self, t: T) -> Option + where + T: Clone + Debug + Send + Sync + 'static, + { + let ext = ExtensionImpl { inner: t }; + let existing = self.inner.insert(TypeId::of::(), Box::new(ext)); + existing.map(|ext| *ext.to_any().downcast::().expect("type ID is correct")) + } + + /// Get immutable reference to extension by type. + pub fn get(&self) -> Option<&T> + where + T: Clone + Debug + Send + Sync + 'static, + { + self.inner.get(&TypeId::of::()).map(|ext| { + ext.as_any() + .downcast_ref::() + .expect("type ID is correct") + }) + } + + /// Get mutable reference to extension by type. + pub fn get_mut(&mut self) -> Option<&mut T> + where + T: Clone + Debug + Send + Sync + 'static, + { + self.inner.get_mut(&TypeId::of::()).map(|ext| { + ext.as_any_mut() + .downcast_mut::() + .expect("type ID is correct") + }) + } + + /// Remove extension by type. + pub fn remove(&mut self) -> Option + where + T: Clone + Debug + Send + Sync + 'static, + { + let existing = self.inner.remove(&TypeId::of::()); + existing.map(|ext| *ext.to_any().downcast::().expect("type ID is correct")) + } +} + +impl Debug for Extensions { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_set().entries(self.inner.values()).finish() + } +} + +impl Clone for Extensions { + fn clone(&self) -> Self { + Self { + inner: self + .inner + .iter() + .map(|(ty_id, ext)| (*ty_id, ext.as_ref().clone())) + .collect(), + } + } +} + +/// Helper trait to capture relevant trait bounds for extensions into a vtable. +trait Extension: Debug + Send + Sync + 'static { + /// Dyn-compatible [`Clone`]. + fn clone(&self) -> Box; + + /// Converts to boxed [`Any`]. + fn to_any(self: Box) -> Box; + + /// Converts to [`Any`] reference. + fn as_any(&self) -> &dyn Any; + + /// Converts to [`Any`] mutable reference. + fn as_any_mut(&mut self) -> &mut dyn Any; +} + +/// Our one-and-only implementation of [`Extension`]. +struct ExtensionImpl +where + T: Clone + Debug + Send + Sync + 'static, +{ + inner: T, +} + +impl Debug for ExtensionImpl +where + T: Clone + Debug + Send + Sync + 'static, +{ + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.inner.fmt(f) + } +} + +impl Extension for ExtensionImpl +where + T: Clone + Debug + Send + Sync + 'static, +{ + fn clone(&self) -> Box { + Box::new(Self { + inner: self.inner.clone(), + }) + } + + fn to_any(self: Box) -> Box { + let this = *self; + Box::new(this.inner) as _ + } + + fn as_any(&self) -> &dyn Any { + &self.inner + } + + fn as_any_mut(&mut self) -> &mut dyn Any { + &mut self.inner + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_extensions_traits() { + let ext = Extensions::default(); + assert_send(&ext); + assert_sync(&ext); + } + + #[test] + fn test_debug() { + let mut ext = Extensions::default(); + ext.set(String::from("foo")); + ext.set(1u8); + + let dbg_str = format!("{ext:?}"); + + // order is NOT deterministic + let variant_a = r#"{"foo", 1}"#; + let variant_b = r#"{1, "foo"}"#; + assert!( + (dbg_str == variant_a) || (dbg_str == variant_b), + "'{dbg_str}' is neither '{variant_a}' nor '{variant_b}'", + ); + } + + #[test] + fn test_get_set_remove() { + let mut ext = Extensions::default(); + assert_eq!(ext.get::(), None); + assert_eq!(ext.get::(), None); + assert_eq!(ext.get_mut::(), None); + assert_eq!(ext.get_mut::(), None); + + assert_eq!(ext.set(String::from("foo")), None); + assert_eq!(ext.get::(), Some(&String::from("foo"))); + assert_eq!(ext.get::(), None); + assert_eq!(ext.get_mut::(), Some(&mut String::from("foo"))); + assert_eq!(ext.get_mut::(), None); + + assert_eq!(ext.set(1u8), None); + assert_eq!(ext.get::(), Some(&String::from("foo"))); + assert_eq!(ext.get::(), Some(&1u8)); + assert_eq!(ext.get_mut::(), Some(&mut String::from("foo"))); + assert_eq!(ext.get_mut::(), Some(&mut 1u8)); + + assert_eq!(ext.set(String::from("bar")), Some(String::from("foo"))); + assert_eq!(ext.get::(), Some(&String::from("bar"))); + assert_eq!(ext.get::(), Some(&1u8)); + assert_eq!(ext.get_mut::(), Some(&mut String::from("bar"))); + assert_eq!(ext.get_mut::(), Some(&mut 1u8)); + + ext.get_mut::().unwrap().push_str("baz"); + assert_eq!(ext.get::(), Some(&String::from("barbaz"))); + assert_eq!(ext.get::(), Some(&1u8)); + assert_eq!(ext.get_mut::(), Some(&mut String::from("barbaz"))); + assert_eq!(ext.get_mut::(), Some(&mut 1u8)); + + assert_eq!(ext.remove::(), Some(String::from("barbaz"))); + assert_eq!(ext.get::(), None); + assert_eq!(ext.get::(), Some(&1u8)); + assert_eq!(ext.get_mut::(), None); + assert_eq!(ext.get_mut::(), Some(&mut 1u8)); + assert_eq!(ext.remove::(), None); + } + + #[test] + fn test_clone() { + let mut ext = Extensions::default(); + ext.set(String::from("foo")); + + let ext2 = ext.clone(); + + ext.get_mut::().unwrap().push_str("bar"); + ext.set(1u8); + + assert_eq!(ext.get::(), Some(&String::from("foobar"))); + assert_eq!(ext.get::(), Some(&1)); + assert_eq!(ext2.get::(), Some(&String::from("foo"))); + assert_eq!(ext2.get::(), None); + } + + fn assert_send(_o: &T) {} + fn assert_sync(_o: &T) {} +} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 58f757b2972a..54a5f94e8782 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -511,6 +511,7 @@ pub mod buffered; #[cfg(not(target_arch = "wasm32"))] pub mod chunked; pub mod delimited; +pub mod extensions; #[cfg(feature = "gcp")] pub mod gcp; #[cfg(feature = "http")] @@ -967,6 +968,11 @@ pub struct GetOptions { /// /// pub head: bool, + /// Implementation-specific extensions. Intended for use by [`ObjectStore`] implementations + /// that need to pass context-specific information (like tracing spans) via trait methods. + /// + /// These extensions are ignored entirely by backends offered through this crate. + pub extensions: extensions::Extensions, } impl GetOptions { From 0915500a0040f8b31a08245b137bfd2e41d81dc3 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 27 Feb 2025 13:32:06 +0100 Subject: [PATCH 2/3] refactor: replace own `Extensions` by `http` version --- object_store/Cargo.toml | 4 +- object_store/src/extensions.rs | 243 --------------------------------- object_store/src/lib.rs | 3 +- 3 files changed, 3 insertions(+), 247 deletions(-) delete mode 100644 object_store/src/extensions.rs diff --git a/object_store/Cargo.toml b/object_store/Cargo.toml index 7e5124583446..0372514dba7d 100644 --- a/object_store/Cargo.toml +++ b/object_store/Cargo.toml @@ -34,6 +34,7 @@ async-trait = "0.1.53" bytes = "1.0" chrono = { version = "0.4.34", default-features = false, features = ["clock"] } futures = "0.3" +http = "1.2.0" humantime = "2.1" itertools = "0.14.0" parking_lot = { version = "0.12" } @@ -46,7 +47,6 @@ walkdir = { version = "2", optional = true } # Cloud storage support base64 = { version = "0.22", default-features = false, features = ["std"], optional = true } form_urlencoded = { version = "1.2", optional = true } -http = { version = "1.2.0", optional = true } http-body-util = { version = "0.1", optional = true } httparse = { version = "1.8.0", default-features = false, features = ["std"], optional = true } hyper = { version = "1.2", default-features = false, optional = true } @@ -66,7 +66,7 @@ nix = { version = "0.29.0", features = ["fs"] } [features] default = ["fs"] -cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "dep:http", "http-body-util", "form_urlencoded", "serde_urlencoded"] +cloud = ["serde", "serde_json", "quick-xml", "hyper", "reqwest", "reqwest/stream", "chrono/serde", "base64", "rand", "ring", "http-body-util", "form_urlencoded", "serde_urlencoded"] azure = ["cloud", "httparse"] fs = ["walkdir"] gcp = ["cloud", "rustls-pemfile"] diff --git a/object_store/src/extensions.rs b/object_store/src/extensions.rs deleted file mode 100644 index c564082ab980..000000000000 --- a/object_store/src/extensions.rs +++ /dev/null @@ -1,243 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Implementation of [`Extensions`]. -use std::{ - any::{Any, TypeId}, - collections::HashMap, - fmt::Debug, -}; - -/// Holds opaque extensions. -#[derive(Default)] -pub struct Extensions { - inner: HashMap>, -} - -impl Extensions { - /// Create new, empty extensions collection. - pub fn new() -> Self { - Self::default() - } - - /// Set extensions by type. - /// - /// Returns existing extension if there was one. - pub fn set(&mut self, t: T) -> Option - where - T: Clone + Debug + Send + Sync + 'static, - { - let ext = ExtensionImpl { inner: t }; - let existing = self.inner.insert(TypeId::of::(), Box::new(ext)); - existing.map(|ext| *ext.to_any().downcast::().expect("type ID is correct")) - } - - /// Get immutable reference to extension by type. - pub fn get(&self) -> Option<&T> - where - T: Clone + Debug + Send + Sync + 'static, - { - self.inner.get(&TypeId::of::()).map(|ext| { - ext.as_any() - .downcast_ref::() - .expect("type ID is correct") - }) - } - - /// Get mutable reference to extension by type. - pub fn get_mut(&mut self) -> Option<&mut T> - where - T: Clone + Debug + Send + Sync + 'static, - { - self.inner.get_mut(&TypeId::of::()).map(|ext| { - ext.as_any_mut() - .downcast_mut::() - .expect("type ID is correct") - }) - } - - /// Remove extension by type. - pub fn remove(&mut self) -> Option - where - T: Clone + Debug + Send + Sync + 'static, - { - let existing = self.inner.remove(&TypeId::of::()); - existing.map(|ext| *ext.to_any().downcast::().expect("type ID is correct")) - } -} - -impl Debug for Extensions { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_set().entries(self.inner.values()).finish() - } -} - -impl Clone for Extensions { - fn clone(&self) -> Self { - Self { - inner: self - .inner - .iter() - .map(|(ty_id, ext)| (*ty_id, ext.as_ref().clone())) - .collect(), - } - } -} - -/// Helper trait to capture relevant trait bounds for extensions into a vtable. -trait Extension: Debug + Send + Sync + 'static { - /// Dyn-compatible [`Clone`]. - fn clone(&self) -> Box; - - /// Converts to boxed [`Any`]. - fn to_any(self: Box) -> Box; - - /// Converts to [`Any`] reference. - fn as_any(&self) -> &dyn Any; - - /// Converts to [`Any`] mutable reference. - fn as_any_mut(&mut self) -> &mut dyn Any; -} - -/// Our one-and-only implementation of [`Extension`]. -struct ExtensionImpl -where - T: Clone + Debug + Send + Sync + 'static, -{ - inner: T, -} - -impl Debug for ExtensionImpl -where - T: Clone + Debug + Send + Sync + 'static, -{ - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - self.inner.fmt(f) - } -} - -impl Extension for ExtensionImpl -where - T: Clone + Debug + Send + Sync + 'static, -{ - fn clone(&self) -> Box { - Box::new(Self { - inner: self.inner.clone(), - }) - } - - fn to_any(self: Box) -> Box { - let this = *self; - Box::new(this.inner) as _ - } - - fn as_any(&self) -> &dyn Any { - &self.inner - } - - fn as_any_mut(&mut self) -> &mut dyn Any { - &mut self.inner - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn test_extensions_traits() { - let ext = Extensions::default(); - assert_send(&ext); - assert_sync(&ext); - } - - #[test] - fn test_debug() { - let mut ext = Extensions::default(); - ext.set(String::from("foo")); - ext.set(1u8); - - let dbg_str = format!("{ext:?}"); - - // order is NOT deterministic - let variant_a = r#"{"foo", 1}"#; - let variant_b = r#"{1, "foo"}"#; - assert!( - (dbg_str == variant_a) || (dbg_str == variant_b), - "'{dbg_str}' is neither '{variant_a}' nor '{variant_b}'", - ); - } - - #[test] - fn test_get_set_remove() { - let mut ext = Extensions::default(); - assert_eq!(ext.get::(), None); - assert_eq!(ext.get::(), None); - assert_eq!(ext.get_mut::(), None); - assert_eq!(ext.get_mut::(), None); - - assert_eq!(ext.set(String::from("foo")), None); - assert_eq!(ext.get::(), Some(&String::from("foo"))); - assert_eq!(ext.get::(), None); - assert_eq!(ext.get_mut::(), Some(&mut String::from("foo"))); - assert_eq!(ext.get_mut::(), None); - - assert_eq!(ext.set(1u8), None); - assert_eq!(ext.get::(), Some(&String::from("foo"))); - assert_eq!(ext.get::(), Some(&1u8)); - assert_eq!(ext.get_mut::(), Some(&mut String::from("foo"))); - assert_eq!(ext.get_mut::(), Some(&mut 1u8)); - - assert_eq!(ext.set(String::from("bar")), Some(String::from("foo"))); - assert_eq!(ext.get::(), Some(&String::from("bar"))); - assert_eq!(ext.get::(), Some(&1u8)); - assert_eq!(ext.get_mut::(), Some(&mut String::from("bar"))); - assert_eq!(ext.get_mut::(), Some(&mut 1u8)); - - ext.get_mut::().unwrap().push_str("baz"); - assert_eq!(ext.get::(), Some(&String::from("barbaz"))); - assert_eq!(ext.get::(), Some(&1u8)); - assert_eq!(ext.get_mut::(), Some(&mut String::from("barbaz"))); - assert_eq!(ext.get_mut::(), Some(&mut 1u8)); - - assert_eq!(ext.remove::(), Some(String::from("barbaz"))); - assert_eq!(ext.get::(), None); - assert_eq!(ext.get::(), Some(&1u8)); - assert_eq!(ext.get_mut::(), None); - assert_eq!(ext.get_mut::(), Some(&mut 1u8)); - assert_eq!(ext.remove::(), None); - } - - #[test] - fn test_clone() { - let mut ext = Extensions::default(); - ext.set(String::from("foo")); - - let ext2 = ext.clone(); - - ext.get_mut::().unwrap().push_str("bar"); - ext.set(1u8); - - assert_eq!(ext.get::(), Some(&String::from("foobar"))); - assert_eq!(ext.get::(), Some(&1)); - assert_eq!(ext2.get::(), Some(&String::from("foo"))); - assert_eq!(ext2.get::(), None); - } - - fn assert_send(_o: &T) {} - fn assert_sync(_o: &T) {} -} diff --git a/object_store/src/lib.rs b/object_store/src/lib.rs index 54a5f94e8782..21352f5761ec 100644 --- a/object_store/src/lib.rs +++ b/object_store/src/lib.rs @@ -511,7 +511,6 @@ pub mod buffered; #[cfg(not(target_arch = "wasm32"))] pub mod chunked; pub mod delimited; -pub mod extensions; #[cfg(feature = "gcp")] pub mod gcp; #[cfg(feature = "http")] @@ -972,7 +971,7 @@ pub struct GetOptions { /// that need to pass context-specific information (like tracing spans) via trait methods. /// /// These extensions are ignored entirely by backends offered through this crate. - pub extensions: extensions::Extensions, + pub extensions: ::http::Extensions, } impl GetOptions { From f4fc7bf7ba01c360cddfd99f494b93967bce1288 Mon Sep 17 00:00:00 2001 From: Marco Neumann Date: Thu, 27 Feb 2025 13:37:15 +0100 Subject: [PATCH 3/3] feat: wire `Extensions` into HTTP stack --- object_store/src/client/builder.rs | 7 +++++++ object_store/src/client/mod.rs | 23 ++++++++++++++++++----- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/object_store/src/client/builder.rs b/object_store/src/client/builder.rs index 0fbc12fd9484..fcbc6e8baee8 100644 --- a/object_store/src/client/builder.rs +++ b/object_store/src/client/builder.rs @@ -92,6 +92,13 @@ impl HttpRequestBuilder { self } + pub(crate) fn extensions(mut self, extensions: ::http::Extensions) -> Self { + if let Ok(r) = &mut self.request { + *r.extensions_mut() = extensions; + } + self + } + pub(crate) fn header(mut self, name: K, value: V) -> Self where K: TryInto, diff --git a/object_store/src/client/mod.rs b/object_store/src/client/mod.rs index 4fe3cff159a2..36252f54f18c 100644 --- a/object_store/src/client/mod.rs +++ b/object_store/src/client/mod.rs @@ -718,27 +718,40 @@ impl GetOptionsExt for HttpRequestBuilder { fn with_get_options(mut self, options: GetOptions) -> Self { use hyper::header::*; - if let Some(range) = options.range { + let GetOptions { + if_match, + if_none_match, + if_modified_since, + if_unmodified_since, + range, + version: _, + head: _, + extensions, + } = options; + + if let Some(range) = range { self = self.header(RANGE, range.to_string()); } - if let Some(tag) = options.if_match { + if let Some(tag) = if_match { self = self.header(IF_MATCH, tag); } - if let Some(tag) = options.if_none_match { + if let Some(tag) = if_none_match { self = self.header(IF_NONE_MATCH, tag); } const DATE_FORMAT: &str = "%a, %d %b %Y %H:%M:%S GMT"; - if let Some(date) = options.if_unmodified_since { + if let Some(date) = if_unmodified_since { self = self.header(IF_UNMODIFIED_SINCE, date.format(DATE_FORMAT).to_string()); } - if let Some(date) = options.if_modified_since { + if let Some(date) = if_modified_since { self = self.header(IF_MODIFIED_SINCE, date.format(DATE_FORMAT).to_string()); } + self = self.extensions(extensions); + self } }