Introduce support for persistent metadata watches (kube-rs#1145)
* Introduce support for persistent metadata watches

The `watch` (and `watch_metadata` respectively) functions on the Api
type are fallible, and watches are not recovered. Errors may happen for
any reason, such as network induced errors, restarts (etcd can only
cache so many resource versions), and so on. To get around these
failures, we have a `watcher()` utility in the runtime crate that
manages the underlying stream in a persistent way, recovering on
failure.
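
As a rough illustration of what "recovering on failure" can mean, here is a minimal synchronous sketch. It is not the kube-runtime implementation: `Source`, `Scripted`, `WatchError`, and `run_demo` are hypothetical names invented for this example, and the real watcher is an async stream. The model assumes an interrupted watch resumes from the last seen resource version, while an expired version forces a fresh list.

```rust
use std::collections::VecDeque;

/// Errors a watch call can produce in this simplified model (hypothetical).
#[derive(Debug, Clone, PartialEq)]
enum WatchError {
    /// The connection dropped; the last resource version is still usable.
    Interrupted,
    /// The server no longer caches the requested resource version.
    Expired,
}

#[derive(Debug, Clone, PartialEq)]
enum Event {
    /// Emitted after a fresh list, carrying every known object name.
    Restarted(Vec<String>),
    Applied(String),
}

#[derive(Debug, Clone, PartialEq)]
enum State {
    /// Nothing listed yet, or recovery requires starting over.
    Empty,
    Watching { resource_version: u64 },
}

/// Hypothetical stand-in for the API server.
trait Source {
    fn list(&mut self) -> Result<(Vec<String>, u64), WatchError>;
    fn watch_next(&mut self, resource_version: u64) -> Result<(String, u64), WatchError>;
}

/// Advances the watch by one step and returns the event plus the next state.
fn step<S: Source>(source: &mut S, state: State) -> (Result<Event, WatchError>, State) {
    match state {
        State::Empty => match source.list() {
            Ok((objects, resource_version)) => {
                (Ok(Event::Restarted(objects)), State::Watching { resource_version })
            }
            Err(err) => (Err(err), State::Empty),
        },
        State::Watching { resource_version } => match source.watch_next(resource_version) {
            Ok((object, resource_version)) => {
                (Ok(Event::Applied(object)), State::Watching { resource_version })
            }
            // The version is gone: surface the error, then start over with a list.
            Err(WatchError::Expired) => (Err(WatchError::Expired), State::Empty),
            // Any other error: surface it, then retry from the same version.
            Err(err) => (Err(err), State::Watching { resource_version }),
        },
    }
}

/// A scripted source that replays a fixed sequence of watch results.
struct Scripted {
    objects: Vec<String>,
    version: u64,
    watch_results: VecDeque<Result<String, WatchError>>,
}

impl Source for Scripted {
    fn list(&mut self) -> Result<(Vec<String>, u64), WatchError> {
        Ok((self.objects.clone(), self.version))
    }

    fn watch_next(&mut self, _resource_version: u64) -> Result<(String, u64), WatchError> {
        match self.watch_results.pop_front() {
            Some(Ok(name)) => {
                self.version += 1;
                self.objects.push(name.clone());
                Ok((name, self.version))
            }
            Some(Err(err)) => Err(err),
            None => Err(WatchError::Interrupted),
        }
    }
}

/// Drives the state machine six steps through two failures.
fn run_demo() -> Vec<Result<Event, WatchError>> {
    let mut source = Scripted {
        objects: vec!["a".to_string()],
        version: 1,
        watch_results: VecDeque::from(vec![
            Ok("b".to_string()),
            Err(WatchError::Interrupted),
            Ok("c".to_string()),
            Err(WatchError::Expired),
        ]),
    };
    let mut state = State::Empty;
    let mut seen = Vec::new();
    for _ in 0..6 {
        let (event, next) = step(&mut source, state);
        seen.push(event);
        state = next;
    }
    seen
}
```

In this sketch the caller always sees the error before recovery begins, so it can decide whether to back off before asking for the next step.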

This change introduces support for persistent metadata watches, through
a `metadata_watcher` function in the same crate. Watches may be
established on any type of resource; the main difference is that the
returned types no longer correspond to the type of the Api. Instead,
a concrete metadata type is returned.

To support this with no breaking changes and to allow for more maintainable
code, a few utility functions and traits are introduced in the `runtime`
crate.
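
The idea of picking the returned type through a trait can be sketched without any Kubernetes dependency. The names `ApiMode`, `FullObject`, and `MetaOnly` mirror the ones in this commit's diff, but everything else here (`Api`, `Pod`, `Meta`, `first_listed`, and the synchronous `list` signature) is a simplified assumption for illustration, not the crate's actual code.

```rust
/// Hypothetical metadata-only view of an object.
#[derive(Debug, Clone, PartialEq)]
struct Meta {
    name: String,
    resource_version: String,
}

/// Hypothetical full object: metadata plus a payload.
#[derive(Debug, Clone, PartialEq)]
struct Pod {
    meta: Meta,
    image: String,
}

/// Hypothetical in-memory stand-in for an API client.
struct Api {
    pods: Vec<Pod>,
}

/// Selects what a caller receives: the associated type fixes the item type.
trait ApiMode {
    type Value: Clone;

    fn list(&self) -> Vec<Self::Value>;
}

/// Wrapper that yields entire objects.
struct FullObject<'a> {
    api: &'a Api,
}

/// Wrapper that yields only metadata.
struct MetaOnly<'a> {
    api: &'a Api,
}

impl ApiMode for FullObject<'_> {
    type Value = Pod;

    fn list(&self) -> Vec<Pod> {
        self.api.pods.clone()
    }
}

impl ApiMode for MetaOnly<'_> {
    type Value = Meta;

    fn list(&self) -> Vec<Meta> {
        self.api.pods.iter().map(|p| p.meta.clone()).collect()
    }
}

/// Generic code is written once against `ApiMode`; the return type follows the mode.
fn first_listed<A: ApiMode>(mode: &A) -> Option<A::Value> {
    mode.list().into_iter().next()
}
```

Calling `first_listed(&FullObject { api: &api })` yields an `Option<Pod>`, while `first_listed(&MetaOnly { api: &api })` yields an `Option<Meta>`, so shared logic does not need a closure per call site.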

Signed-off-by: Matei David <matei@buoyant.io>

* Run clippy

Signed-off-by: Matei David <matei@buoyant.io>

* Make closure arg generic

Signed-off-by: Matei David <matei@buoyant.io>

* Fix doc test

Signed-off-by: Matei David <matei@buoyant.io>

* Bump MSRV to 1.63.0

Signed-off-by: Matei David <matei@buoyant.io>

* Rename AsyncFn to StepFn

Signed-off-by: Matei David <matei@buoyant.io>

* Add a compile-time typecheck and a meta example to dynamic watcher

Signed-off-by: Matei David <matei@buoyant.io>

* Rename watch_metadata to metadata_watcher and allow module rep

Signed-off-by: Matei David <matei@buoyant.io>

* Add trait to specialize Api calls instead of relying on closures

Signed-off-by: Matei David <matei@buoyant.io>

* Change meta watcher fn name in example

Signed-off-by: Matei David <matei@buoyant.io>

* Parse evar as 1

Signed-off-by: Matei David <matei@buoyant.io>

* Refactor dynamic_watcher example

Signed-off-by: Matei David <matei@buoyant.io>

---------

Signed-off-by: Matei David <matei@buoyant.io>
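
The compile-time typecheck mentioned in the commit list follows a pattern that can be shown with plain iterators. This is a sketch under assumptions: `assert_result_iter`, `full_events`, and `name_lengths` are hypothetical, and only the `mock_type` helper mirrors the shape of the one in the diff. The checking function is never run; it only has to compile.

```rust
/// Returns `x` unchanged; compiles only if `x` iterates over `Result<T, String>`.
fn assert_result_iter<I, T>(x: I) -> I
where
    I: Iterator<Item = Result<T, String>>,
    T: Clone + std::fmt::Debug,
{
    x
}

/// Fills holes in type assertions; must never actually be called.
fn mock_type<T>() -> T {
    unimplemented!("mock_type is only used to fill holes in type assertions")
}

/// Hypothetical producer yielding full values.
fn full_events(names: Vec<String>) -> impl Iterator<Item = Result<String, String>> {
    names.into_iter().map(Ok)
}

/// Hypothetical producer yielding a reduced view (here, just the length).
fn name_lengths(names: Vec<String>) -> impl Iterator<Item = Result<usize, String>> {
    names.into_iter().map(|n| Ok(n.len()))
}

// Not a test and never called: if either return type drifts away from
// `Iterator<Item = Result<_, String>>`, this function stops compiling.
#[allow(dead_code, unused_must_use)]
fn typecheck_return_types() {
    assert_result_iter(full_events(mock_type::<Vec<String>>()));
    assert_result_iter(name_lengths(mock_type::<Vec<String>>()));
}
```

The design choice is that the check costs nothing at run time: a signature change in either producer surfaces as a build error instead of a failing test.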
mateiidavid authored and jmintb committed Mar 6, 2023
1 parent 537c34b commit b6d8ebd
Showing 5 changed files with 207 additions and 21 deletions.
31 changes: 25 additions & 6 deletions examples/dynamic_watcher.rs
@@ -1,19 +1,23 @@
use futures::{StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use kube::{
api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt},
discovery::{self, Scope},
runtime::{watcher, WatchStreamExt},
discovery::{self, ApiCapabilities, Scope},
runtime::{metadata_watcher, watcher, WatchStreamExt},
Client,
};
use serde::de::DeserializeOwned;
use tracing::*;

use std::env;
use std::{env, fmt::Debug};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

// If set will receive only the metadata for watched resources
let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false);

// Take dynamic resource identifiers:
let group = env::var("GROUP").unwrap_or_else(|_| "clux.dev".into());
let version = env::var("VERSION").unwrap_or_else(|_| "v1".into());
@@ -27,14 +31,29 @@ async fn main() -> anyhow::Result<()> {
// Use the full resource info to create an Api with the ApiResource as its DynamicType
let api = Api::<DynamicObject>::all_with(client, &ar);

// Start a metadata or a full resource watch
if watch_metadata {
handle_events(metadata_watcher(api, ListParams::default()), caps).await?
} else {
handle_events(watcher(api, ListParams::default()), caps).await?
}

Ok(())
}

async fn handle_events<K: kube::Resource + Clone + Debug + Send + DeserializeOwned + 'static>(
stream: impl Stream<Item = watcher::Result<watcher::Event<K>>> + Send + 'static,
api_caps: ApiCapabilities,
) -> anyhow::Result<()> {
// Fully compatible with kube-runtime
let mut items = watcher(api, ListParams::default()).applied_objects().boxed();
let mut items = stream.applied_objects().boxed();
while let Some(p) = items.try_next().await? {
if caps.scope == Scope::Cluster {
if api_caps.scope == Scope::Cluster {
info!("saw {}", p.name_any());
} else {
info!("saw {} in {}", p.name_any(), p.namespace().unwrap());
}
}

Ok(())
}
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
@@ -39,6 +39,7 @@ json-patch = "0.3.0"
serde_json = "1.0.68"
thiserror = "1.0.29"
backoff = "0.4.0"
async-trait = "0.1.64"

[dependencies.k8s-openapi]
version = "0.17.0"
32 changes: 29 additions & 3 deletions kube-runtime/src/controller/mod.rs
@@ -895,17 +895,29 @@ mod tests {
use crate::{
applier,
reflector::{self, ObjectRef},
watcher, Controller,
watcher::{self, metadata_watcher, watcher, Event},
Controller,
};
use futures::{pin_mut, StreamExt, TryStreamExt};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::{core::ObjectMeta, Api};
use kube_client::{core::ObjectMeta, Api, Resource};
use serde::de::DeserializeOwned;
use tokio::time::timeout;

fn assert_send<T: Send>(x: T) -> T {
x
}

// Used to typecheck that a type T is a generic type that implements Stream
// and returns a WatchEvent generic over a resource `K`
fn assert_stream<T, K>(x: T) -> T
where
T: Stream<Item = watcher::Result<Event<K>>> + Send,
K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
{
x
}

fn mock_type<T>() -> T {
unimplemented!(
"mock_type is not supposed to be called, only used for filling holes in type assertions"
@@ -924,6 +936,20 @@ mod tests {
);
}

// not #[test] because we don't want to actually run it, we just want to
// assert that it typechecks
//
// will check return types for `watcher` and `metadata_watcher` do not drift
// given an arbitrary K that implements `Resource` (e.g ConfigMap)
#[allow(dead_code, unused_must_use)]
fn test_watcher_stream_type_drift() {
assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
assert_stream(metadata_watcher(
mock_type::<Api<ConfigMap>>(),
Default::default(),
));
}

#[tokio::test]
async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
// This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
2 changes: 1 addition & 1 deletion kube-runtime/src/lib.rs
@@ -32,4 +32,4 @@ pub use finalizer::finalizer;
pub use reflector::reflector;
pub use scheduler::scheduler;
pub use utils::WatchStreamExt;
pub use watcher::watcher;
pub use watcher::{metadata_watcher, watcher};
162 changes: 151 additions & 11 deletions kube-runtime/src/watcher.rs
@@ -3,11 +3,13 @@
//! See [`watcher`] for the primary entry point.
use crate::utils::ResetTimerBackoff;
use async_trait::async_trait;
use backoff::{backoff::Backoff, ExponentialBackoff};
use derivative::Derivative;
use futures::{stream::BoxStream, Stream, StreamExt};
use kube_client::{
api::{ListParams, Resource, ResourceExt, WatchEvent},
core::{metadata::PartialObjectMeta, ObjectList},
error::ErrorResponse,
Api, Error as ClientErr,
};
@@ -114,7 +116,7 @@ impl<K> Event<K> {
#[derive(Derivative)]
#[derivative(Debug)]
/// The internal finite state machine driving the [`watcher`]
enum State<K: Resource + Clone> {
enum State<K> {
/// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
Empty,
/// The initial LIST was successful, so we should move on to starting the actual watch.
@@ -132,15 +134,85 @@ enum State<K: Resource + Clone> {
},
}

/// Used to control whether the watcher receives the full object, or only the
/// metadata
#[async_trait]
trait ApiMode {
type Value: Clone;

async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
async fn watch(
&self,
lp: &ListParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
}

/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return the entire (full) object
struct FullObject<'a, K> {
api: &'a Api<K>,
}

#[async_trait]
impl<K> ApiMode for FullObject<'_, K>
where
K: Clone + Debug + DeserializeOwned + Send + 'static,
{
type Value = K;

async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
self.api.list(lp).await
}

async fn watch(
&self,
lp: &ListParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
self.api.watch(lp, version).await.map(StreamExt::boxed)
}
}

/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return only the metadata associated with an object
struct MetaOnly<'a, K> {
api: &'a Api<K>,
}

#[async_trait]
impl<K> ApiMode for MetaOnly<'_, K>
where
K: Clone + Debug + DeserializeOwned + Send + 'static,
{
type Value = PartialObjectMeta;

async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
self.api.list_metadata(lp).await
}

async fn watch(
&self,
lp: &ListParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
self.api.watch_metadata(lp, version).await.map(StreamExt::boxed)
}
}

/// Progresses the watcher a single step, returning (event, state)
///
/// This function should be trampolined: if event == `None`
/// then the function should be called again until it returns a Some.
async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: &Api<K>,
async fn step_trampolined<A>(
api: &A,
list_params: &ListParams,
state: State<K>,
) -> (Option<Result<Event<K>>>, State<K>) {
state: State<A::Value>,
) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
where
A: ApiMode,
A::Value: Resource + 'static,
{
match state {
State::Empty => match api.list(list_params).await {
Ok(list) => {
@@ -164,7 +236,7 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await {
Ok(stream) => (None, State::Watching {
resource_version,
stream: stream.boxed(),
stream,
}),
Err(err) => {
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
@@ -234,11 +306,15 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
}

/// Trampoline helper for `step_trampolined`
async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: &Api<K>,
async fn step<A>(
api: &A,
list_params: &ListParams,
mut state: State<K>,
) -> (Result<Event<K>>, State<K>) {
mut state: State<A::Value>,
) -> (Result<Event<A::Value>>, State<A::Value>)
where
A: ApiMode,
A::Value: Resource + 'static,
{
loop {
match step_trampolined(api, list_params, state).await {
(Some(result), new_state) => return (result, new_state),
@@ -303,7 +379,71 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
futures::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
let (event, state) = step(&api, &list_params, state).await;
let (event, state) = step(&FullObject { api: &api }, &list_params, state).await;
Some((event, (api, list_params, state)))
},
)
}

/// Watches a Kubernetes Resource for changes continuously and receives only the
/// metadata
///
/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
///
/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
/// You can apply your own backoff by not polling the stream for a duration after errors.
/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
/// will terminate eagerly as soon as they receive an [`Err`].
///
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`].
/// Direct users may want to flatten composite events via [`WatchStreamExt`]:
///
/// ```no_run
/// use kube::{
/// api::{Api, ListParams, ResourceExt}, Client,
/// runtime::{watcher, metadata_watcher, WatchStreamExt}
/// };
/// use k8s_openapi::api::core::v1::Pod;
/// use futures::{StreamExt, TryStreamExt};
/// #[tokio::main]
/// async fn main() -> Result<(), watcher::Error> {
/// let client = Client::try_default().await.unwrap();
/// let pods: Api<Pod> = Api::namespaced(client, "apps");
///
/// metadata_watcher(pods, ListParams::default()).applied_objects()
/// .try_for_each(|p| async move {
/// println!("Applied: {}", p.name_any());
/// Ok(())
/// })
/// .await?;
/// Ok(())
/// }
/// ```
/// [`WatchStreamExt`]: super::WatchStreamExt
/// [`reflector`]: super::reflector::reflector
/// [`Api::watch`]: kube_client::Api::watch
///
/// # Recovery
///
/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
///
/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
/// an [`Event::Restarted`]. The internal mechanics of recovery should be considered an implementation detail.
#[allow(clippy::module_name_repetitions)]
pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
api: Api<K>,
list_params: ListParams,
) -> impl Stream<Item = Result<Event<PartialObjectMeta>>> + Send {
futures::stream::unfold(
(api, list_params, State::Empty),
|(api, list_params, state)| async {
let (event, state) = step(&MetaOnly { api: &api }, &list_params, state).await;
Some((event, (api, list_params, state)))
},
)
