Skip to content

Commit

Permalink
clean up signal handling + handle SIGTERM - for #152
Browse files Browse the repository at this point in the history
  • Loading branch information
clux committed Apr 8, 2020
1 parent 745c7dd commit a7f4813
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 20 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@
* People parsing `~/.kube/config` must use the `KubeConfig` struct instead
* `Reflector<K>` now only takes an `Api<K>` to construct (.params method)
* `Informer<K>` now only takes an `Api<K>` to construct (.params method)
* `Reflector` is now self-polls #151
* `Reflector` now has basic signal handling #152
* `Informer::init_from` -> `Informer::set_version`
* `Reflector` now self-polls #151 + handles signals #152
* `Reflector::poll` made private in favour of `Reflector::run`
* `Api::watch` no longer filters out error events (`next` -> `try_next`)
* `Api::watch` returns `Result<WatchEvent>` rather than `WatchEvent`
* `WatchEvent::Bookmark` added to enum
Expand Down
21 changes: 12 additions & 9 deletions kube/examples/configmap_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ use kube::{
Client,
};

fn spawn_periodic_reader(rf: Reflector<ConfigMap>) {
tokio::spawn(async move {
loop {
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let cms: Vec<_> = rf.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current configmaps: {:?}", cms);
}
});
}

/// Example way to read secrets
#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -18,15 +29,7 @@ async fn main() -> anyhow::Result<()> {
let lp = ListParams::default().timeout(10); // short watch timeout in this example
let rf = Reflector::new(cms).params(lp);

let rf2 = rf.clone(); // read from a clone in a task
tokio::spawn(async move {
loop {
// Periodically read our state
tokio::time::delay_for(std::time::Duration::from_secs(5)).await;
let pods: Vec<_> = rf2.state().await.unwrap().iter().map(Meta::name).collect();
info!("Current configmaps: {:?}", pods);
}
});
spawn_periodic_reader(rf.clone()); // read from a clone in a task

rf.run().await?; // run reflector and listen for signals
Ok(())
Expand Down
36 changes: 27 additions & 9 deletions kube/src/runtime/reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use crate::{
};
use futures::{future::FutureExt, lock::Mutex, pin_mut, select, TryStreamExt};
use serde::de::DeserializeOwned;
use tokio::{signal, time::delay_for};
use tokio::{
signal::{self, ctrl_c},
time::delay_for,
};

use std::{collections::BTreeMap, sync::Arc, time::Duration};

Expand Down Expand Up @@ -53,24 +56,37 @@ where
pub async fn run(self) -> Result<()> {
self.reset().await?;
loop {
let signal_fut = signal::ctrl_c().fuse(); // TODO: SIGTERM
let stream_fut = self.poll().fuse();
pin_mut!(signal_fut, stream_fut);
// local development needs listening for ctrl_c
let ctrlc_fut = ctrl_c().fuse();
// kubernetes apps need to listen for SIGTERM (30s warning)
use signal::unix::{signal, SignalKind}; // TODO: conditional compile
let mut sigterm = signal(SignalKind::terminate()).unwrap();
let sigterm_fut = sigterm.recv().fuse();

// and reflector needs to poll continuously
let poll_fut = self.poll().fuse();

// Then pin then futures to the stack, and wait for any of them
pin_mut!(ctrlc_fut, sigterm_fut, poll_fut);
select! {
sig = signal_fut => {
ctrlc = ctrlc_fut => {
warn!("Intercepted ctrl_c signal");
return Ok(());
},
stream = stream_fut => {
if let Err(e) = stream {
sigterm = sigterm_fut => {
warn!("Intercepted SIGTERM");
return Ok(());
}
poll = poll_fut => {
// Error handle if not ok, otherwise, we do another iteration
if let Err(e) = poll {
warn!("Poll error on {}: {}: {:?}", self.api.resource.kind, e, e);
// If desynched due to mismatching resourceVersion, retry in a bit
let dur = Duration::from_secs(10);
delay_for(dur).await;
self.reset().await?; // propagate error if this failed..
}
},
complete => continue, // another poll
}
}
}
}
Expand All @@ -86,6 +102,7 @@ where
// For every event, modify our state
while let Some(ev) = stream.try_next().await? {
let mut state = self.state.lock().await;
// Informer-like version tracking:
match &ev {
WatchEvent::Added(o)
| WatchEvent::Modified(o)
Expand All @@ -101,6 +118,7 @@ where
}

let data = &mut state.data;
// Core Reflector logic
match ev {
WatchEvent::Added(o) => {
debug!("Adding {} to {}", Meta::name(&o), kind);
Expand Down

0 comments on commit a7f4813

Please sign in to comment.