Skip to content

Commit

Permalink
implements owns_stream and owns_stream_with for Controller
Browse files Browse the repository at this point in the history
Signed-off-by: David Herberth <github@dav1d.de>
  • Loading branch information
Dav1dde committed Mar 30, 2023
1 parent aed07be commit 32b223c
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 4 deletions.
3 changes: 2 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ rust-version = "1.63.0"
edition = "2021"

[features]
unstable-runtime = ["unstable-runtime-subscribe"]
unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-owns_stream"]
unstable-runtime-subscribe = []
unstable-runtime-owns_stream = []

[package.metadata.docs.rs]
features = ["k8s-openapi/v1_26", "unstable-runtime"]
Expand Down
71 changes: 68 additions & 3 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -558,12 +558,15 @@ where
///
/// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
#[must_use]
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
pub fn owns<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
wc: Config,
) -> Self {
self.owns_with(api, (), wc)
) -> Self
where
Child::DynamicType: Default,
{
self.owns_with(api, Default::default(), wc)
}

/// Specify `Child` objects which `K` owns and should be watched
Expand All @@ -584,6 +587,68 @@ where
self
}

/// Trigger the reconciliation process for `Child` objets which `K` wns emitted by `trigger`.
///
/// Same as [`Controller::owns`], but instad of a resource a stream of resources is used.
/// This allows for customized and pre-filtered watch streams to be used as a trigger.
///
/// # Example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::core::v1::ConfigMap;
/// # use k8s_openapi::api::apps::v1::StatefulSet;
/// # use kube::runtime::controller::Action;
/// # use kube::runtime::{watcher, Controller, WatchStreamExt};
/// # use kube::{Api, Client, Error, ResourceExt};
/// # use std::sync::Arc;
/// # let client: Client = todo!();
/// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Store<Pod>>) -> Action { Action::await_change() }
/// # type CustomResource = ConfigMap;
///
/// let sts_stream = watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
/// .touched_objects()
/// .predicate_filter(predicates::generation);
///
/// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
/// .owns_stream(sts_stream)
/// .run(reconcile, error_policy, Arc::new(()))
/// .for_each(|_| std::future::ready(()))
/// .await;
/// # };
#[cfg(feature = "unstable-runtime-owns_stream")]
#[must_use]
pub fn owns_stream<Child: Resource + Send + 'static>(
mut self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
) -> Self
where
Child::DynamicType: Default,
{
let child_watcher = trigger_owners(trigger, self.dyntype.clone(), Default::default());
self.trigger_selector.push(child_watcher.boxed());
self
}

/// Trigger the reconciliation process for `Child` objets which `K` wns emitted by `trigger`.
///
/// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-owns_stream")]
#[must_use]
pub fn owns_stream_with<Child: Resource + Send + 'static>(
mut self,
trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
dyntype: Child::DynamicType,
) -> Self
where
Child::DynamicType: Debug + Eq + Hash + Clone,
{
let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
self.trigger_selector.push(child_watcher.boxed());
self
}

/// Specify `Watched` object which `K` has a custom relation to and should be watched
///
/// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
Expand Down

0 comments on commit 32b223c

Please sign in to comment.