From c8d1d014a4cd83e61b285c2b55f6cd3b48800e87 Mon Sep 17 00:00:00 2001 From: Valdemar Erk Date: Sat, 18 Sep 2021 20:14:48 +0200 Subject: [PATCH 1/5] Support for transport compression This is a revive of #733 Closes #693 --- Cargo.toml | 10 +- examples/e01_basic_ping_bot/Cargo.toml | 2 +- src/client/bridge/gateway/shard_runner.rs | 7 +- src/error.rs | 14 +++ src/gateway/mod.rs | 8 +- src/gateway/shard.rs | 38 +++++--- src/gateway/ws_client_ext.rs | 4 +- src/internal/inflater.rs | 109 ++++++++++++++++++++++ src/internal/mod.rs | 6 ++ src/internal/ws_impl.rs | 68 +++++++++----- src/json.rs | 3 + 11 files changed, 223 insertions(+), 46 deletions(-) create mode 100644 src/internal/inflater.rs diff --git a/Cargo.toml b/Cargo.toml index 0613b25dd16..223031a6cd4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,8 @@ version = "0.4.10" [dependencies.flate2] optional = true +default-features = false +features = ["zlib"] version = "1.0.13" [dependencies.reqwest] @@ -169,12 +171,13 @@ version = "0.4" [features] # Defaults with different backends -default = ["default_no_backend", "rustls_backend"] -default_native_tls = ["default_no_backend", "native_tls_backend"] -default_tokio_0_2 = ["default_no_backend", "rustls_tokio_0_2_backend"] +default = ["default_no_backend", "rustls_backend", "transport_compression"] +default_native_tls = ["default_no_backend", "native_tls_backend", "transport_compression"] +default_tokio_0_2 = ["default_no_backend", "rustls_tokio_0_2_backend", "transport_compression"] default_native_tls_tokio_0_2 = [ "default_no_backend", "native_tls_tokio_0_2_backend", + "transport_compression", ] # Serenity requires a backend, this picks all default features without a backend. @@ -205,6 +208,7 @@ standard_framework = ["framework", "uwl", "command_attr", "static_assertions"] unstable_discord_api = [] utils = ["base64"] voice = ["client", "model"] +transport_compression = ["gateway", "flate2"] # Enables simd accelerated parsing simdjson = ["simd-json"] diff --git a/examples/e01_basic_ping_bot/Cargo.toml b/examples/e01_basic_ping_bot/Cargo.toml index 7f9cb61751b..782d6d9f678 100644 --- a/examples/e01_basic_ping_bot/Cargo.toml +++ b/examples/e01_basic_ping_bot/Cargo.toml @@ -5,5 +5,5 @@ authors = ["my name "] edition = "2018" [dependencies] -serenity = { path = "../../", default-features = false, features = ["client", "gateway", "rustls_backend", "model"] } +serenity = { path = "../../", default-features = false, features = ["client", "gateway", "rustls_backend", "model", "transport_compression"] } tokio = { version = "1.0", features = ["macros", "rt-multi-thread"] } diff --git a/src/client/bridge/gateway/shard_runner.rs b/src/client/bridge/gateway/shard_runner.rs index b81c87da3ec..de84ebe8e51 100644 --- a/src/client/bridge/gateway/shard_runner.rs +++ b/src/client/bridge/gateway/shard_runner.rs @@ -297,6 +297,7 @@ impl ShardRunner { let _ = self .shard .client + .stream .close(Some(CloseFrame { code: close_code.into(), reason: Cow::from(""), @@ -306,7 +307,7 @@ impl ShardRunner { // In return, we wait for either a Close Frame response, or an error, after which this WS is deemed // disconnected from Discord. loop { - match self.shard.client.next().await { + match self.shard.client.stream.next().await { Some(Ok(tungstenite::Message::Close(_))) => break, Some(Err(_)) => { warn!( @@ -409,10 +410,10 @@ impl ShardRunner { code: code.into(), reason: Cow::from(reason), }; - self.shard.client.close(Some(close)).await.is_ok() + self.shard.client.stream.close(Some(close)).await.is_ok() }, ShardClientMessage::Runner(ShardRunnerMessage::Message(msg)) => { - self.shard.client.send(msg).await.is_ok() + self.shard.client.stream.send(msg).await.is_ok() }, ShardClientMessage::Runner(ShardRunnerMessage::SetActivity(activity)) => { // To avoid a clone of `activity`, we do a little bit of diff --git a/src/error.rs b/src/error.rs index 110113b0b4b..829004cba6f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,6 +7,8 @@ use std::{ #[cfg(feature = "gateway")] use async_tungstenite::tungstenite::error::Error as TungsteniteError; +#[cfg(feature = "transport_compression")] +use flate2::DecompressError; #[cfg(feature = "http")] use reqwest::{header::InvalidHeaderValue, Error as ReqwestError}; use serde_json::Error as JsonError; @@ -93,6 +95,9 @@ pub enum Error { /// [client]: crate::client #[cfg(feature = "client")] Client(ClientError), + /// Error when decompressing a payload. + #[cfg(feature = "transport_compression")] + Flate2(DecompressError), /// A [collector] error. /// /// [collector]: crate::collector @@ -175,6 +180,13 @@ impl From for Error { } } +#[cfg(feature = "transport_compression")] +impl From for Error { + fn from(e: DecompressError) -> Error { + Error::Flate2(e) + } +} + #[cfg(feature = "gateway")] impl From for Error { fn from(e: TungsteniteError) -> Error { @@ -227,6 +239,8 @@ impl Display for Error { Error::Http(inner) => fmt::Display::fmt(&inner, f), #[cfg(all(feature = "gateway", not(feature = "native_tls_backend_marker")))] Error::Rustls(inner) => fmt::Display::fmt(&inner, f), + #[cfg(feature = "transport_compression")] + Error::Flate2(inner) => fmt::Display::fmt(&inner, f), #[cfg(feature = "gateway")] Error::Tungstenite(inner) => fmt::Display::fmt(&inner, f), } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 02a38d6f10b..8cab5102339 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -59,6 +59,8 @@ pub use self::{ }; #[cfg(feature = "client")] use crate::client::bridge::gateway::ShardClientMessage; +#[cfg(feature = "transport_compression")] +use crate::internal::Inflater; use crate::json::Value; use crate::model::{gateway::Activity, user::OnlineStatus}; @@ -66,7 +68,11 @@ pub type CurrentPresence = (Option, OnlineStatus); use async_tungstenite::{tokio::ConnectStream, WebSocketStream}; -pub type WsStream = WebSocketStream; +pub struct WsClient { + #[cfg(feature = "transport_compression")] + pub(crate) inflater: Inflater, + pub(crate) stream: WebSocketStream, +} /// Indicates the current connection stage of a [`Shard`]. /// diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index be76410f1e5..778cd20710b 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -7,6 +7,7 @@ use async_tungstenite::tungstenite::{ error::Error as TungsteniteError, protocol::frame::CloseFrame, }; +use async_tungstenite::{tokio::ConnectStream, WebSocketStream}; use tokio::sync::Mutex; use tracing::{debug, error, info, instrument, trace, warn}; use url::Url; @@ -18,7 +19,7 @@ use super::{ ReconnectType, ShardAction, WebSocketGatewayClientExt, - WsStream, + WsClient, }; use crate::client::bridge::gateway::{ChunkGuildFilter, GatewayIntents}; use crate::constants::{self, close_codes}; @@ -27,6 +28,8 @@ use crate::internal::prelude::*; use crate::internal::ws_impl::create_native_tls_client; #[cfg(all(feature = "rustls_backend_marker", not(feature = "native_tls_backend_marker")))] use crate::internal::ws_impl::create_rustls_client; +#[cfg(feature = "transport_compression")] +use crate::internal::Inflater; use crate::model::{ event::{Event, GatewayEvent}, gateway::Activity, @@ -67,7 +70,7 @@ use crate::model::{ /// [docs]: https://discord.com/developers/docs/topics/gateway#sharding /// [module docs]: crate::gateway#sharding pub struct Shard { - pub client: WsStream, + pub client: WsClient, current_presence: CurrentPresence, /// A tuple of: /// @@ -140,7 +143,12 @@ impl Shard { intents: GatewayIntents, ) -> Result { let url = ws_url.lock().await.clone(); - let client = connect(&url).await?; + let stream = connect(&url).await?; + let client = WsClient { + #[cfg(feature = "transport_compression")] + inflater: Inflater::new(), + stream, + }; let current_presence = (None, OnlineStatus::Online); let heartbeat_instants = (None, None); @@ -735,7 +743,7 @@ impl Shard { /// This will set the stage of the shard before and after instantiation of /// the client. #[instrument(skip(self))] - pub async fn initialize(&mut self) -> Result { + pub async fn initialize(&mut self) -> Result<()> { debug!("[Shard {:?}] Initializing.", self.shard_info); // We need to do two, sort of three things here: @@ -749,10 +757,14 @@ impl Shard { self.stage = ConnectionStage::Connecting; self.started = Instant::now(); let url = &self.ws_url.lock().await.clone(); - let client = connect(url).await?; + // Reset inflater + #[cfg(feature = "transport_compression")] + self.client.inflater.reset(); + // Make new websocket stream + self.client.stream = connect(url).await?; self.stage = ConnectionStage::Handshake; - Ok(client) + Ok(()) } #[instrument(skip(self))] @@ -769,7 +781,7 @@ impl Shard { pub async fn resume(&mut self) -> Result<()> { debug!("[Shard {:?}] Attempting to resume", self.shard_info); - self.client = self.initialize().await?; + self.initialize().await?; self.stage = ConnectionStage::Resuming; match self.session_id.as_ref() { @@ -785,7 +797,7 @@ impl Shard { info!("[Shard {:?}] Attempting to reconnect", self.shard_info()); self.reset().await; - self.client = self.initialize().await?; + self.initialize().await?; Ok(()) } @@ -797,21 +809,25 @@ impl Shard { } #[cfg(all(feature = "rustls_backend_marker", not(feature = "native_tls_backend_marker")))] -async fn connect(base_url: &str) -> Result { +async fn connect(base_url: &str) -> Result> { let url = build_gateway_url(base_url)?; Ok(create_rustls_client(url).await?) } #[cfg(feature = "native_tls_backend_marker")] -async fn connect(base_url: &str) -> Result { +async fn connect(base_url: &str) -> Result { let url = build_gateway_url(base_url)?; Ok(create_native_tls_client(url).await?) } fn build_gateway_url(base: &str) -> Result { - Url::parse(&format!("{}?v={}", base, constants::GATEWAY_VERSION)).map_err(|why| { + #[cfg(feature = "transport_compression")] + const COMPRESSION: &str = "?compress=zlib-stream"; + #[cfg(not(feature = "transport_compression"))] + const COMPRESSION: &str = ""; + Url::parse(&format!("{}?v={}&encoding=json{}", base, constants::GATEWAY_VERSION, COMPRESSION)).map_err(|why| { warn!("Error building gateway URL with base `{}`: {:?}", base, why); Error::Gateway(GatewayError::BuildingUrl) diff --git a/src/gateway/ws_client_ext.rs b/src/gateway/ws_client_ext.rs index a396b40bc5a..6d7183931db 100644 --- a/src/gateway/ws_client_ext.rs +++ b/src/gateway/ws_client_ext.rs @@ -7,7 +7,7 @@ use tracing::{debug, trace}; use crate::client::bridge::gateway::{ChunkGuildFilter, GatewayIntents}; use crate::constants::{self, OpCode}; -use crate::gateway::{CurrentPresence, WsStream}; +use crate::gateway::{CurrentPresence, WsClient}; use crate::internal::prelude::*; use crate::internal::ws_impl::SenderExt; use crate::json::json; @@ -49,7 +49,7 @@ pub trait WebSocketGatewayClientExt { } #[async_trait] -impl WebSocketGatewayClientExt for WsStream { +impl WebSocketGatewayClientExt for WsClient { #[instrument(skip(self))] async fn send_chunk_guild( &mut self, diff --git a/src/internal/inflater.rs b/src/internal/inflater.rs new file mode 100644 index 00000000000..fe17ccd2938 --- /dev/null +++ b/src/internal/inflater.rs @@ -0,0 +1,109 @@ +use std::convert::TryInto; + +use flate2::{Decompress, DecompressError, FlushDecompress}; +use tracing::trace; + +const ZLIB_SUFFIX: [u8; 4] = [0x00, 0x00, 0xff, 0xff]; +const INTERNAL_BUFFER_SIZE: usize = 32 * 1024; + +pub struct Inflater { + decompress: Decompress, + compressed: Vec, + internal_buffer: Vec, + buffer: Vec, + countdown_to_resize: u8, +} + +impl Inflater { + pub fn new() -> Self { + Self { + decompress: Decompress::new(true), + compressed: Vec::new(), + internal_buffer: Vec::with_capacity(INTERNAL_BUFFER_SIZE), + buffer: Vec::with_capacity(32 * 1024), + countdown_to_resize: u8::max_value(), + } + } + + pub fn extend(&mut self, slice: &[u8]) { + self.compressed.extend_from_slice(&slice); + } + + pub fn msg(&mut self) -> Result, DecompressError> { + let length = self.compressed.len(); + if length >= 4 && self.compressed[(length - 4)..] == ZLIB_SUFFIX { + // There is a payload to be decompressed. + let before = self.decompress.total_in(); + let mut offset = 0; + loop { + self.internal_buffer.clear(); + + self.decompress.decompress_vec( + &self.compressed[offset..], + &mut self.internal_buffer, + FlushDecompress::Sync, + )?; + + offset = (self.decompress.total_in() - before).try_into().unwrap_or(0); + self.buffer.extend_from_slice(&self.internal_buffer[..]); + if self.internal_buffer.len() < self.internal_buffer.capacity() + || offset > self.compressed.len() + { + break; + } + } + + trace!("in:out: {}:{}", self.compressed.len(), self.buffer.len()); + self.compressed.clear(); + + #[allow(clippy::cast_precision_loss)] + { + // To get around the u64 → f64 precision loss lint + // it does really not matter that it happens here + trace!( + "Data saved: {}KiB ({:.2}%)", + ((self.decompress.total_out() - self.decompress.total_in()) / 1024), + ((self.decompress.total_in() as f64) / (self.decompress.total_out() as f64) + * 100.0) + ); + } + trace!("Capacity: {}", self.buffer.capacity()); + Ok(Some(&self.buffer)) + } else { + // Received a partial payload. + Ok(None) + } + } + + // Clear the buffer, and shrink it if it has more space + // enough to grow the length more than 4 times. + pub fn clear(&mut self) { + self.countdown_to_resize -= 1; + + // Only shrink capacity if it is less than 4 + // times the size, this is to prevent too + // frequent shrinking. + let cap = self.buffer.capacity(); + if self.countdown_to_resize == 0 && self.buffer.len() < cap * 4 { + // When shrink_to goes stable use that on the following line. + // https://github.com/rust-lang/rust/issues/56431 + self.compressed.shrink_to_fit(); + self.buffer.shrink_to_fit(); + trace!("compressed: {}", self.compressed.capacity()); + trace!("buffer: {}", self.buffer.capacity()); + self.countdown_to_resize = u8::max_value(); + } + self.compressed.clear(); + self.internal_buffer.clear(); + self.buffer.clear(); + } + + // Reset the inflater + pub fn reset(&mut self) { + self.decompress.reset(true); + self.compressed.clear(); + self.internal_buffer.clear(); + self.buffer.clear(); + self.countdown_to_resize = u8::MAX; + } +} diff --git a/src/internal/mod.rs b/src/internal/mod.rs index 2288c69bd09..60cae02765f 100644 --- a/src/internal/mod.rs +++ b/src/internal/mod.rs @@ -5,3 +5,9 @@ pub mod prelude; #[cfg(feature = "gateway")] pub mod ws_impl; + +#[cfg(feature = "transport_compression")] +mod inflater; + +#[cfg(feature = "transport_compression")] +pub(crate) use inflater::Inflater; diff --git a/src/internal/ws_impl.rs b/src/internal/ws_impl.rs index c063a0bc97a..06add176aeb 100644 --- a/src/internal/ws_impl.rs +++ b/src/internal/ws_impl.rs @@ -7,16 +7,17 @@ use std::{ use async_trait::async_trait; use async_tungstenite::tungstenite::Message; -use flate2::read::ZlibDecoder; -use futures::stream::SplitSink; +use async_tungstenite::{tokio::ConnectStream, WebSocketStream}; use futures::{SinkExt, StreamExt, TryStreamExt}; use tokio::time::timeout; use tracing::{instrument, warn}; use url::Url; -use crate::gateway::{GatewayError, WsStream}; +use crate::gateway::{GatewayError, WsClient}; use crate::internal::prelude::*; -use crate::json::{from_reader, from_str, to_string}; +#[cfg(feature = "transport_compression")] +use crate::internal::Inflater; +use crate::json::{from_str, to_string}; #[async_trait] pub trait ReceiverExt { @@ -30,47 +31,64 @@ pub trait SenderExt { } #[async_trait] -impl ReceiverExt for WsStream { +impl ReceiverExt for WsClient { async fn recv_json(&mut self) -> Result> { const TIMEOUT: tokio::time::Duration = tokio::time::Duration::from_millis(500); - let ws_message = match timeout(TIMEOUT, self.next()).await { + let ws_message = match timeout(TIMEOUT, self.stream.next()).await { Ok(Some(Ok(v))) => Some(v), Ok(Some(Err(e))) => return Err(e.into()), Ok(None) | Err(_) => None, }; - convert_ws_message(ws_message) + convert_ws_message( + #[cfg(feature = "transport_compression")] + &mut self.inflater, + ws_message, + ) } async fn try_recv_json(&mut self) -> Result> { - convert_ws_message(self.try_next().await.ok().flatten()) + convert_ws_message( + #[cfg(feature = "transport_compression")] + &mut self.inflater, + self.stream.try_next().await.ok().flatten(), + ) } } #[async_trait] -impl SenderExt for SplitSink { +impl SenderExt for WsClient { async fn send_json(&mut self, value: &Value) -> Result<()> { - Ok(to_string(value).map(Message::Text).map_err(Error::from).map(|m| self.send(m))?.await?) - } -} - -#[async_trait] -impl SenderExt for WsStream { - async fn send_json(&mut self, value: &Value) -> Result<()> { - Ok(to_string(value).map(Message::Text).map_err(Error::from).map(|m| self.send(m))?.await?) + Ok(to_string(value) + .map(Message::Text) + .map_err(Error::from) + .map(|m| self.stream.send(m))? + .await?) } } #[inline] -pub(crate) fn convert_ws_message(message: Option) -> Result> { +pub(crate) fn convert_ws_message( + #[cfg(feature = "transport_compression")] inflater: &mut Inflater, + message: Option, +) -> Result> { Ok(match message { + #[cfg(feature = "transport_compression")] Some(Message::Binary(bytes)) => { - from_reader(ZlibDecoder::new(&bytes[..])).map(Some).map_err(|why| { - warn!("Err deserializing bytes: {:?}; bytes: {:?}", why, bytes); - - why - })? + inflater.extend(&bytes); + match inflater.msg()? { + Some(msg) => { + let ret = serde_json::from_slice(&msg[..]).map(Some).map_err(|why| { + warn!("Err deserializing bytes: {:?}; bytes: {:?}", why, bytes); + + why + })?; + inflater.clear(); + ret + }, + None => None, + } }, Some(Message::Text(mut payload)) => from_str(&mut payload).map(Some).map_err(|why| { warn!("Err deserializing text: {:?}; text: {}", why, payload,); @@ -152,7 +170,7 @@ fn websocket_config() -> async_tungstenite::tungstenite::protocol::WebSocketConf #[cfg(all(feature = "rustls_backend_marker", not(feature = "native_tls_backend_marker")))] #[instrument] -pub(crate) async fn create_rustls_client(url: Url) -> Result { +pub(crate) async fn create_rustls_client(url: Url) -> Result> { let (stream, _) = async_tungstenite::tokio::connect_async_with_config::(url, Some(websocket_config())) .await @@ -163,7 +181,7 @@ pub(crate) async fn create_rustls_client(url: Url) -> Result { #[cfg(feature = "native_tls_backend_marker")] #[instrument] -pub(crate) async fn create_native_tls_client(url: Url) -> Result { +pub(crate) async fn create_native_tls_client(url: Url) -> Result> { let (stream, _) = async_tungstenite::tokio::connect_async_with_config::( url.into(), Some(websocket_config()), diff --git a/src/json.rs b/src/json.rs index f021b786b5b..cd84b038e8e 100644 --- a/src/json.rs +++ b/src/json.rs @@ -76,6 +76,7 @@ where Ok(simd_json::from_str(s)?) } +#[allow(dead_code)] #[cfg(not(feature = "simd-json"))] pub(crate) fn from_reader(r: R) -> Result where @@ -84,6 +85,8 @@ where { Ok(serde_json::from_reader(r)?) } + +#[allow(dead_code)] #[cfg(feature = "simd-json")] pub(crate) fn from_reader(r: R) -> Result where From e170aec791f813e14ba55111afd6dbb5a3dfbe12 Mon Sep 17 00:00:00 2001 From: Valdemar Erk Date: Sat, 18 Sep 2021 20:16:40 +0200 Subject: [PATCH 2/5] format --- src/model/invite.rs | 2 +- src/model/webhook.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/model/invite.rs b/src/model/invite.rs index 0b03d5f2093..55eb7e5a864 100644 --- a/src/model/invite.rs +++ b/src/model/invite.rs @@ -414,5 +414,5 @@ pub enum InviteTargetType { enum_number!(InviteTargetType { Normal, Stream, - EmmbeddedApplication, + EmmbeddedApplication }); diff --git a/src/model/webhook.rs b/src/model/webhook.rs index 764468e2d80..79f688ae4fd 100644 --- a/src/model/webhook.rs +++ b/src/model/webhook.rs @@ -43,7 +43,7 @@ pub enum WebhookType { enum_number!(WebhookType { Incoming, ChannelFollower, - Application, + Application }); impl WebhookType { From 98e8cb473f652edaca8f2203ee4deef2acb2baa7 Mon Sep 17 00:00:00 2001 From: Valdemar Erk Date: Sat, 18 Sep 2021 20:30:10 +0200 Subject: [PATCH 3/5] small fix --- src/gateway/shard.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index 778cd20710b..8f351a8fd00 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -816,7 +816,7 @@ async fn connect(base_url: &str) -> Result> { } #[cfg(feature = "native_tls_backend_marker")] -async fn connect(base_url: &str) -> Result { +async fn connect(base_url: &str) -> Result> { let url = build_gateway_url(base_url)?; Ok(create_native_tls_client(url).await?) @@ -827,6 +827,7 @@ fn build_gateway_url(base: &str) -> Result { const COMPRESSION: &str = "?compress=zlib-stream"; #[cfg(not(feature = "transport_compression"))] const COMPRESSION: &str = ""; + Url::parse(&format!("{}?v={}&encoding=json{}", base, constants::GATEWAY_VERSION, COMPRESSION)).map_err(|why| { warn!("Error building gateway URL with base `{}`: {:?}", base, why); From 390d596bc087dd805a3f22bfbfa957ae98b007ae Mon Sep 17 00:00:00 2001 From: Valdemar Erk Date: Sat, 18 Sep 2021 20:47:10 +0200 Subject: [PATCH 4/5] clippy --- src/internal/inflater.rs | 2 +- src/internal/ws_impl.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/internal/inflater.rs b/src/internal/inflater.rs index fe17ccd2938..0beca2d456a 100644 --- a/src/internal/inflater.rs +++ b/src/internal/inflater.rs @@ -26,7 +26,7 @@ impl Inflater { } pub fn extend(&mut self, slice: &[u8]) { - self.compressed.extend_from_slice(&slice); + self.compressed.extend_from_slice(slice); } pub fn msg(&mut self) -> Result, DecompressError> { diff --git a/src/internal/ws_impl.rs b/src/internal/ws_impl.rs index 06add176aeb..b9ce60cfd4c 100644 --- a/src/internal/ws_impl.rs +++ b/src/internal/ws_impl.rs @@ -79,7 +79,7 @@ pub(crate) fn convert_ws_message( inflater.extend(&bytes); match inflater.msg()? { Some(msg) => { - let ret = serde_json::from_slice(&msg[..]).map(Some).map_err(|why| { + let ret = serde_json::from_slice(msg).map(Some).map_err(|why| { warn!("Err deserializing bytes: {:?}; bytes: {:?}", why, bytes); why From f8d057db7e2327a0a9fc4505b530e054516e90cd Mon Sep 17 00:00:00 2001 From: Valdemar Erk Date: Sun, 19 Sep 2021 20:41:42 +0200 Subject: [PATCH 5/5] fmt again --- src/gateway/shard.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/gateway/shard.rs b/src/gateway/shard.rs index 8f351a8fd00..5e4058249a3 100644 --- a/src/gateway/shard.rs +++ b/src/gateway/shard.rs @@ -828,9 +828,10 @@ fn build_gateway_url(base: &str) -> Result { #[cfg(not(feature = "transport_compression"))] const COMPRESSION: &str = ""; - Url::parse(&format!("{}?v={}&encoding=json{}", base, constants::GATEWAY_VERSION, COMPRESSION)).map_err(|why| { - warn!("Error building gateway URL with base `{}`: {:?}", base, why); + Url::parse(&format!("{}?v={}&encoding=json{}", base, constants::GATEWAY_VERSION, COMPRESSION)) + .map_err(|why| { + warn!("Error building gateway URL with base `{}`: {:?}", base, why); - Error::Gateway(GatewayError::BuildingUrl) - }) + Error::Gateway(GatewayError::BuildingUrl) + }) }