Skip to content

Commit

Permalink
Added polkadot multisig client (#2341)
Browse files Browse the repository at this point in the history
Co-authored-by: kylezs <kyle@chainflip.io>
  • Loading branch information
j4m1ef0rd and kylezs authored Oct 28, 2022
1 parent 5a922b9 commit f8354f7
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 38 deletions.
21 changes: 19 additions & 2 deletions engine/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::sync::Arc;

use crate::multisig::eth::EthSigning;
use crate::multisig::{eth::EthSigning, polkadot::PolkadotSigning};
use anyhow::Context;

use chainflip_engine::{
Expand Down Expand Up @@ -129,6 +129,8 @@ fn main() -> anyhow::Result<()> {
let (
eth_outgoing_sender,
eth_incoming_receiver,
dot_outgoing_sender,
dot_incoming_receiver,
peer_update_sender,
p2p_fut,
) = p2p::start(state_chain_client.clone(), settings.node_p2p, latest_block_hash, &root_logger).await.context("Failed to start p2p module")?;
Expand All @@ -138,7 +140,7 @@ fn main() -> anyhow::Result<()> {
let (eth_multisig_client, eth_multisig_client_backend_future) =
multisig::start_client::<EthSigning>(
state_chain_client.our_account_id.clone(),
KeyStore::new(db),
KeyStore::new(db.clone()),
eth_incoming_receiver,
eth_outgoing_sender,
latest_ceremony_id,
Expand All @@ -149,6 +151,20 @@ fn main() -> anyhow::Result<()> {
eth_multisig_client_backend_future
);

let (dot_multisig_client, dot_multisig_client_backend_future) =
multisig::start_client::<PolkadotSigning>(
state_chain_client.our_account_id.clone(),
KeyStore::new(db),
dot_incoming_receiver,
dot_outgoing_sender,
latest_ceremony_id,
&root_logger,
);

scope.spawn(
dot_multisig_client_backend_future
);

// Start eth witnessers
scope.spawn(
eth::contract_witnesser::start(
Expand Down Expand Up @@ -193,6 +209,7 @@ fn main() -> anyhow::Result<()> {
state_chain_block_stream,
eth_broadcaster,
eth_multisig_client,
dot_multisig_client,
peer_update_sender,
epoch_start_sender,
#[cfg(feature = "ibiza")] eth_monitor_ingress_sender,
Expand Down
2 changes: 1 addition & 1 deletion engine/src/multisig/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod crypto;
/// Storage for the keys
pub mod db;

pub use crypto::{eth, ChainTag, CryptoScheme, Rng};
pub use crypto::{eth, polkadot, ChainTag, CryptoScheme, Rng};

#[cfg(test)]
mod tests;
Expand Down
22 changes: 18 additions & 4 deletions engine/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use std::{

use crate::{
common::read_clean_and_decode_hex_str_file,
multisig::{eth::EthSigning, CryptoScheme},
multisig::{eth::EthSigning, polkadot::PolkadotSigning, CryptoScheme},
settings::P2P as P2PSettings,
};

Expand Down Expand Up @@ -67,6 +67,8 @@ pub async fn start(
) -> anyhow::Result<(
MultisigMessageSender<EthSigning>,
MultisigMessageReceiver<EthSigning>,
MultisigMessageSender<PolkadotSigning>,
MultisigMessageReceiver<PolkadotSigning>,
UnboundedSender<PeerUpdate>,
impl Future<Output = anyhow::Result<()>>,
)> {
Expand Down Expand Up @@ -103,8 +105,13 @@ pub async fn start(
p2p_fut,
) = core::start(&node_key, settings.port, current_peers, our_account_id, logger);

let (eth_outgoing_sender, eth_incoming_receiver, muxer_future) =
P2PMuxer::start(incoming_message_receiver, outgoing_message_sender, logger);
let (
eth_outgoing_sender,
eth_incoming_receiver,
dot_outgoing_sender,
dot_incoming_receiver,
muxer_future,
) = P2PMuxer::start(incoming_message_receiver, outgoing_message_sender, logger);

let logger = logger.clone();

Expand Down Expand Up @@ -135,5 +142,12 @@ pub async fn start(
.boxed()
});

Ok((eth_outgoing_sender, eth_incoming_receiver, peer_update_sender, fut))
Ok((
eth_outgoing_sender,
eth_incoming_receiver,
dot_outgoing_sender,
dot_incoming_receiver,
peer_update_sender,
fut,
))
}
29 changes: 21 additions & 8 deletions engine/src/p2p/muxer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};

use crate::{
logging::COMPONENT_KEY,
multisig::{eth::EthSigning, ChainTag},
multisig::{eth::EthSigning, polkadot::PolkadotSigning, ChainTag},
p2p::{MultisigMessageReceiver, MultisigMessageSender, OutgoingMultisigStageMessages},
};

Expand All @@ -14,6 +14,8 @@ pub struct P2PMuxer {
all_outgoing_sender: UnboundedSender<OutgoingMultisigStageMessages>,
eth_incoming_sender: UnboundedSender<(AccountId, Vec<u8>)>,
eth_outgoing_receiver: UnboundedReceiver<OutgoingMultisigStageMessages>,
dot_incoming_sender: UnboundedSender<(AccountId, Vec<u8>)>,
dot_outgoing_receiver: UnboundedReceiver<OutgoingMultisigStageMessages>,
logger: slog::Logger,
}

Expand Down Expand Up @@ -89,16 +91,23 @@ impl P2PMuxer {
) -> (
MultisigMessageSender<EthSigning>,
MultisigMessageReceiver<EthSigning>,
MultisigMessageSender<PolkadotSigning>,
MultisigMessageReceiver<PolkadotSigning>,
impl Future<Output = ()>,
) {
let (eth_outgoing_sender, eth_outgoing_receiver) = tokio::sync::mpsc::unbounded_channel();
let (eth_incoming_sender, eth_incoming_receiver) = tokio::sync::mpsc::unbounded_channel();

let (dot_outgoing_sender, dot_outgoing_receiver) = tokio::sync::mpsc::unbounded_channel();
let (dot_incoming_sender, dot_incoming_receiver) = tokio::sync::mpsc::unbounded_channel();

let muxer = P2PMuxer {
all_incoming_receiver,
all_outgoing_sender,
eth_outgoing_receiver,
eth_incoming_sender,
dot_outgoing_receiver,
dot_incoming_sender,
logger: logger.new(slog::o!(COMPONENT_KEY => "P2PMuxer")),
};

Expand All @@ -107,6 +116,8 @@ impl P2PMuxer {
(
MultisigMessageSender::<EthSigning>::new(eth_outgoing_sender),
MultisigMessageReceiver::<EthSigning>::new(eth_incoming_receiver),
MultisigMessageSender::<PolkadotSigning>::new(dot_outgoing_sender),
MultisigMessageReceiver::<PolkadotSigning>::new(dot_incoming_receiver),
muxer_fut,
)
}
Expand All @@ -123,10 +134,9 @@ impl P2PMuxer {
.expect("eth receiver dropped");
},
ChainTag::Polkadot => {
slog::trace!(
self.logger,
"ignoring p2p message: polkadot scheme not yet supported",
)
self.dot_incoming_sender
.send((account_id, payload.to_owned()))
.expect("polkadot receiver dropped");
},
},
Err(err) => {
Expand Down Expand Up @@ -174,6 +184,9 @@ impl P2PMuxer {
Some(data) = self.eth_outgoing_receiver.recv() => {
self.process_outgoing(ChainTag::Ethereum, data).await;
}
Some(data) = self.dot_outgoing_receiver.recv() => {
self.process_outgoing(ChainTag::Polkadot, data).await;
}
}
}
}
Expand Down Expand Up @@ -203,7 +216,7 @@ mod tests {
tokio::sync::mpsc::unbounded_channel();
let (_, p2p_incoming_receiver) = tokio::sync::mpsc::unbounded_channel();

let (eth_outgoing_sender, _, muxer_future) =
let (eth_outgoing_sender, _, _, _, muxer_future) =
P2PMuxer::start(p2p_incoming_receiver, p2p_outgoing_sender, &logger);

let _jh = tokio::task::spawn(muxer_future);
Expand Down Expand Up @@ -231,7 +244,7 @@ mod tests {
tokio::sync::mpsc::unbounded_channel();
let (_, p2p_incoming_receiver) = tokio::sync::mpsc::unbounded_channel();

let (eth_outgoing_sender, _, muxer_future) =
let (eth_outgoing_sender, _, _, _, muxer_future) =
P2PMuxer::start(p2p_incoming_receiver, p2p_outgoing_sender, &logger);

let _jh = tokio::task::spawn(muxer_future);
Expand Down Expand Up @@ -272,7 +285,7 @@ mod tests {
let (p2p_outgoing_sender, _p2p_outgoing_receiver) = tokio::sync::mpsc::unbounded_channel();
let (p2p_incoming_sender, p2p_incoming_receiver) = tokio::sync::mpsc::unbounded_channel();

let (_eth_outgoing_sender, mut eth_incoming_receiver, muxer_future) =
let (_eth_outgoing_sender, mut eth_incoming_receiver, _, _, muxer_future) =
P2PMuxer::start(p2p_incoming_receiver, p2p_outgoing_sender, &logger);

tokio::spawn(muxer_future);
Expand Down
24 changes: 17 additions & 7 deletions engine/src/state_chain_observer/sc_observer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use crate::{
logging::COMPONENT_KEY,
multisig::{
client::{KeygenFailureReason, MultisigClientApi},
eth::EthSigning,
polkadot::PolkadotSigning,
KeyId, MessageHash,
},
p2p::{PeerInfo, PeerUpdate},
Expand All @@ -42,7 +44,7 @@ async fn handle_keygen_request<'a, MultisigClient, RpcClient>(
keygen_participants: BTreeSet<AccountId32>,
logger: slog::Logger,
) where
MultisigClient: MultisigClientApi<crate::multisig::eth::EthSigning> + Send + Sync + 'static,
MultisigClient: MultisigClientApi<EthSigning> + Send + Sync + 'static,
RpcClient: StateChainRpcApi + Send + Sync + 'static,
{
if keygen_participants.contains(&state_chain_client.our_account_id) {
Expand Down Expand Up @@ -89,7 +91,7 @@ async fn handle_signing_request<'a, MultisigClient, RpcClient>(
data: MessageHash,
logger: slog::Logger,
) where
MultisigClient: MultisigClientApi<crate::multisig::eth::EthSigning> + Send + Sync + 'static,
MultisigClient: MultisigClientApi<EthSigning> + Send + Sync + 'static,
RpcClient: StateChainRpcApi + Send + Sync + 'static,
{
if signers.contains(&state_chain_client.our_account_id) {
Expand Down Expand Up @@ -159,11 +161,12 @@ macro_rules! match_event {
}}
}

pub async fn start<BlockStream, RpcClient, EthRpc, MultisigClient>(
pub async fn start<BlockStream, RpcClient, EthRpc, EthMultisigClient, PolkadotMultisigClient>(
state_chain_client: Arc<StateChainClient<RpcClient>>,
sc_block_stream: BlockStream,
eth_broadcaster: EthBroadcaster<EthRpc>,
multisig_client: Arc<MultisigClient>,
eth_multisig_client: Arc<EthMultisigClient>,
dot_multisig_client: Arc<PolkadotMultisigClient>,
peer_update_sender: UnboundedSender<PeerUpdate>,
epoch_start_sender: broadcast::Sender<EpochStart>,
#[cfg(feature = "ibiza")] eth_monitor_ingress_sender: tokio::sync::mpsc::UnboundedSender<H160>,
Expand All @@ -181,7 +184,8 @@ where
BlockStream: Stream<Item = anyhow::Result<state_chain_runtime::Header>> + Send + 'static,
RpcClient: StateChainRpcApi + Send + Sync + 'static,
EthRpc: EthRpcApi + Send + Sync + 'static,
MultisigClient: MultisigClientApi<crate::multisig::eth::EthSigning> + Send + Sync + 'static,
EthMultisigClient: MultisigClientApi<EthSigning> + Send + Sync + 'static,
PolkadotMultisigClient: MultisigClientApi<PolkadotSigning> + Send + Sync + 'static,
{
with_task_scope(|scope| async {
let logger = logger.new(o!(COMPONENT_KEY => "SCObserver"));
Expand Down Expand Up @@ -322,9 +326,12 @@ where
keygen_participants,
),
) => {
// Ceremony id tracking is global, so update all other clients
dot_multisig_client.update_latest_ceremony_id(ceremony_id);

handle_keygen_request(
scope,
multisig_client.clone(),
eth_multisig_client.clone(),
state_chain_client.clone(),
ceremony_id,
keygen_participants,
Expand All @@ -339,9 +346,12 @@ where
payload,
),
) => {
// Ceremony id tracking is global, so update all other clients
dot_multisig_client.update_latest_ceremony_id(ceremony_id);

handle_signing_request(
scope,
multisig_client.clone(),
eth_multisig_client.clone(),
state_chain_client.clone(),
ceremony_id,
KeyId(key_id),
Expand Down
Loading

0 comments on commit f8354f7

Please sign in to comment.