Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

customizable wait time on shard queuer #3031

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion src/gateway/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::ops::Range;
use std::sync::Arc;
#[cfg(feature = "framework")]
use std::sync::OnceLock;
use std::time::Duration;

use futures::channel::mpsc::UnboundedReceiver as Receiver;
use futures::future::BoxFuture;
Expand All @@ -49,7 +50,14 @@ use crate::cache::Settings as CacheSettings;
use crate::framework::Framework;
#[cfg(feature = "voice")]
use crate::gateway::VoiceGatewayManager;
use crate::gateway::{ActivityData, GatewayError, PresenceData, ShardManager, ShardManagerOptions};
use crate::gateway::{
ActivityData,
GatewayError,
PresenceData,
ShardManager,
ShardManagerOptions,
DEFAULT_WAIT_BETWEEN_SHARD_START,
};
use crate::http::Http;
use crate::internal::prelude::*;
use crate::internal::tokio::spawn_named;
Expand All @@ -73,6 +81,7 @@ pub struct ClientBuilder {
event_handler: Option<Arc<dyn EventHandler>>,
raw_event_handler: Option<Arc<dyn RawEventHandler>>,
presence: PresenceData,
wait_time_between_shard_start: Duration,
}

impl ClientBuilder {
Expand Down Expand Up @@ -106,6 +115,7 @@ impl ClientBuilder {
event_handler: None,
raw_event_handler: None,
presence: PresenceData::default(),
wait_time_between_shard_start: DEFAULT_WAIT_BETWEEN_SHARD_START,
}
}

Expand Down Expand Up @@ -153,6 +163,19 @@ impl ClientBuilder {
self.framework.as_deref()
}

/// Sets the time to wait between starting shards.
///
/// This should only be used when using a gateway proxy, such as [Sandwich] or [Twilight Gateway
/// Proxy], as otherwise this will lead to gateway disconnects if the shard start rate limit is
/// not respected.
///
/// [Sandwich]: https://github.com/WelcomerTeam/Sandwich-Daemon
/// [Twilight Gateway Proxy]: https://github.com/Gelbpunkt/gateway-proxy
pub fn wait_time_between_shard_start(mut self, wait_time: Duration) -> Self {
self.wait_time_between_shard_start = wait_time;
self
}

/// Sets the voice gateway handler to be used. It will receive voice events sent over the
/// gateway and then consider - based on its settings - whether to dispatch a command.
#[cfg(feature = "voice")]
Expand Down Expand Up @@ -318,6 +341,7 @@ impl IntoFuture for ClientBuilder {
intents,
presence: Some(presence),
max_concurrency,
wait_time_between_shard_start: self.wait_time_between_shard_start,
});

let client = Client {
Expand Down
6 changes: 5 additions & 1 deletion src/gateway/sharding/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame;
use tracing::{debug, error, info, trace, warn};
use url::Url;

pub use self::shard_manager::{ShardManager, ShardManagerOptions};
pub use self::shard_manager::{
ShardManager,
ShardManagerOptions,
DEFAULT_WAIT_BETWEEN_SHARD_START,
};
pub use self::shard_messenger::ShardMessenger;
pub use self::shard_queuer::{ShardQueue, ShardQueuer, ShardQueuerMessage};
pub use self::shard_runner::{ShardRunner, ShardRunnerMessage, ShardRunnerOptions};
Expand Down
9 changes: 8 additions & 1 deletion src/gateway/sharding/shard_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ use crate::internal::prelude::*;
use crate::internal::tokio::spawn_named;
use crate::model::gateway::GatewayIntents;

/// The default time to wait between starting each shard or set of shards.
pub const DEFAULT_WAIT_BETWEEN_SHARD_START: Duration = Duration::from_secs(5);

/// A manager for handling the status of shards by starting them, restarting them, and stopping
/// them when required.
///
Expand All @@ -50,7 +53,7 @@ use crate::model::gateway::GatewayIntents;
/// use std::sync::{Arc, OnceLock};
///
/// use serenity::gateway::client::EventHandler;
/// use serenity::gateway::{ShardManager, ShardManagerOptions};
/// use serenity::gateway::{ShardManager, ShardManagerOptions, DEFAULT_WAIT_BETWEEN_SHARD_START};
/// use serenity::http::Http;
/// use serenity::model::gateway::GatewayIntents;
/// use serenity::prelude::*;
Expand Down Expand Up @@ -84,6 +87,7 @@ use crate::model::gateway::GatewayIntents;
/// intents: GatewayIntents::non_privileged(),
/// presence: None,
/// max_concurrency,
/// wait_time_between_shard_start: DEFAULT_WAIT_BETWEEN_SHARD_START,
/// });
/// # Ok(())
/// # }
Expand Down Expand Up @@ -146,6 +150,7 @@ impl ShardManager {
http: opt.http,
intents: opt.intents,
presence: opt.presence,
wait_time_between_shard_start: opt.wait_time_between_shard_start,
};

spawn_named("shard_queuer::run", async move {
Expand Down Expand Up @@ -372,4 +377,6 @@ pub struct ShardManagerOptions {
pub intents: GatewayIntents,
pub presence: Option<PresenceData>,
pub max_concurrency: NonZeroU16,
/// Number of seconds to wait between starting each shard/set of shards start
pub wait_time_between_shard_start: Duration,
}
17 changes: 8 additions & 9 deletions src/gateway/sharding/shard_queuer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@ use crate::internal::prelude::*;
use crate::internal::tokio::spawn_named;
use crate::model::gateway::{GatewayIntents, ShardInfo};

const WAIT_BETWEEN_BOOTS_IN_SECONDS: u64 = 5;

/// The shard queuer is a simple loop that runs indefinitely to manage the startup of shards.
///
/// A shard queuer instance _should_ be run in its own thread, due to the blocking nature of the
Expand Down Expand Up @@ -68,6 +66,8 @@ pub struct ShardQueuer {
pub ws_url: Arc<str>,
/// The total amount of shards to start.
pub shard_total: NonZeroU16,
/// Number of seconds to wait between each start
pub wait_time_between_shard_start: Duration,
#[cfg(feature = "cache")]
pub cache: Arc<Cache>,
pub http: Arc<Http>,
Expand All @@ -94,14 +94,14 @@ impl ShardQueuer {
/// **Note**: This should be run in its own thread due to the blocking nature of the loop.
#[cfg_attr(feature = "tracing_instrument", instrument(skip(self)))]
pub async fn run(&mut self) {
// We read from the Rx channel in a loop, and use a timeout of 5 seconds so that we don't
// We read from the Rx channel in a loop, and use a timeout of
// {self.WAIT_TIME_BETWEEN_SHARD_START} (5 seconds normally) seconds so that we don't
// hang forever. When we receive a command to start a shard, we append it to our queue. The
// queue is popped in batches of shards, which are started in parallel. A batch is fired
// every 5 seconds at minimum in order to avoid being ratelimited.
const TIMEOUT: Duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);
// every WAIT_TIME_BETWEEN_SHARD_START at minimum in order to avoid being ratelimited.

loop {
if let Ok(msg) = timeout(TIMEOUT, self.rx.next()).await {
if let Ok(msg) = timeout(self.wait_time_between_shard_start, self.rx.next()).await {
match msg {
Some(ShardQueuerMessage::SetShardTotal(shard_total)) => {
self.shard_total = shard_total;
Expand Down Expand Up @@ -157,14 +157,13 @@ impl ShardQueuer {
let Some(instant) = self.last_start else { return };

// We must wait 5 seconds between IDENTIFYs to avoid session invalidations.
let duration = Duration::from_secs(WAIT_BETWEEN_BOOTS_IN_SECONDS);
let elapsed = instant.elapsed();

if elapsed >= duration {
if elapsed >= self.wait_time_between_shard_start {
return;
}

let to_sleep = duration - elapsed;
let to_sleep = self.wait_time_between_shard_start - elapsed;

sleep(to_sleep).await;
}
Expand Down
Loading