Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

approval distribution: trigger assignment/votes resend based on approval checking lag #7038

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 30 additions & 16 deletions node/network/approval-distribution/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,12 @@ struct AggressionConfig {
}

impl AggressionConfig {
/// Returns `true` if block is not too old depending on the aggression level
fn is_age_relevant(&self, block_age: BlockNumber) -> bool {
/// Returns `true` if lag is past threshold depending on the aggression level
fn should_trigger_aggression(&self, approval_checking_lag: BlockNumber) -> bool {
if let Some(t) = self.l1_threshold {
block_age >= t
approval_checking_lag >= t
} else if let Some(t) = self.resend_unfinalized_period {
block_age > 0 && block_age % t == 0
approval_checking_lag > 0 && approval_checking_lag % t == 0
} else {
false
}
Expand Down Expand Up @@ -184,6 +184,9 @@ struct State {

/// HashMap from active leaves to spans
spans: HashMap<Hash, jaeger::PerLeafSpan>,

/// Current approval checking finality lag.
approval_checking_lag: BlockNumber,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -1425,19 +1428,24 @@ impl State {
resend: Resend,
metrics: &Metrics,
) {
let min_age = self.blocks_by_number.iter().next().map(|(num, _)| num);
let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);
let config = self.aggression_config.clone();

let (min_age, max_age) = match (min_age, max_age) {
(Some(min), Some(max)) => (min, max),
if !self.aggression_config.should_trigger_aggression(self.approval_checking_lag) {
return
}

let max_age = self.blocks_by_number.iter().rev().next().map(|(num, _)| num);

let max_age = match max_age {
Some(max) => *max,
_ => return, // empty.
};

let diff = max_age - min_age;
if !self.aggression_config.is_age_relevant(diff) {
return
}
// Since we have the approval checking lag, we need to set the `min_age` accordingly to
// enable aggresion for the oldest block that is not approved.
//
// Alternatively we could remove blocks that have been approved based on the `approval_checking_lag` value.
let min_age = max_age.saturating_sub(self.approval_checking_lag);

adjust_required_routing_and_propagate(
ctx,
Expand Down Expand Up @@ -1476,28 +1484,30 @@ impl State {
// its descendants from being finalized. Waste minimal bandwidth
// this way. Also, disputes might prevent finality - again, nothing
// to waste bandwidth on newer blocks for.
&block_entry.number == min_age
block_entry.number == min_age
},
|required_routing, local, _| {
// It's a bit surprising not to have a topology at this age.
if *required_routing == RequiredRouting::PendingTopology {
gum::debug!(
target: LOG_TARGET,
age = ?diff,
lag = ?self.approval_checking_lag,
"Encountered old block pending gossip topology",
);
return
}

if config.l1_threshold.as_ref().map_or(false, |t| &diff >= t) {
if config.l1_threshold.as_ref().map_or(false, |t| &self.approval_checking_lag >= t)
{
// Message originator sends to everyone.
if local && *required_routing != RequiredRouting::All {
metrics.on_aggression_l1();
*required_routing = RequiredRouting::All;
}
}

if config.l2_threshold.as_ref().map_or(false, |t| &diff >= t) {
if config.l2_threshold.as_ref().map_or(false, |t| &self.approval_checking_lag >= t)
{
// Message originator sends to everyone. Everyone else sends to XY.
if !local && *required_routing != RequiredRouting::GridXY {
metrics.on_aggression_l2();
Expand Down Expand Up @@ -1764,6 +1774,10 @@ impl ApprovalDistribution {
);
}
},
ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag) => {
gum::debug!(target: LOG_TARGET, lag, "Received `ApprovalCheckingLagUpdate`");
state.approval_checking_lag = lag;
},
}
}
}
Expand Down
1 change: 1 addition & 0 deletions node/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,7 @@ where
basics.backend.clone(),
overseer_handle.clone(),
metrics,
basics.task_manager.spawn_handle(),
)
} else {
SelectRelayChain::new_longest_chain(basics.backend.clone())
Expand Down
49 changes: 42 additions & 7 deletions node/service/src/relay_chain_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,16 @@ use consensus_common::{Error as ConsensusError, SelectChain};
use futures::channel::oneshot;
use polkadot_node_primitives::MAX_FINALITY_LAG as PRIMITIVES_MAX_FINALITY_LAG;
use polkadot_node_subsystem::messages::{
ApprovalVotingMessage, ChainSelectionMessage, DisputeCoordinatorMessage,
HighestApprovedAncestorBlock,
ApprovalDistributionMessage, ApprovalVotingMessage, ChainSelectionMessage,
DisputeCoordinatorMessage, HighestApprovedAncestorBlock,
};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_overseer::{AllMessages, Handle};
use polkadot_primitives::{Block as PolkadotBlock, BlockNumber, Hash, Header as PolkadotHeader};
use std::sync::Arc;

pub use service::SpawnTaskHandle;

/// The maximum amount of unfinalized blocks we are willing to allow due to approval checking
/// or disputes.
///
Expand Down Expand Up @@ -162,13 +164,21 @@ where

/// Create a new [`SelectRelayChain`] wrapping the given chain backend
/// and a handle to the overseer.
pub fn new_with_overseer(backend: Arc<B>, overseer: Handle, metrics: Metrics) -> Self {
pub fn new_with_overseer(
backend: Arc<B>,
overseer: Handle,
metrics: Metrics,
spawn_handle: SpawnTaskHandle,
) -> Self {
gum::debug!(target: LOG_TARGET, "Using dispute aware relay-chain selection algorithm",);

SelectRelayChain {
longest_chain: sc_consensus::LongestChain::new(backend.clone()),
selection: IsDisputesAwareWithOverseer::Yes(SelectRelayChainInner::new(
backend, overseer, metrics,
backend,
overseer,
metrics,
spawn_handle,
)),
}
}
Expand Down Expand Up @@ -219,6 +229,7 @@ pub struct SelectRelayChainInner<B, OH> {
backend: Arc<B>,
overseer: OH,
metrics: Metrics,
spawn_handle: SpawnTaskHandle,
}

impl<B, OH> SelectRelayChainInner<B, OH>
Expand All @@ -228,8 +239,13 @@ where
{
/// Create a new [`SelectRelayChainInner`] wrapping the given chain backend
/// and a handle to the overseer.
pub fn new(backend: Arc<B>, overseer: OH, metrics: Metrics) -> Self {
SelectRelayChainInner { backend, overseer, metrics }
pub fn new(
backend: Arc<B>,
overseer: OH,
metrics: Metrics,
spawn_handle: SpawnTaskHandle,
) -> Self {
SelectRelayChainInner { backend, overseer, metrics, spawn_handle }
}

fn block_header(&self, hash: Hash) -> Result<PolkadotHeader, ConsensusError> {
Expand Down Expand Up @@ -267,6 +283,7 @@ where
backend: self.backend.clone(),
overseer: self.overseer.clone(),
metrics: self.metrics.clone(),
spawn_handle: self.spawn_handle.clone(),
}
}
}
Expand Down Expand Up @@ -307,7 +324,7 @@ impl OverseerHandleT for Handle {
impl<B, OH> SelectRelayChainInner<B, OH>
where
B: HeaderProviderProvider<PolkadotBlock>,
OH: OverseerHandleT,
OH: OverseerHandleT + 'static,
{
/// Get all leaves of the chain, i.e. block hashes that are suitable to
/// build upon and have no suitable children.
Expand Down Expand Up @@ -455,6 +472,24 @@ where
let lag = initial_leaf_number.saturating_sub(subchain_number);
self.metrics.note_approval_checking_finality_lag(lag);

let mut overseer_handle = self.overseer.clone();
let lag_update_task = async move {
overseer_handle
.send_msg(
ApprovalDistributionMessage::ApprovalCheckingLagUpdate(lag),
std::any::type_name::<Self>(),
)
.await;
};

// Messages sent to `approval-distrbution` are known to have high `ToF`, we need to spawn a task for sending
// the message to not block here and delay finality.
self.spawn_handle.spawn(
"approval-checking-lag-update",
Some("relay-chain-selection"),
Box::pin(lag_update_task),
);

let (lag, subchain_head) = {
// Prevent sending flawed data to the dispute-coordinator.
if Some(subchain_block_descriptions.len() as _) !=
Expand Down
2 changes: 2 additions & 0 deletions node/subsystem-types/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,8 @@ pub enum ApprovalDistributionMessage {
HashSet<(Hash, CandidateIndex)>,
oneshot::Sender<HashMap<ValidatorIndex, ValidatorSignature>>,
),
/// Approval checking lag update measured in blocks.
ApprovalCheckingLagUpdate(BlockNumber),
}

/// Message to the Gossip Support subsystem.
Expand Down