Skip to content

Commit

Permalink
Merge pull request #213 from AcalaNetwork/add_missing_methods
Browse files Browse the repository at this point in the history
add keyed future missing methods
  • Loading branch information
antifuchs authored Dec 7, 2023
2 parents c1a9fdb + c615d35 commit 5cd2d5e
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
49 changes: 46 additions & 3 deletions governor/src/state/keyed/future.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::prelude::v1::*;

use crate::{
clock, middleware::RateLimitingMiddleware, state::keyed::KeyedStateStore, Jitter, NotUntil,
RateLimiter,
clock, errors::InsufficientCapacity, middleware::RateLimitingMiddleware,
state::keyed::KeyedStateStore, Jitter, NotUntil, RateLimiter,
};
use futures_timer::Delay;
use std::hash::Hash;
use std::{hash::Hash, num::NonZeroU32};

#[cfg(feature = "std")]
/// # Keyed rate limiters - `async`/`await`
Expand Down Expand Up @@ -57,4 +57,47 @@ where
}
}
}

/// Asynchronously resolves as soon as the rate limiter allows it.
///
/// This is similar to `until_key_ready` except it waits for an abitrary number
/// of `n` cells to be available.
///
/// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum
/// capacity of the rate limiter.
pub async fn until_key_n_ready(
&self,
key: &K,
n: NonZeroU32,
) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
self.until_key_n_ready_with_jitter(key, n, Jitter::NONE)
.await
}

/// Asynchronously resolves as soon as the rate limiter allows it, with a
/// randomized wait period.
///
/// This is similar to `until_key_ready_with_jitter` except it waits for an
/// abitrary number of `n` cells to be available.
///
/// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum
/// capacity of the rate limiter.
pub async fn until_key_n_ready_with_jitter(
&self,
key: &K,
n: NonZeroU32,
jitter: Jitter,
) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
loop {
match self.check_key_n(key, n)? {
Ok(x) => {
return Ok(x);
}
Err(negative) => {
let delay = Delay::new(jitter + negative.wait_time_from(self.clock.now()));
delay.await;
}
}
}
}
}
24 changes: 24 additions & 0 deletions governor/tests/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,18 @@ fn pauses_keyed() {
assert_ge!(i.elapsed(), Duration::from_millis(100));
}

#[test]
fn pauses_keyed_n() {
let lim = RateLimiter::keyed(Quota::per_second(nonzero!(10u32)));

for _ in 0..6 {
lim.check_key(&1u32).unwrap();
}
let i = Instant::now();
block_on(lim.until_key_n_ready(&1u32, nonzero!(5u32))).unwrap();
assert_ge!(i.elapsed(), Duration::from_millis(100));
}

#[test]
fn proceeds() {
let lim = RateLimiter::direct(Quota::per_second(nonzero!(2u32)));
Expand All @@ -79,6 +91,14 @@ fn proceeds_keyed() {
assert_le!(i.elapsed(), MAX_TEST_RUN_DURATION);
}

#[test]
fn proceeds_keyed_n() {
let lim = RateLimiter::keyed(Quota::per_second(nonzero!(3u32)));
let i = Instant::now();
block_on(lim.until_key_n_ready(&1u32, nonzero!(2u32))).unwrap();
assert_le!(i.elapsed(), MAX_TEST_RUN_DURATION);
}

#[test]
fn multiple() {
let lim = Arc::new(RateLimiter::direct(Quota::per_second(nonzero!(10u32))));
Expand Down Expand Up @@ -125,4 +145,8 @@ fn errors_on_exceeded_capacity() {
let lim = RateLimiter::direct(Quota::per_second(nonzero!(10u32)));

block_on(lim.until_n_ready(nonzero!(11u32))).unwrap_err();

let lim = RateLimiter::keyed(Quota::per_second(nonzero!(10u32)));

block_on(lim.until_key_n_ready(&1u32, nonzero!(11u32))).unwrap_err();
}

0 comments on commit 5cd2d5e

Please sign in to comment.