Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor chain event emits #6993

Merged
merged 2 commits into from
Aug 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 54 additions & 49 deletions packages/beacon-node/src/chain/blocks/importBlock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,34 +101,6 @@ export async function importBlock(
this.metrics?.importBlock.bySource.inc({source});
this.logger.verbose("Added block to forkchoice and state cache", {slot: blockSlot, root: blockRootHex});

// We want to import block asap so call all event handler in the next event loop
callInNextEventLoop(async () => {
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});

// dataPromise will not end up here, but preDeneb could. In future we might also allow syncing
// out of data range blocks and import then in forkchoice although one would not be able to
// attest and propose with such head similar to optimistic sync
if (blockInput.type === BlockInputType.availableData) {
const {blobsSource, blobs} = blockInput.blockData;

this.metrics?.importBlock.blobsBySource.inc({blobsSource});
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
}
});

// 3. Import attestations to fork choice
//
// - For each attestation
Expand Down Expand Up @@ -413,32 +385,58 @@ export async function importBlock(
// Send block events, only for recent enough blocks

if (this.clock.currentSlot - blockSlot < EVENTSTREAM_EMIT_RECENT_BLOCK_SLOTS) {
// NOTE: Skip looping if there are no listeners from the API
if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) {
for (const voluntaryExit of block.message.body.voluntaryExits) {
this.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
// We want to import block asap so call all event handler in the next event loop
callInNextEventLoop(() => {
// NOTE: Skip emitting if there are no listeners from the API
if (this.emitter.listenerCount(routes.events.EventType.block)) {
this.emitter.emit(routes.events.EventType.block, {
block: blockRootHex,
slot: blockSlot,
executionOptimistic: blockSummary != null && isOptimisticBlock(blockSummary),
});
}
}
if (this.emitter.listenerCount(routes.events.EventType.blsToExecutionChange)) {
for (const blsToExecutionChange of (block.message.body as capella.BeaconBlockBody).blsToExecutionChanges ?? []) {
this.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange);
if (this.emitter.listenerCount(routes.events.EventType.voluntaryExit)) {
for (const voluntaryExit of block.message.body.voluntaryExits) {
this.emitter.emit(routes.events.EventType.voluntaryExit, voluntaryExit);
}
}
}
if (this.emitter.listenerCount(routes.events.EventType.attestation)) {
for (const attestation of block.message.body.attestations) {
this.emitter.emit(routes.events.EventType.attestation, attestation);
if (this.emitter.listenerCount(routes.events.EventType.blsToExecutionChange)) {
for (const blsToExecutionChange of (block.message as capella.BeaconBlock).body.blsToExecutionChanges ?? []) {
this.emitter.emit(routes.events.EventType.blsToExecutionChange, blsToExecutionChange);
}
}
}
if (this.emitter.listenerCount(routes.events.EventType.attesterSlashing)) {
for (const attesterSlashing of block.message.body.attesterSlashings) {
this.emitter.emit(routes.events.EventType.attesterSlashing, attesterSlashing);
if (this.emitter.listenerCount(routes.events.EventType.attestation)) {
for (const attestation of block.message.body.attestations) {
this.emitter.emit(routes.events.EventType.attestation, attestation);
}
}
}
if (this.emitter.listenerCount(routes.events.EventType.proposerSlashing)) {
for (const proposerSlashing of block.message.body.proposerSlashings) {
this.emitter.emit(routes.events.EventType.proposerSlashing, proposerSlashing);
if (this.emitter.listenerCount(routes.events.EventType.attesterSlashing)) {
for (const attesterSlashing of block.message.body.attesterSlashings) {
this.emitter.emit(routes.events.EventType.attesterSlashing, attesterSlashing);
}
}
}
if (this.emitter.listenerCount(routes.events.EventType.proposerSlashing)) {
for (const proposerSlashing of block.message.body.proposerSlashings) {
this.emitter.emit(routes.events.EventType.proposerSlashing, proposerSlashing);
}
}
if (
blockInput.type === BlockInputType.availableData &&
this.emitter.listenerCount(routes.events.EventType.blobSidecar)
) {
const {blobs} = blockInput.blockData;
for (const blobSidecar of blobs) {
const {index, kzgCommitment} = blobSidecar;
this.emitter.emit(routes.events.EventType.blobSidecar, {
blockRoot: blockRootHex,
slot: blockSlot,
index,
kzgCommitment: toHexString(kzgCommitment),
versionedHash: toHexString(kzgCommitmentToVersionedHash(kzgCommitment)),
});
}
}
});
}

// Register stat metrics about the block after importing it
Expand All @@ -452,6 +450,13 @@ export async function importBlock(
fullyVerifiedBlock.postState.epochCtx.currentSyncCommitteeIndexed.validatorIndices
);
}
// dataPromise will not end up here, but preDeneb could. In future we might also allow syncing
// out of data range blocks and import then in forkchoice although one would not be able to
// attest and propose with such head similar to optimistic sync
if (blockInput.type === BlockInputType.availableData) {
const {blobsSource} = blockInput.blockData;
this.metrics?.importBlock.blobsBySource.inc({blobsSource});
}

const advancedSlot = this.clock.slotWithFutureTolerance(REPROCESS_MIN_TIME_TO_NEXT_SLOT_SEC);

Expand Down
Loading