Skip to content

Commit

Permalink
Implement MultiDispatcher for a DynamicObject steam
Browse files Browse the repository at this point in the history
Signed-off-by: Danil-Grigorev <danil.grigorev@suse.com>
  • Loading branch information
Danil-Grigorev committed Feb 26, 2025
1 parent a4035ee commit fa82100
Show file tree
Hide file tree
Showing 13 changed files with 441 additions and 211 deletions.
4 changes: 2 additions & 2 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ name = "multi_watcher"
path = "multi_watcher.rs"

[[example]]
name = "multi_reflector"
path = "multi_reflector.rs"
name = "broadcast_reflector"
path = "broadcast_reflector.rs"

[[example]]
name = "pod_api"
Expand Down
112 changes: 112 additions & 0 deletions examples/broadcast_reflector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use futures::{future, stream, StreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{ConfigMap, Secret},
};
use kube::{
api::ApiResource,
runtime::{
controller::Action,
reflector::multi_dispatcher::MultiDispatcher,
watcher, Controller, WatchStreamExt,
},
Api, Client, ResourceExt,
};
use std::{fmt::Debug, sync::Arc, time::Duration};
use thiserror::Error;
use tracing::*;

#[derive(Debug, Error)]
enum Infallible {}

// A generic reconciler that can be used with any object whose type is known at
// compile time. Will simply log its kind on reconciliation.
async fn reconcile<K>(_obj: Arc<K>, _ctx: Arc<()>) -> Result<Action, Infallible>
where
K: ResourceExt<DynamicType = ()>,
{
let kind = K::kind(&());
info!("Reconciled {kind}");
Ok(Action::await_change())
}

fn error_policy<K: ResourceExt>(_: Arc<K>, _: &Infallible, _ctx: Arc<()>) -> Action {
info!("error");
Action::requeue(Duration::from_secs(10))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let writer = MultiDispatcher::new(128);

// multireflector stream
let mut combo_stream = stream::select_all(vec![]);
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<Deployment>(&())),
Default::default(),
)
.boxed(),
);

// watching config maps, but ignoring in the final configuration
combo_stream.push(
watcher::watcher(
Api::all_with(client.clone(), &ApiResource::erase::<ConfigMap>(&())),
Default::default(),
)
.boxed(),
);

// Combine duplicate type streams with narrowed down selection
combo_stream.push(
watcher::watcher(
Api::default_namespaced_with(client.clone(), &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);
combo_stream.push(
watcher::watcher(
Api::namespaced_with(client.clone(), "kube-system", &ApiResource::erase::<Secret>(&())),
Default::default(),
)
.boxed(),
);

let watcher = combo_stream.broadcast_shared(writer.clone());

let (sub, reader) = writer.subscribe::<Deployment>();
let deploy = Controller::for_shared_stream(sub, reader)
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(()))
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled deployment {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile metadata"),
};
});

let (sub, reader) = writer.subscribe::<Secret>();
let secret = Controller::for_shared_stream(sub, reader)
.shutdown_on_signal()
.run(reconcile, error_policy, Arc::new(()))
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled secret {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile metadata"),
};
});

info!("long watches starting");
tokio::select! {
r = watcher.for_each(|_| future::ready(())) => println!("watcher exit: {r:?}"),
x = deploy => println!("deployments exit: {x:?}"),
x = secret => println!("secrets exit: {x:?}"),
}

Ok(())
}
145 changes: 0 additions & 145 deletions examples/multi_reflector.rs

This file was deleted.

8 changes: 7 additions & 1 deletion kube-core/src/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
pub use crate::discovery::ApiResource;
use crate::{
metadata::TypeMeta,
resource::{DynamicResourceScope, Resource},
resource::{DynamicResourceScope, Resource}, GroupVersionKind,
};

use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
Expand Down Expand Up @@ -73,6 +73,12 @@ impl DynamicObject {
) -> Result<K, ParseDynamicObjectError> {
Ok(serde_json::from_value(serde_json::to_value(self)?)?)
}

/// Returns the group, version, and kind (GVK) of this resource.
pub fn gvk(&self) -> Option<GroupVersionKind> {
let gvk = self.types.clone()?;
gvk.try_into().ok()
}
}

impl Resource for DynamicObject {
Expand Down
8 changes: 3 additions & 5 deletions kube-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,9 @@ impl TypeMeta {
/// assert_eq!(type_meta.clone().singular().kind, "Pod");
/// assert_eq!(type_meta.clone().singular().api_version, "v1");
/// ```
pub fn singular(self) -> Self {
Self {
kind: self.kind.strip_suffix("List").unwrap_or(&self.kind).to_string(),
..self
}
pub fn singular(self) -> Option<Self> {
let kind = self.kind.strip_suffix("List")?.to_string();
(!kind.is_empty()).then_some(Self { kind, ..self })
}
}

Expand Down
4 changes: 3 additions & 1 deletion kube-core/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,9 @@ impl<'de, T: DeserializeOwned + Clone> serde::Deserialize<'de> for ObjectList<T>
} = DynamicList::deserialize(d)?;
let mut resources = vec![];
for o in items.iter_mut() {
o.types = Some(types.clone().singular());
if o.types.is_none() {
o.types = types.clone().singular();
}
let item = serde_json::to_value(o).map_err(de::Error::custom)?;
resources.push(serde_json::from_value(item).map_err(de::Error::custom)?)
}
Expand Down
Loading

0 comments on commit fa82100

Please sign in to comment.