Implement PeerDAS Fulu fork activation.
jimmygchen committed Jan 13, 2025
1 parent c9747fb commit 2e11554
Showing 31 changed files with 139 additions and 254 deletions.
@@ -22,8 +22,6 @@ use types::{
///
/// The blobs are all gossip and kzg verified.
/// The block has completed all verifications except the availability check.
/// TODO(das): this struct can potentially be refactored as blobs and data columns are mutually
/// exclusive and this could simplify `is_importable`.
#[derive(Clone)]
pub struct PendingComponents<E: EthSpec> {
pub block_root: Hash256,
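The TODO above notes that blobs and data columns are mutually exclusive for any given block. A purely hypothetical sketch of the enum-based refactor it describes (names and types are illustrative, not Lighthouse's actual API):

```rust
use std::sync::Arc;

// Hypothetical shape only: a block's verified data is either blobs or data
// columns, never both, so the two optional lists could collapse into an enum.
enum CachedSidecars<Blob, Column> {
    /// Pre-PeerDAS forks: gossip- and KZG-verified blobs.
    Blobs(Vec<Arc<Blob>>),
    /// Fulu onwards (PeerDAS): gossip- and KZG-verified data columns.
    DataColumns(Vec<Arc<Column>>),
}

struct PendingComponentsSketch<Block, Blob, Column> {
    block_root: [u8; 32],
    block: Option<Arc<Block>>,
    data: CachedSidecars<Blob, Column>,
}
```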
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/test_utils.rs
@@ -104,7 +104,7 @@ static KZG_NO_PRECOMP: LazyLock<Arc<Kzg>> = LazyLock::new(|| {
});

pub fn get_kzg(spec: &ChainSpec) -> Arc<Kzg> {
if spec.eip7594_fork_epoch.is_some() {
if spec.fulu_fork_epoch.is_some() {
KZG_PEERDAS.clone()
} else if spec.deneb_fork_epoch.is_some() {
KZG.clone()
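The updated `get_kzg` selects the PeerDAS trusted setup whenever `fulu_fork_epoch` is scheduled, replacing the old `eip7594_fork_epoch` check. A hedged usage sketch, assuming `MainnetEthSpec` and that `Kzg` and `get_kzg` are in scope as in the file above:

```rust
use std::sync::Arc;
use types::{Epoch, EthSpec, MainnetEthSpec};

// Illustrative only: scheduling Fulu at genesis makes `get_kzg` return the
// PeerDAS KZG instance (KZG_PEERDAS) instead of the Deneb one.
fn peerdas_kzg_for_tests() -> Arc<Kzg> {
    let mut spec = MainnetEthSpec::default_spec();
    spec.fulu_fork_epoch = Some(Epoch::new(0));
    get_kzg(&spec)
}
```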
8 changes: 4 additions & 4 deletions beacon_node/lighthouse_network/src/discovery/enr.rs
@@ -333,9 +333,9 @@ mod test {

type E = MainnetEthSpec;

fn make_eip7594_spec() -> ChainSpec {
fn make_fulu_spec() -> ChainSpec {
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(10));
spec.fulu_fork_epoch = Some(Epoch::new(10));
spec
}

@@ -353,7 +353,7 @@
subscribe_all_data_column_subnets: false,
..NetworkConfig::default()
};
let spec = make_eip7594_spec();
let spec = make_fulu_spec();

let enr = build_enr_with_config(config, &spec).0;

@@ -369,7 +369,7 @@
subscribe_all_data_column_subnets: true,
..NetworkConfig::default()
};
let spec = make_eip7594_spec();
let spec = make_fulu_spec();
let enr = build_enr_with_config(config, &spec).0;

assert_eq!(
40 changes: 20 additions & 20 deletions beacon_node/lighthouse_network/src/rpc/codec.rs
@@ -485,17 +485,9 @@ fn context_bytes<E: EthSpec>(
RpcSuccessResponse::BlobsByRange(_) | RpcSuccessResponse::BlobsByRoot(_) => {
return fork_context.to_context_bytes(ForkName::Deneb);
}
RpcSuccessResponse::DataColumnsByRoot(d)
| RpcSuccessResponse::DataColumnsByRange(d) => {
// TODO(das): Remove deneb fork after `peerdas-devnet-2`.
return if matches!(
fork_context.spec.fork_name_at_slot::<E>(d.slot()),
ForkName::Deneb
) {
fork_context.to_context_bytes(ForkName::Deneb)
} else {
fork_context.to_context_bytes(ForkName::Electra)
};
RpcSuccessResponse::DataColumnsByRoot(_)
| RpcSuccessResponse::DataColumnsByRange(_) => {
return fork_context.to_context_bytes(ForkName::Fulu);
}
RpcSuccessResponse::LightClientBootstrap(lc_bootstrap) => {
return lc_bootstrap
@@ -744,10 +736,7 @@ fn handle_rpc_response<E: EthSpec>(
},
SupportedProtocol::DataColumnsByRootV1 => match fork_name {
Some(fork_name) => {
// TODO(das): PeerDAS is currently supported for both deneb and electra. This check
// does not advertise the topic on deneb, simply allows it to decode it. Advertise
// logic is in `SupportedTopic::currently_supported`.
if fork_name.deneb_enabled() {
if fork_name.fulu_enabled() {
Ok(Some(RpcSuccessResponse::DataColumnsByRoot(Arc::new(
DataColumnSidecar::from_ssz_bytes(decoded_buffer)?,
))))
@@ -768,7 +757,7 @@
},
SupportedProtocol::DataColumnsByRangeV1 => match fork_name {
Some(fork_name) => {
if fork_name.deneb_enabled() {
if fork_name.fulu_enabled() {
Ok(Some(RpcSuccessResponse::DataColumnsByRange(Arc::new(
DataColumnSidecar::from_ssz_bytes(decoded_buffer)?,
))))
@@ -959,9 +948,10 @@ mod tests {
use crate::rpc::protocol::*;
use crate::types::{EnrAttestationBitfield, EnrSyncCommitteeBitfield};
use types::{
blob_sidecar::BlobIdentifier, BeaconBlock, BeaconBlockAltair, BeaconBlockBase,
BeaconBlockBellatrix, DataColumnIdentifier, EmptyBlock, Epoch, FixedBytesExtended,
FullPayload, Signature, Slot,
blob_sidecar::BlobIdentifier, data_column_sidecar::Cell, BeaconBlock, BeaconBlockAltair,
BeaconBlockBase, BeaconBlockBellatrix, BeaconBlockHeader, DataColumnIdentifier, EmptyBlock,
Epoch, FixedBytesExtended, FullPayload, KzgCommitment, KzgProof, Signature,
SignedBeaconBlockHeader, Slot,
};

type Spec = types::MainnetEthSpec;
@@ -1012,7 +1002,17 @@
}

fn empty_data_column_sidecar() -> Arc<DataColumnSidecar<Spec>> {
Arc::new(DataColumnSidecar::empty())
Arc::new(DataColumnSidecar {
index: 0,
column: VariableList::new(vec![Cell::<Spec>::default()]).unwrap(),
kzg_commitments: VariableList::new(vec![KzgCommitment::empty_for_testing()]).unwrap(),
kzg_proofs: VariableList::new(vec![KzgProof::empty()]).unwrap(),
signed_block_header: SignedBeaconBlockHeader {
message: BeaconBlockHeader::empty(),
signature: Signature::empty(),
},
kzg_commitments_inclusion_proof: Default::default(),
})
}

/// Bellatrix block with length < max_rpc_size.
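As an aside, the hand-built sidecar above can be checked with a plain SSZ round-trip. This is an illustrative sketch, not one of the crate's existing tests; it assumes `ssz::Encode`/`ssz::Decode` are in scope and that `DataColumnSidecar` derives `PartialEq`:

```rust
#[test]
fn empty_data_column_sidecar_ssz_round_trip() {
    let sidecar = empty_data_column_sidecar();
    // Encode the sidecar and decode it back; the codec tests above exercise
    // the same type through the snappy-framed RPC codec, this is raw SSZ only.
    let bytes = sidecar.as_ssz_bytes();
    let decoded = DataColumnSidecar::<Spec>::from_ssz_bytes(&bytes).unwrap();
    assert_eq!(*sidecar, decoded);
}
```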
19 changes: 9 additions & 10 deletions beacon_node/lighthouse_network/src/rpc/protocol.rs
@@ -459,7 +459,7 @@ impl SupportedProtocol {
ProtocolId::new(Self::BlocksByRootV1, Encoding::SSZSnappy),
ProtocolId::new(Self::PingV1, Encoding::SSZSnappy),
];
if fork_context.spec.is_peer_das_scheduled() {
if fork_context.fork_exists(ForkName::Fulu) {
supported.extend_from_slice(&[
// V3 variants have higher preference for protocol negotiation
ProtocolId::new(Self::MetaDataV3, Encoding::SSZSnappy),
@@ -478,7 +478,7 @@
ProtocolId::new(SupportedProtocol::BlobsByRangeV1, Encoding::SSZSnappy),
]);
}
if fork_context.spec.is_peer_das_scheduled() {
if fork_context.fork_exists(ForkName::Fulu) {
supported.extend_from_slice(&[
ProtocolId::new(SupportedProtocol::DataColumnsByRootV1, Encoding::SSZSnappy),
ProtocolId::new(SupportedProtocol::DataColumnsByRangeV1, Encoding::SSZSnappy),
@@ -627,9 +627,11 @@ impl ProtocolId {
Protocol::BlocksByRoot => rpc_block_limits_by_fork(fork_context.current_fork()),
Protocol::BlobsByRange => rpc_blob_limits::<E>(),
Protocol::BlobsByRoot => rpc_blob_limits::<E>(),
Protocol::DataColumnsByRoot => rpc_data_column_limits::<E>(fork_context.current_fork()),
Protocol::DataColumnsByRoot => {
rpc_data_column_limits::<E>(fork_context.current_fork(), &fork_context.spec)
}
Protocol::DataColumnsByRange => {
rpc_data_column_limits::<E>(fork_context.current_fork())
rpc_data_column_limits::<E>(fork_context.current_fork(), &fork_context.spec)
}
Protocol::Ping => RpcLimits::new(
<Ping as Encode>::ssz_fixed_len(),
@@ -710,13 +712,10 @@ pub fn rpc_blob_limits<E: EthSpec>() -> RpcLimits {
}
}

// TODO(peerdas): fix hardcoded max here
pub fn rpc_data_column_limits<E: EthSpec>(fork_name: ForkName) -> RpcLimits {
pub fn rpc_data_column_limits<E: EthSpec>(fork_name: ForkName, spec: &ChainSpec) -> RpcLimits {
RpcLimits::new(
DataColumnSidecar::<E>::empty().as_ssz_bytes().len(),
DataColumnSidecar::<E>::max_size(
E::default_spec().max_blobs_per_block_by_fork(fork_name) as usize
),
DataColumnSidecar::<E>::min_size(),
DataColumnSidecar::<E>::max_size(spec.max_blobs_per_block_by_fork(fork_name) as usize),
)
}

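The updated `rpc_data_column_limits` derives both bounds from the sidecar type and the runtime `ChainSpec` (via `max_blobs_per_block_by_fork`) rather than the previously hardcoded `E::default_spec()`. A hedged usage sketch, assuming `MainnetEthSpec` and a spec with Fulu scheduled:

```rust
use types::{ChainSpec, ForkName, MainnetEthSpec};

// Illustrative only: compute the SSZ size bounds the RPC layer accepts for
// data column responses at the Fulu fork, using the node's runtime spec.
fn fulu_data_column_limits(spec: &ChainSpec) -> RpcLimits {
    rpc_data_column_limits::<MainnetEthSpec>(ForkName::Fulu, spec)
}
```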
4 changes: 2 additions & 2 deletions beacon_node/lighthouse_network/src/types/globals.rs
@@ -207,7 +207,7 @@
fn test_sampling_subnets() {
let log = logging::test_logger();
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));

let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
@@ -231,7 +231,7 @@
fn test_sampling_columns() {
let log = logging::test_logger();
let mut spec = E::default_spec();
spec.eip7594_fork_epoch = Some(Epoch::new(0));
spec.fulu_fork_epoch = Some(Epoch::new(0));

let custody_subnet_count = spec.data_column_sidecar_subnet_count / 2;
let subnet_sampling_size = std::cmp::max(custody_subnet_count, spec.samples_per_slot);
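A worked example of the arithmetic in these tests, assuming the mainnet-preset values `data_column_sidecar_subnet_count = 128` and `samples_per_slot = 8` (these values are assumptions, not taken from the diff):

```rust
fn main() {
    // The tests use half of the data column subnets as custody subnets.
    let data_column_sidecar_subnet_count: u64 = 128;
    let samples_per_slot: u64 = 8;
    let custody_subnet_count = data_column_sidecar_subnet_count / 2; // 64
    // Sampling size is the larger of the custody count and samples per slot.
    let subnet_sampling_size = std::cmp::max(custody_subnet_count, samples_per_slot);
    assert_eq!(subnet_sampling_size, 64);
}
```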
22 changes: 5 additions & 17 deletions beacon_node/lighthouse_network/src/types/pubsub.rs
@@ -279,27 +279,15 @@ impl<E: EthSpec> PubsubMessage<E> {
}
GossipKind::DataColumnSidecar(subnet_id) => {
match fork_context.from_context_bytes(gossip_topic.fork_digest) {
// TODO(das): Remove Deneb fork
Some(fork) if fork.deneb_enabled() => {
Some(fork) if fork.fulu_enabled() => {
let col_sidecar = Arc::new(
DataColumnSidecar::from_ssz_bytes(data)
.map_err(|e| format!("{:?}", e))?,
);
let peer_das_enabled =
fork_context.spec.is_peer_das_enabled_for_epoch(
col_sidecar.slot().epoch(E::slots_per_epoch()),
);
if peer_das_enabled {
Ok(PubsubMessage::DataColumnSidecar(Box::new((
*subnet_id,
col_sidecar,
))))
} else {
Err(format!(
"data_column_sidecar topic invalid for given fork digest {:?}",
gossip_topic.fork_digest
))
}
Ok(PubsubMessage::DataColumnSidecar(Box::new((
*subnet_id,
col_sidecar,
))))
}
Some(_) | None => Err(format!(
"data_column_sidecar topic invalid for given fork digest {:?}",
57 changes: 25 additions & 32 deletions beacon_node/network/src/service.rs
@@ -25,6 +25,7 @@ use lighthouse_network::{
MessageId, NetworkEvent, NetworkGlobals, PeerId,
};
use slog::{crit, debug, error, info, o, trace, warn};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::{collections::HashSet, pin::Pin, sync::Arc, time::Duration};
use store::HotColdDB;
@@ -33,8 +34,8 @@ use task_executor::ShutdownReason;
use tokio::sync::mpsc;
use tokio::time::Sleep;
use types::{
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, Slot, SubnetId, SyncCommitteeSubscription,
SyncSubnetId, Unsigned, ValidatorSubscription,
ChainSpec, DataColumnSubnetId, EthSpec, ForkContext, ForkName, Slot, SubnetId,
SyncCommitteeSubscription, SyncSubnetId, Unsigned, ValidatorSubscription,
};

mod tests;
@@ -734,12 +735,7 @@ impl<T: BeaconChainTypes> NetworkService<T> {
}
}

// TODO(das): This is added here for the purpose of testing, *without* having to
// activate Electra. This should happen as part of the Electra upgrade and we should
// move the subscription logic once it's ready to rebase PeerDAS on Electra, or if
// we decide to activate via the soft fork route:
// https://github.com/sigp/lighthouse/pull/5899
if self.fork_context.spec.is_peer_das_scheduled() {
if self.fork_context.fork_exists(ForkName::Fulu) {
self.subscribe_to_peer_das_topics(&mut subscribed_topics);
}

@@ -789,32 +785,29 @@
}
}

/// Keeping these separate from core topics because they have custom logic:
/// 1. Data column subscription logic depends on subscription configuration.
/// 2. Data column topic subscriptions will be dynamic based on validator balances due to
/// validator custody.
fn subscribe_to_peer_das_topics(&mut self, subscribed_topics: &mut Vec<GossipTopic>) {
if self.subscribe_all_data_column_subnets {
for column_subnet in 0..self.fork_context.spec.data_column_sidecar_subnet_count {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind =
Subnet::DataColumn(DataColumnSubnetId::new(column_subnet)).into();
let topic =
GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
let column_subnets_to_subscribe = if self.subscribe_all_data_column_subnets {
Cow::Owned(
(0..self.fork_context.spec.data_column_sidecar_subnet_count)
.map(DataColumnSubnetId::new)
.collect(),
)
} else {
for column_subnet in &self.network_globals.sampling_subnets {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic =
GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
Cow::Borrowed(&self.network_globals.sampling_subnets)
};

for column_subnet in column_subnets_to_subscribe.iter() {
for fork_digest in self.required_gossip_fork_digests() {
let gossip_kind = Subnet::DataColumn(*column_subnet).into();
let topic = GossipTopic::new(gossip_kind, GossipEncoding::default(), fork_digest);
if self.libp2p.subscribe(topic.clone()) {
subscribed_topics.push(topic);
} else {
warn!(self.log, "Could not subscribe to topic"; "topic" => %topic);
}
}
}
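The rewritten `subscribe_to_peer_das_topics` collapses the two subscription modes into a single loop by either borrowing the node's sampling subnets or building an owned set covering every data column subnet. A minimal, self-contained sketch of that `Cow` pattern, with subnet ids reduced to `u64` rather than the crate's actual types:

```rust
use std::borrow::Cow;
use std::collections::HashSet;

// Borrow the fixed sampling subnets, or build an owned set of all subnets,
// then drive both cases through one iteration path.
fn subnets_to_subscribe(
    subscribe_all: bool,
    subnet_count: u64,
    sampling_subnets: &HashSet<u64>,
) -> Cow<'_, HashSet<u64>> {
    if subscribe_all {
        Cow::Owned((0..subnet_count).collect())
    } else {
        Cow::Borrowed(sampling_subnets)
    }
}

fn main() {
    let sampling: HashSet<u64> = [3, 17, 42].into_iter().collect();
    for subnet in subnets_to_subscribe(false, 128, &sampling).iter() {
        println!("would subscribe to data column subnet {subnet}");
    }
}
```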