Skip to content

Commit

Permalink
core/: Merge pending and established connection limits
Browse files Browse the repository at this point in the history
Merge pending and established limits for both incoming and outgoing
connections. More specifically merge
`ConnectionLimits::with_max_pending_incoming` with
`ConnectionLimits::with_max_established_incoming` and
`ConnectionLimits::with_max_pending_outgoing` with
`ConnectionLimits::with_max_established_outgoing`. Connection limits are
checked on `Network::dial` for outgoing and on `Network::accept` for
incoming connections.

This (a) simplifies connection limits from an implementations and user
perspective and (b) simplifies returning a connection handler on limit
error as limits can only be exceeded at the start of dialing and
accepting. See [1].

[1]: libp2p#2242 (comment)
  • Loading branch information
mxinden committed Sep 28, 2021
1 parent c13f033 commit e7f716f
Show file tree
Hide file tree
Showing 10 changed files with 117 additions and 215 deletions.
14 changes: 8 additions & 6 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,13 @@
# 0.30.0 [unreleased]

- Add `ConnectionLimit::with_max_established` (see [PR 2137]).
- Add `ConnectionLimits::with_max_established` (see [PR 2137]).

- Merge pending and established limits for both incoming and outgoing connections. More specifically
merge `ConnectionLimits::with_max_pending_incoming` with
`ConnectionLimits::with_max_established_incoming` and
`ConnectionLimits::with_max_pending_outgoing` with
`ConnectionLimits::with_max_established_outgoing`. Connection limits are checked on
`Network::dial` for outgoing and on `Network::accept` for incoming connections.

- Add `Keypair::to_protobuf_encoding` (see [PR 2142]).

Expand All @@ -22,11 +29,6 @@

- Remove `DisconnectedPeer::set_connected` and `Pool::add` (see [PR 2195]).

- Report `ConnectionLimit` error through `ConnectionError` and thus through
`NetworkEvent::ConnectionClosed` instead of previously through
`PendingConnectionError` and thus `NetworkEvent::{IncomingConnectionError,
DialError}` (see [PR 2191]).

- Report abortion of pending connection through `DialError`,
`UnknownPeerDialError` or `IncomingConnectionError` (see [PR 2191]).

Expand Down
9 changes: 0 additions & 9 deletions core/src/connection/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::connection::ConnectionLimit;
use crate::transport::TransportError;
use std::{fmt, io};

Expand All @@ -29,10 +28,6 @@ pub enum ConnectionError<THandlerErr> {
// TODO: Eventually this should also be a custom error?
IO(io::Error),

/// The connection was dropped because the connection limit
/// for a peer has been reached.
ConnectionLimit(ConnectionLimit),

/// The connection handler produced an error.
Handler(THandlerErr),
}
Expand All @@ -45,9 +40,6 @@ where
match self {
ConnectionError::IO(err) => write!(f, "Connection error: I/O error: {}", err),
ConnectionError::Handler(err) => write!(f, "Connection error: Handler error: {}", err),
ConnectionError::ConnectionLimit(l) => {
write!(f, "Connection error: Connection limit: {}.", l)
}
}
}
}
Expand All @@ -60,7 +52,6 @@ where
match self {
ConnectionError::IO(err) => Some(err),
ConnectionError::Handler(err) => Some(err),
ConnectionError::ConnectionLimit(..) => None,
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@

use super::{
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
Connected, ConnectedPoint, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
};
use crate::{muxing::StreamMuxer, Executor};
use fnv::FnvHashMap;
Expand Down Expand Up @@ -439,15 +439,15 @@ impl<'a, I> EstablishedEntry<'a, I> {
///
/// When the connection is ultimately closed, [`Event::ConnectionClosed`]
/// is emitted by [`Manager::poll`].
pub fn start_close(mut self, error: Option<ConnectionLimit>) {
pub fn start_close(mut self) {
// Clone the sender so that we are guaranteed to have
// capacity for the close command (every sender gets a slot).
match self
.task
.get_mut()
.sender
.clone()
.try_send(task::Command::Close(error))
.try_send(task::Command::Close)
{
Ok(()) => {}
Err(e) => assert!(e.is_disconnected(), "No capacity for close command."),
Expand Down
14 changes: 5 additions & 9 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ use crate::{
connection::{
self,
handler::{THandlerError, THandlerInEvent, THandlerOutEvent},
Close, Connected, Connection, ConnectionError, ConnectionHandler, ConnectionLimit,
IntoConnectionHandler, PendingConnectionError, Substream,
Close, Connected, Connection, ConnectionError, ConnectionHandler, IntoConnectionHandler,
PendingConnectionError, Substream,
},
muxing::StreamMuxer,
Multiaddr,
Expand All @@ -43,7 +43,7 @@ pub enum Command<T> {
NotifyHandler(T),
/// Gracefully close the connection (active close) before
/// terminating the task.
Close(Option<ConnectionLimit>),
Close,
}

/// Events that a task can emit to its manager.
Expand Down Expand Up @@ -163,7 +163,6 @@ where
Closing {
closing_muxer: Close<M>,
handler: H::Handler,
error: Option<ConnectionLimit>,
},

/// The task is terminating with a final event for the `Manager`.
Expand Down Expand Up @@ -257,15 +256,14 @@ where
Poll::Ready(Some(Command::NotifyHandler(event))) => {
connection.inject_event(event)
}
Poll::Ready(Some(Command::Close(error))) => {
Poll::Ready(Some(Command::Close)) => {
// Don't accept any further commands.
this.commands.get_mut().close();
// Discard the event, if any, and start a graceful close.
let (handler, closing_muxer) = connection.close();
this.state = State::Closing {
handler,
closing_muxer,
error,
};
continue 'poll;
}
Expand Down Expand Up @@ -340,15 +338,14 @@ where

State::Closing {
handler,
error,
mut closing_muxer,
} => {
// Try to gracefully close the connection.
match closing_muxer.poll_unpin(cx) {
Poll::Ready(Ok(())) => {
let event = Event::Closed {
id: this.id,
error: error.map(ConnectionError::ConnectionLimit),
error: None,
handler,
};
this.state = State::Terminating(event);
Expand All @@ -364,7 +361,6 @@ where
Poll::Pending => {
this.state = State::Closing {
handler,
error,
closing_muxer,
};
return Poll::Pending;
Expand Down
Loading

0 comments on commit e7f716f

Please sign in to comment.