Skip to content
This repository has been archived by the owner on Sep 13, 2022. It is now read-only.

feat(network): tag consensus peer #364

Merged
3 changes: 2 additions & 1 deletion built-in-services/metadata/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,11 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
rlp = "0.4"
bytes = "0.5"
derive_more = "0.15"
derive_more = "0.99"
byteorder = "1.3"

[dev-dependencies]
hex = "0.4"
cita_trie = "2.0"
async-trait = "0.1"
framework = { path = "../../framework" }
5 changes: 3 additions & 2 deletions built-in-services/metadata/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ fn mock_metadata() -> Metadata {
cycles_price: 1,
interval: 3000,
verifier_list: [ValidatorExtend {
bls_pub_key: Hex::from_string("0x04188ef9488c19458a963cc57b567adde7db8f8b6bec392d5cb7b67b0abc1ed6cd966edc451f6ac2ef38079460eb965e890d1f576e4039a20467820237cda753f07a8b8febae1ec052190973a1bcf00690ea8fc0168b3fbbccd1c4e402eda5ef22".to_owned()).unwrap(),
address: Address::from_hex("0xCAB8EEA4799C21379C20EF5BAA2CC8AF1BEC475B").unwrap(),
bls_pub_key: Hex::from_string("0x0401139331589f32220ec5f41f6faa0f5c3f4d36af011ab014cefd9d8f36b53b04a2031f681d1c9648a2a5d534d742931b0a5a4132da9ee752c1144d6396bed6cfc635c9687258cec9b60b387d35cf9e13f29091e11ae88024d74ca904c0ea3fb3".to_owned()).unwrap(),
pub_key: Hex::from_string("0x026c184a9016f6f71a234c86b141621f38b68c78602ab06768db4d83682c616004".to_owned()).unwrap(),
address: Address::from_hex("0x76961e339fe2f1f931d84c425754806fb4174c34").unwrap(),
propose_weight: 1,
vote_weight: 1,
}]
Expand Down
4 changes: 2 additions & 2 deletions core/api/src/schema/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub struct Proof {
#[derive(juniper::GraphQLObject, Clone)]
#[graphql(description = "Validator address set")]
pub struct Validator {
pub address: Address,
pub pubkey: Bytes,
pub propose_weight: i32,
pub vote_weight: i32,
}
Expand Down Expand Up @@ -142,7 +142,7 @@ impl From<protocol::types::Proof> for Proof {
impl From<protocol::types::Validator> for Validator {
fn from(validator: protocol::types::Validator) -> Self {
Validator {
address: Address::from(validator.address),
pubkey: Bytes::from(validator.pub_key),
propose_weight: validator.vote_weight as i32,
vote_weight: validator.vote_weight as i32,
}
Expand Down
1 change: 0 additions & 1 deletion core/consensus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ lazy_static = "1.4"
num-traits = "0.2"
rand = "0.7"


[features]
default = []
random_leader = ["overlord/random_leader"]
64 changes: 41 additions & 23 deletions core/consensus/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ use tokio::sync::mpsc::{channel, Receiver, Sender};
use common_apm::muta_apm;
use common_merkle::Merkle;

use core_network::{PeerId, PeerIdExt};

use protocol::traits::{
CommonConsensusAdapter, ConsensusAdapter, Context, ExecutorFactory, ExecutorParams,
ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, PeerTrust, Priority, Rpc,
ExecutorResp, Gossip, MemPool, MessageTarget, MixedTxHashes, Network, PeerTrust, Priority, Rpc,
ServiceMapping, Storage, SynchronizationAdapter, TrustFeedback,
};
use protocol::types::{
Expand All @@ -43,7 +45,7 @@ const OVERLORD_GAP: usize = 10;
pub struct OverlordConsensusAdapter<
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage,
DB: cita_trie::DB,
Mapping: ServiceMapping,
Expand All @@ -66,7 +68,7 @@ impl<EF, M, N, S, DB, Mapping> ConsensusAdapter
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down Expand Up @@ -111,9 +113,11 @@ where
.await
}

MessageTarget::Specified(addr) => {
MessageTarget::Specified(pub_key) => {
let peer_id_bytes = PeerId::from_pubkey_bytes(pub_key)?.into_bytes_ext();

self.network
.users_cast(ctx, end, vec![addr], msg, Priority::High)
.multicast(ctx, end, [peer_id_bytes], msg, Priority::High)
.await
}
}
Expand Down Expand Up @@ -216,7 +220,7 @@ impl<EF, M, N, S, DB, Mapping> SynchronizationAdapter
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down Expand Up @@ -350,7 +354,7 @@ impl<EF, M, N, S, DB, Mapping> CommonConsensusAdapter
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down Expand Up @@ -488,6 +492,15 @@ where
Ok(serde_json::from_str(&exec_resp.succeed_data).expect("Decode metadata failed!"))
}

fn tag_consensus(&self, ctx: Context, pub_keys: Vec<Bytes>) -> ProtocolResult<()> {
let peer_ids_bytes = pub_keys
.iter()
.map(|pk| PeerId::from_pubkey_bytes(pk).map(PeerIdExt::into_bytes_ext))
.collect::<Result<_, _>>()?;

self.network.tag_consensus(ctx, peer_ids_bytes)
}

#[muta_apm::derive::tracing_span(kind = "consensus.adapter")]
fn report_bad(&self, ctx: Context, feedback: TrustFeedback) {
self.network.report(ctx, feedback);
Expand Down Expand Up @@ -548,11 +561,11 @@ where

let authority_map = previous_metadata
.verifier_list
.into_iter()
.iter()
.map(|v| {
let address = v.address.as_bytes();
let address = v.pub_key.decode();
let node = Node {
address: v.address.as_bytes(),
address: v.pub_key.decode(),
propose_weight: v.propose_weight,
vote_weight: v.vote_weight,
};
Expand All @@ -563,7 +576,10 @@ where
// TODO: useless check
// check proposer
if block.header.height != 0
&& !authority_map.contains_key(&block.header.proposer.as_bytes())
&& !previous_metadata
.verifier_list
.iter()
.any(|v| v.address == block.header.proposer)
{
log::error!(
"[consensus] verify_block_header, block.header.proposer: {:?}, authority_map: {:?}",
Expand All @@ -575,10 +591,12 @@ where

// check validators
for validator in block.header.validators.iter() {
if !authority_map.contains_key(&validator.address.as_bytes()) {
let validator_address = Address::from_pubkey_bytes(validator.pub_key.clone());

if !authority_map.contains_key(&validator.pub_key) {
log::error!(
"[consensus] verify_block_header, validator.address: {:?}, authority_map: {:?}",
validator.address,
validator_address,
authority_map
);
return Err(ConsensusError::VerifyBlockHeader(
Expand All @@ -587,14 +605,14 @@ where
)
.into());
} else {
let node = authority_map.get(&validator.address.as_bytes()).unwrap();
let node = authority_map.get(&validator.pub_key).unwrap();

if node.vote_weight != validator.vote_weight
|| node.propose_weight != validator.vote_weight
{
log::error!(
"[consensus] verify_block_header, validator.address: {:?}, authority_map: {:?}",
validator.address,
validator_address,
authority_map
);
return Err(ConsensusError::VerifyBlockHeader(
Expand Down Expand Up @@ -664,7 +682,7 @@ where
.verifier_list
.iter()
.map(|v| Node {
address: v.address.as_bytes(),
address: v.pub_key.decode(),
propose_weight: v.propose_weight,
vote_weight: v.vote_weight,
})
Expand All @@ -686,7 +704,7 @@ where
.iter()
.map(|node| (node.address.clone(), node.vote_weight))
.collect::<HashMap<overlord::types::Address, u32>>();
self.verity_proof_weight(
self.verify_proof_weight(
ctx.clone(),
block.header.height,
weight_map,
Expand All @@ -698,7 +716,7 @@ where
.verifier_list
.iter()
.filter_map(|v| {
if signed_voters.contains(&v.address.as_bytes()) {
if signed_voters.contains(&v.pub_key.decode()) {
Some(v.bls_pub_key.clone())
} else {
None
Expand Down Expand Up @@ -748,7 +766,7 @@ where
}

#[muta_apm::derive::tracing_span(kind = "consensus.adapter")]
fn verity_proof_weight(
fn verify_proof_weight(
&self,
ctx: Context,
block_height: u64,
Expand All @@ -765,15 +783,15 @@ where
.ok_or(ConsensusError::VerifyProof(block_height, WeightNotFound))
.map_err(|e| {
log::error!(
"[consensus] verity_proof_weight,signed_voter_address: {:?}",
"[consensus] verify_proof_weight,signed_voter_address: {:?}",
signed_voter_address
);
e
})?;
accumulator += u64::from(*(weight));
} else {
log::error!(
"[consensus] verity_proof_weight, weight not found, signed_voter_address: {:?}",
"[consensus] verify_proof_weight, weight not found, signed_voter_address: {:?}",
signed_voter_address
);
return Err(
Expand All @@ -784,7 +802,7 @@ where

if 3 * accumulator <= 2 * total_validator_weight {
log::error!(
"[consensus] verity_proof_weight, accumulator: {}, total: {}",
"[consensus] verify_proof_weight, accumulator: {}, total: {}",
accumulator,
total_validator_weight
);
Expand All @@ -799,7 +817,7 @@ impl<EF, M, N, S, DB, Mapping> OverlordConsensusAdapter<EF, M, N, S, DB, Mapping
where
EF: ExecutorFactory<DB, S, Mapping>,
M: MemPool + 'static,
N: Rpc + PeerTrust + Gossip + 'static,
N: Rpc + PeerTrust + Gossip + Network + 'static,
S: Storage + 'static,
DB: cita_trie::DB + 'static,
Mapping: ServiceMapping + 'static,
Expand Down
14 changes: 4 additions & 10 deletions core/consensus/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,12 +119,7 @@ impl<Adapter: ConsensusAdapter + 'static> OverlordConsensus<Adapter> {
lock,
));

let overlord = Overlord::new(
node_info.self_address.as_bytes(),
Arc::clone(&engine),
crypto,
engine,
);
let overlord = Overlord::new(node_info.self_pub_key, Arc::clone(&engine), crypto, engine);
let overlord_handler = overlord.get_handler();
let status = status_agent.to_inner();

Expand Down Expand Up @@ -182,7 +177,7 @@ pub fn gen_overlord_status(
let mut authority_list = validators
.into_iter()
.map(|v| Node {
address: v.address.as_bytes(),
address: v.pub_key.clone(),
propose_weight: v.propose_weight,
vote_weight: v.vote_weight,
})
Expand Down Expand Up @@ -216,7 +211,7 @@ impl<T: overlord::Codec> OverlordMsgExt for OverlordMsg<T> {
OverlordMsg::AggregatedVote(av) => av.get_height().to_string(),
OverlordMsg::RichStatus(s) => s.height.to_string(),
OverlordMsg::SignedChoke(sc) => sc.choke.height.to_string(),
OverlordMsg::Stop => "".to_owned(),
_ => "".to_owned(),
}
}

Expand All @@ -225,9 +220,8 @@ impl<T: overlord::Codec> OverlordMsgExt for OverlordMsg<T> {
OverlordMsg::SignedProposal(sp) => sp.proposal.round.to_string(),
OverlordMsg::SignedVote(sv) => sv.get_round().to_string(),
OverlordMsg::AggregatedVote(av) => av.get_round().to_string(),
OverlordMsg::RichStatus(_) => "".to_owned(),
OverlordMsg::SignedChoke(sc) => sc.choke.round.to_string(),
OverlordMsg::Stop => "".to_owned(),
_ => "".to_owned(),
}
}
}
Expand Down
22 changes: 14 additions & 8 deletions core/consensus/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,13 +459,12 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
/// Only signed vote will be transmit to the relayer.
#[muta_apm::derive::tracing_span(
kind = "consensus.engine",
logs = "{'address':
'Address::from_bytes(addr.clone()).unwrap().as_hex()'}"
logs = "{'pub_key': 'hex::encode(pub_key.clone())'}"
)]
async fn transmit_to_relayer(
&self,
ctx: Context,
addr: Bytes,
pub_key: Bytes,
msg: OverlordMsg<FixedPill>,
) -> Result<(), Box<dyn Error + Send>> {
match msg {
Expand All @@ -476,7 +475,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
ctx,
msg,
END_GOSSIP_SIGNED_VOTE,
MessageTarget::Specified(Address::from_bytes(addr)?),
MessageTarget::Specified(pub_key),
)
.await?;
}
Expand All @@ -487,7 +486,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
ctx,
msg,
END_GOSSIP_AGGREGATED_VOTE,
MessageTarget::Specified(Address::from_bytes(addr)?),
MessageTarget::Specified(pub_key),
)
.await?;
}
Expand Down Expand Up @@ -526,7 +525,7 @@ impl<Adapter: ConsensusAdapter + 'static> Engine<FixedPill> for ConsensusEngine<
.verifier_list
.into_iter()
.map(|v| Node {
address: v.address.as_bytes(),
address: v.pub_key.decode(),
propose_weight: v.propose_weight,
vote_weight: v.vote_weight,
})
Expand Down Expand Up @@ -729,6 +728,13 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
metadata.max_tx_size,
);

let pub_keys = metadata
.verifier_list
.iter()
.map(|v| v.pub_key.decode())
.collect();
self.adapter.tag_consensus(Context::new(), pub_keys)?;

let block_hash = Hash::digest(block.header.encode_fixed()?);

if block.header.height != proof.height {
Expand Down Expand Up @@ -770,7 +776,7 @@ impl<Adapter: ConsensusAdapter + 'static> ConsensusEngine<Adapter> {
pub fn generate_new_crypto_map(metadata: Metadata) -> ProtocolResult<HashMap<Bytes, BlsPublicKey>> {
let mut new_addr_pubkey_map = HashMap::new();
for validator in metadata.verifier_list.into_iter() {
let addr = validator.address.as_bytes();
let addr = validator.pub_key.decode();
let hex_pubkey = hex::decode(validator.bls_pub_key.as_string_trim0x()).map_err(|err| {
ConsensusError::Other(format!("hex decode metadata bls pubkey error {:?}", err))
})?;
Expand All @@ -785,7 +791,7 @@ fn covert_to_overlord_authority(validators: &[Validator]) -> Vec<Node> {
let mut authority = validators
.iter()
.map(|v| Node {
address: v.address.as_bytes(),
address: v.pub_key.clone(),
propose_weight: v.propose_weight,
vote_weight: v.vote_weight,
})
Expand Down
2 changes: 1 addition & 1 deletion core/consensus/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl CurrentConsensusStatus {
.verifier_list
.iter()
.map(|v| Validator {
address: v.address.clone(),
pub_key: v.pub_key.decode(),
propose_weight: v.propose_weight,
vote_weight: v.vote_weight,
})
Expand Down
Loading