Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit a0931f3

Browse files
tomakagavofyork
authored andcommitted
Make the behaviour in libp2p generic (#2525)
* Make the behaviour in libp2p generic * Fix indentation * Fix bad merge
1 parent 3593edc commit a0931f3

File tree

4 files changed

+138
-138
lines changed

4 files changed

+138
-138
lines changed

core/network-libp2p/src/behaviour.rs

+102-116
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
// You should have received a copy of the GNU General Public License
1515
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
1616

17-
use crate::custom_proto::{CustomProto, CustomProtoOut, RegisteredProtocol};
17+
use crate::DiscoveryNetBehaviour;
1818
use futures::prelude::*;
1919
use libp2p::NetworkBehaviour;
20-
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, PublicKey};
20+
use libp2p::core::{Multiaddr, PeerId, ProtocolsHandler, protocols_handler::IntoProtocolsHandler, PublicKey};
2121
use libp2p::core::swarm::{ConnectedPoint, NetworkBehaviour, NetworkBehaviourAction};
2222
use libp2p::core::swarm::{NetworkBehaviourEventProcess, PollParameters};
2323
#[cfg(not(target_os = "unknown"))]
@@ -29,19 +29,19 @@ use libp2p::mdns::{Mdns, MdnsEvent};
2929
use libp2p::multiaddr::Protocol;
3030
use libp2p::ping::{Ping, PingConfig, PingEvent, PingSuccess};
3131
use log::{debug, info, trace, warn};
32-
use std::{borrow::Cow, cmp, time::Duration};
32+
use std::{cmp, iter, time::Duration};
3333
use tokio_io::{AsyncRead, AsyncWrite};
3434
use tokio_timer::{Delay, clock::Clock};
3535
use void;
3636

3737
/// General behaviour of the network.
3838
#[derive(NetworkBehaviour)]
39-
#[behaviour(out_event = "BehaviourOut<TMessage>", poll_method = "poll")]
40-
pub struct Behaviour<TMessage, TSubstream> {
39+
#[behaviour(out_event = "BehaviourOut<TBehaviourEv>", poll_method = "poll")]
40+
pub struct Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
41+
/// Main protocol that handles everything except the discovery and the technicalities.
42+
user_protocol: UserBehaviourWrap<TBehaviour>,
4143
/// Periodically ping nodes, and close the connection if it's unresponsive.
4244
ping: Ping<TSubstream>,
43-
/// Custom protocols (dot, bbq, sub, etc.).
44-
custom_protocols: CustomProto<TMessage, TSubstream>,
4545
/// Discovers nodes of the network. Defined below.
4646
discovery: DiscoveryBehaviour<TSubstream>,
4747
/// Periodically identifies the remote and responds to incoming requests.
@@ -52,26 +52,23 @@ pub struct Behaviour<TMessage, TSubstream> {
5252

5353
/// Queue of events to produce for the outside.
5454
#[behaviour(ignore)]
55-
events: Vec<BehaviourOut<TMessage>>,
55+
events: Vec<BehaviourOut<TBehaviourEv>>,
5656
}
5757

58-
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
58+
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
5959
/// Builds a new `Behaviour`.
6060
pub fn new(
61+
user_protocol: TBehaviour,
6162
user_agent: String,
6263
local_public_key: PublicKey,
63-
protocol: RegisteredProtocol<TMessage>,
6464
known_addresses: Vec<(PeerId, Multiaddr)>,
65-
peerset: substrate_peerset::Peerset,
6665
enable_mdns: bool,
6766
) -> Self {
6867
let identify = {
6968
let proto_version = "/substrate/1.0".to_string();
7069
Identify::new(proto_version, user_agent, local_public_key.clone())
7170
};
7271

73-
let custom_protocols = CustomProto::new(protocol, peerset);
74-
7572
let mut kademlia = Kademlia::new(local_public_key.clone().into_peer_id());
7673
for (peer_id, addr) in &known_addresses {
7774
kademlia.add_connected_address(peer_id, addr.clone());
@@ -84,8 +81,8 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
8481

8582
let clock = Clock::new();
8683
Behaviour {
84+
user_protocol: UserBehaviourWrap(user_protocol),
8785
ping: Ping::new(PingConfig::new()),
88-
custom_protocols,
8986
discovery: DiscoveryBehaviour {
9087
user_defined: known_addresses,
9188
kademlia,
@@ -111,95 +108,34 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
111108
}
112109
}
113110

114-
/// Sends a message to a peer.
115-
///
116-
/// Has no effect if the custom protocol is not open with the given peer.
117-
///
118-
/// Also note that even we have a valid open substream, it may in fact be already closed
119-
/// without us knowing, in which case the packet will not be received.
120-
#[inline]
121-
pub fn send_custom_message(&mut self, target: &PeerId, data: TMessage) {
122-
self.custom_protocols.send_packet(target, data)
123-
}
124-
125111
/// Returns the list of nodes that we know exist in the network.
126112
pub fn known_peers(&self) -> impl Iterator<Item = &PeerId> {
127113
self.discovery.kademlia.kbuckets_entries()
128114
}
129115

130-
/// Returns true if we try to open protocols with the given peer.
131-
pub fn is_enabled(&self, peer_id: &PeerId) -> bool {
132-
self.custom_protocols.is_enabled(peer_id)
133-
}
134-
135-
/// Returns true if we have an open protocol with the given peer.
136-
pub fn is_open(&self, peer_id: &PeerId) -> bool {
137-
self.custom_protocols.is_open(peer_id)
138-
}
139-
140116
/// Adds a hard-coded address for the given peer, that never expires.
141117
pub fn add_known_address(&mut self, peer_id: PeerId, addr: Multiaddr) {
142118
if self.discovery.user_defined.iter().all(|(p, a)| *p != peer_id && *a != addr) {
143119
self.discovery.user_defined.push((peer_id, addr));
144120
}
145121
}
146122

147-
/// Disconnects the custom protocols from a peer.
148-
///
149-
/// The peer will still be able to use Kademlia or other protocols, but will get disconnected
150-
/// after a few seconds of inactivity.
151-
///
152-
/// This is asynchronous and does not instantly close the custom protocols.
153-
/// Corresponding closing events will be generated once the closing actually happens.
154-
///
155-
/// Has no effect if we're not connected to the `PeerId`.
156-
#[inline]
157-
pub fn drop_node(&mut self, peer_id: &PeerId) {
158-
self.custom_protocols.disconnect_peer(peer_id)
123+
/// Returns a shared reference to the user protocol.
124+
pub fn user_protocol(&self) -> &TBehaviour {
125+
&self.user_protocol.0
159126
}
160127

161-
/// Returns the state of the peerset manager, for debugging purposes.
162-
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
163-
self.custom_protocols.peerset_debug_info()
128+
/// Returns a mutable reference to the user protocol.
129+
pub fn user_protocol_mut(&mut self) -> &mut TBehaviour {
130+
&mut self.user_protocol.0
164131
}
165132
}
166133

167134
/// Event that can be emitted by the behaviour.
168135
#[derive(Debug)]
169-
pub enum BehaviourOut<TMessage> {
170-
/// Opened a custom protocol with the remote.
171-
CustomProtocolOpen {
172-
/// Version of the protocol that has been opened.
173-
version: u8,
174-
/// Id of the node we have opened a connection with.
175-
peer_id: PeerId,
176-
/// Endpoint used for this custom protocol.
177-
endpoint: ConnectedPoint,
178-
},
179-
180-
/// Closed a custom protocol with the remote.
181-
CustomProtocolClosed {
182-
/// Id of the peer we were connected to.
183-
peer_id: PeerId,
184-
/// Reason why the substream closed, for diagnostic purposes.
185-
reason: Cow<'static, str>,
186-
},
187-
188-
/// Receives a message on a custom protocol substream.
189-
CustomMessage {
190-
/// Id of the peer the message came from.
191-
peer_id: PeerId,
192-
/// Message that has been received.
193-
message: TMessage,
194-
},
195-
196-
/// A substream with a remote is clogged. We should avoid sending more data to it if possible.
197-
Clogged {
198-
/// Id of the peer the message came from.
199-
peer_id: PeerId,
200-
/// Copy of the messages that are within the buffer, for further diagnostic.
201-
messages: Vec<TMessage>,
202-
},
136+
pub enum BehaviourOut<TBehaviourEv> {
137+
/// Message from the user protocol.
138+
UserProtocol(TBehaviourEv),
203139

204140
/// We have obtained debug information from a peer.
205141
Identified {
@@ -218,38 +154,21 @@ pub enum BehaviourOut<TMessage> {
218154
},
219155
}
220156

221-
impl<TMessage> From<CustomProtoOut<TMessage>> for BehaviourOut<TMessage> {
222-
fn from(other: CustomProtoOut<TMessage>) -> BehaviourOut<TMessage> {
223-
match other {
224-
CustomProtoOut::CustomProtocolOpen { version, peer_id, endpoint } => {
225-
BehaviourOut::CustomProtocolOpen { version, peer_id, endpoint }
226-
}
227-
CustomProtoOut::CustomProtocolClosed { peer_id, reason } => {
228-
BehaviourOut::CustomProtocolClosed { peer_id, reason }
229-
}
230-
CustomProtoOut::CustomMessage { peer_id, message } => {
231-
BehaviourOut::CustomMessage { peer_id, message }
232-
}
233-
CustomProtoOut::Clogged { peer_id, messages } => {
234-
BehaviourOut::Clogged { peer_id, messages }
235-
}
236-
}
237-
}
238-
}
239-
240-
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TMessage, TSubstream> {
157+
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<void::Void> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
241158
fn inject_event(&mut self, event: void::Void) {
242159
void::unreachable(event)
243160
}
244161
}
245162

246-
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<CustomProtoOut<TMessage>> for Behaviour<TMessage, TSubstream> {
247-
fn inject_event(&mut self, event: CustomProtoOut<TMessage>) {
248-
self.events.push(event.into());
163+
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<UserEventWrap<TBehaviourEv>> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
164+
fn inject_event(&mut self, event: UserEventWrap<TBehaviourEv>) {
165+
self.events.push(BehaviourOut::UserProtocol(event.0));
249166
}
250167
}
251168

252-
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behaviour<TMessage, TSubstream> {
169+
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent>
170+
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
171+
where TBehaviour: DiscoveryNetBehaviour {
253172
fn inject_event(&mut self, event: IdentifyEvent) {
254173
match event {
255174
IdentifyEvent::Identified { peer_id, mut info, .. } => {
@@ -270,7 +189,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behav
270189
for addr in &info.listen_addrs {
271190
self.discovery.kademlia.add_connected_address(&peer_id, addr.clone());
272191
}
273-
self.custom_protocols.add_discovered_nodes(Some(peer_id.clone()));
192+
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id.clone()));
274193
self.events.push(BehaviourOut::Identified { peer_id, info });
275194
}
276195
IdentifyEvent::Error { .. } => {}
@@ -282,12 +201,14 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<IdentifyEvent> for Behav
282201
}
283202
}
284203

285-
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behaviour<TMessage, TSubstream> {
204+
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<KademliaOut>
205+
for Behaviour<TBehaviour, TBehaviourEv, TSubstream>
206+
where TBehaviour: DiscoveryNetBehaviour {
286207
fn inject_event(&mut self, out: KademliaOut) {
287208
match out {
288209
KademliaOut::Discovered { .. } => {}
289210
KademliaOut::KBucketAdded { peer_id, .. } => {
290-
self.custom_protocols.add_discovered_nodes(Some(peer_id));
211+
self.user_protocol.0.add_discovered_nodes(iter::once(peer_id));
291212
}
292213
KademliaOut::FindNodeResult { key, closer_peers } => {
293214
trace!(target: "sub-libp2p", "Libp2p => Query for {:?} yielded {:?} results",
@@ -303,7 +224,7 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<KademliaOut> for Behavio
303224
}
304225
}
305226

306-
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TMessage, TSubstream> {
227+
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
307228
fn inject_event(&mut self, event: PingEvent) {
308229
match event {
309230
PingEvent { peer, result: Ok(PingSuccess::Ping { rtt }) } => {
@@ -316,19 +237,21 @@ impl<TMessage, TSubstream> NetworkBehaviourEventProcess<PingEvent> for Behaviour
316237
}
317238

318239
#[cfg(not(target_os = "unknown"))]
319-
impl<TMessage, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for Behaviour<TMessage, TSubstream> {
240+
impl<TBehaviour, TBehaviourEv, TSubstream> NetworkBehaviourEventProcess<MdnsEvent> for
241+
Behaviour<TBehaviour, TBehaviourEv, TSubstream>
242+
where TBehaviour: DiscoveryNetBehaviour {
320243
fn inject_event(&mut self, event: MdnsEvent) {
321244
match event {
322245
MdnsEvent::Discovered(list) => {
323-
self.custom_protocols.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
246+
self.user_protocol.0.add_discovered_nodes(list.into_iter().map(|(peer_id, _)| peer_id));
324247
},
325248
MdnsEvent::Expired(_) => {}
326249
}
327250
}
328251
}
329252

330-
impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
331-
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TMessage>>> {
253+
impl<TBehaviour, TBehaviourEv, TSubstream> Behaviour<TBehaviour, TBehaviourEv, TSubstream> {
254+
fn poll<TEv>(&mut self) -> Async<NetworkBehaviourAction<TEv, BehaviourOut<TBehaviourEv>>> {
332255
if !self.events.is_empty() {
333256
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)))
334257
}
@@ -337,6 +260,69 @@ impl<TMessage, TSubstream> Behaviour<TMessage, TSubstream> {
337260
}
338261
}
339262

263+
/// Because of limitations with the network behaviour custom derive and trait impl duplication, we
264+
/// have to wrap the user protocol into a struct.
265+
pub struct UserBehaviourWrap<TInner>(TInner);
266+
/// Event produced by `UserBehaviourWrap`.
267+
pub struct UserEventWrap<TInner>(TInner);
268+
impl<TInner: NetworkBehaviour> NetworkBehaviour for UserBehaviourWrap<TInner> {
269+
type ProtocolsHandler = TInner::ProtocolsHandler;
270+
type OutEvent = UserEventWrap<TInner::OutEvent>;
271+
fn new_handler(&mut self) -> Self::ProtocolsHandler { self.0.new_handler() }
272+
fn addresses_of_peer(&mut self, peer_id: &PeerId) -> Vec<Multiaddr> {
273+
self.0.addresses_of_peer(peer_id)
274+
}
275+
fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
276+
self.0.inject_connected(peer_id, endpoint)
277+
}
278+
fn inject_disconnected(&mut self, peer_id: &PeerId, endpoint: ConnectedPoint) {
279+
self.0.inject_disconnected(peer_id, endpoint)
280+
}
281+
fn inject_node_event(
282+
&mut self,
283+
peer_id: PeerId,
284+
event: <<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::OutEvent
285+
) {
286+
self.0.inject_node_event(peer_id, event)
287+
}
288+
fn poll(
289+
&mut self,
290+
params: &mut PollParameters
291+
) -> Async<NetworkBehaviourAction<<<Self::ProtocolsHandler as IntoProtocolsHandler>::Handler as ProtocolsHandler>::InEvent, Self::OutEvent>> {
292+
match self.0.poll(params) {
293+
Async::NotReady => Async::NotReady,
294+
Async::Ready(NetworkBehaviourAction::GenerateEvent(ev)) =>
295+
Async::Ready(NetworkBehaviourAction::GenerateEvent(UserEventWrap(ev))),
296+
Async::Ready(NetworkBehaviourAction::DialAddress { address }) =>
297+
Async::Ready(NetworkBehaviourAction::DialAddress { address }),
298+
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }) =>
299+
Async::Ready(NetworkBehaviourAction::DialPeer { peer_id }),
300+
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }) =>
301+
Async::Ready(NetworkBehaviourAction::SendEvent { peer_id, event }),
302+
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }) =>
303+
Async::Ready(NetworkBehaviourAction::ReportObservedAddr { address }),
304+
}
305+
}
306+
fn inject_replaced(&mut self, peer_id: PeerId, closed_endpoint: ConnectedPoint, new_endpoint: ConnectedPoint) {
307+
self.0.inject_replaced(peer_id, closed_endpoint, new_endpoint)
308+
}
309+
fn inject_addr_reach_failure(&mut self, peer_id: Option<&PeerId>, addr: &Multiaddr, error: &dyn std::error::Error) {
310+
self.0.inject_addr_reach_failure(peer_id, addr, error)
311+
}
312+
fn inject_dial_failure(&mut self, peer_id: &PeerId) {
313+
self.0.inject_dial_failure(peer_id)
314+
}
315+
fn inject_new_listen_addr(&mut self, addr: &Multiaddr) {
316+
self.0.inject_new_listen_addr(addr)
317+
}
318+
fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) {
319+
self.0.inject_expired_listen_addr(addr)
320+
}
321+
fn inject_new_external_addr(&mut self, addr: &Multiaddr) {
322+
self.0.inject_new_external_addr(addr)
323+
}
324+
}
325+
340326
/// Implementation of `NetworkBehaviour` that discovers the nodes on the network.
341327
pub struct DiscoveryBehaviour<TSubstream> {
342328
/// User-defined list of nodes and their addresses. Typically includes bootstrap nodes and

core/network-libp2p/src/custom_proto/behaviour.rs

+10-8
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
// You should have received a copy of the GNU General Public License
1515
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
1616

17+
use crate::DiscoveryNetBehaviour;
1718
use crate::custom_proto::handler::{CustomProtoHandlerProto, CustomProtoHandlerOut, CustomProtoHandlerIn};
1819
use crate::custom_proto::upgrade::{CustomMessage, RegisteredProtocol};
1920
use fnv::FnvHashMap;
@@ -348,14 +349,6 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
348349
});
349350
}
350351

351-
/// Indicates to the peerset that we have discovered new addresses for a given node.
352-
pub fn add_discovered_nodes<I: IntoIterator<Item = PeerId>>(&mut self, peer_ids: I) {
353-
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
354-
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
355-
peer_id
356-
}));
357-
}
358-
359352
/// Returns the state of the peerset manager, for debugging purposes.
360353
pub fn peerset_debug_info(&mut self) -> serde_json::Value {
361354
self.peerset.debug_info()
@@ -595,6 +588,15 @@ impl<TMessage, TSubstream> CustomProto<TMessage, TSubstream> {
595588
}
596589
}
597590

591+
impl<TMessage, TSubstream> DiscoveryNetBehaviour for CustomProto<TMessage, TSubstream> {
592+
fn add_discovered_nodes(&mut self, peer_ids: impl Iterator<Item = PeerId>) {
593+
self.peerset.discovered(peer_ids.into_iter().map(|peer_id| {
594+
debug!(target: "sub-libp2p", "PSM <= Discovered({:?})", peer_id);
595+
peer_id
596+
}));
597+
}
598+
}
599+
598600
impl<TMessage, TSubstream> NetworkBehaviour for CustomProto<TMessage, TSubstream>
599601
where
600602
TSubstream: AsyncRead + AsyncWrite,

0 commit comments

Comments
 (0)