Skip to content

Commit af660d5

Browse files
committed
Initial commit
1 parent c6e1cca commit af660d5

File tree

11 files changed

+126
-29
lines changed

11 files changed

+126
-29
lines changed

packages/beacon-node/src/api/impl/beacon/pool/index.ts

+11-3
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import {routes} from "@lodestar/api";
22
import {ApplicationMethods} from "@lodestar/api/server";
3-
import {Epoch, ssz} from "@lodestar/types";
4-
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
3+
import {Epoch, isElectraAttestation, ssz} from "@lodestar/types";
4+
import {ForkName, ForkSeq, SYNC_COMMITTEE_SUBNET_SIZE} from "@lodestar/params";
55
import {validateApiAttestation} from "../../../../chain/validation/index.js";
66
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
77
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
@@ -26,7 +26,9 @@ export function getBeaconPoolApi({
2626
return {
2727
async getPoolAttestations({slot, committeeIndex}) {
2828
// Already filtered by slot
29-
let attestations = chain.aggregatedAttestationPool.getAll(slot);
29+
let attestations = chain.aggregatedAttestationPool
30+
.getAll(slot)
31+
.filter((attestation) => !isElectraAttestation(attestation));
3032

3133
if (committeeIndex !== undefined) {
3234
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);
@@ -39,6 +41,12 @@ export function getBeaconPoolApi({
3941
// Already filtered by slot
4042
let attestations = chain.aggregatedAttestationPool.getAll(slot);
4143
const fork = chain.config.getForkName(slot ?? attestations[0]?.data.slot ?? chain.clock.currentSlot);
44+
const isAfterElectra = ForkSeq[fork] >= ForkSeq.electra;
45+
attestations = attestations.filter(
46+
(attestation) =>
47+
(isAfterElectra && isElectraAttestation(attestation)) ||
48+
(!isAfterElectra && !isElectraAttestation(attestation))
49+
);
4250

4351
if (committeeIndex !== undefined) {
4452
attestations = attestations.filter((attestation) => committeeIndex === attestation.data.index);

packages/beacon-node/src/api/impl/validator/index.ts

+17-2
Original file line numberDiff line numberDiff line change
@@ -1064,8 +1064,23 @@ export function getValidatorApi({
10641064
},
10651065

10661066
// TODO Electra: Implement getAggregatedAttestation to properly handle pre-electra
1067-
async getAggregatedAttestation() {
1068-
throw new Error("Not implemented. Use getAggregatedAttestationV2 for now.");
1067+
async getAggregatedAttestation({attestationDataRoot, slot}) {
1068+
notWhileSyncing();
1069+
1070+
await waitForSlot(slot); // Must never request for a future slot > currentSlot
1071+
1072+
const dataRootHex = toHex(attestationDataRoot);
1073+
const aggregate = chain.attestationPool.getAggregate(slot, null, dataRootHex);
1074+
1075+
if (!aggregate) {
1076+
throw new ApiError(404, `No aggregated attestation for slot=${slot}, dataRoot=${dataRootHex}`);
1077+
}
1078+
1079+
metrics?.production.producedAggregateParticipants.observe(aggregate.aggregationBits.getTrueBitIndexes().length);
1080+
1081+
return {
1082+
data: aggregate,
1083+
};
10691084
},
10701085

10711086
async getAggregatedAttestationV2({attestationDataRoot, slot, committeeIndex}) {

packages/beacon-node/src/chain/chain.ts

+8-2
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ export class BeaconChain implements IBeaconChain {
131131

132132
// Ops pool
133133
readonly attestationPool: AttestationPool;
134-
readonly aggregatedAttestationPool = new AggregatedAttestationPool();
134+
readonly aggregatedAttestationPool: AggregatedAttestationPool;
135135
readonly syncCommitteeMessagePool: SyncCommitteeMessagePool;
136136
readonly syncContributionAndProofPool = new SyncContributionAndProofPool();
137137
readonly opPool = new OpPool();
@@ -221,7 +221,13 @@ export class BeaconChain implements IBeaconChain {
221221
if (!clock) clock = new Clock({config, genesisTime: this.genesisTime, signal});
222222

223223
const preAggregateCutOffTime = (2 / 3) * this.config.SECONDS_PER_SLOT;
224-
this.attestationPool = new AttestationPool(clock, preAggregateCutOffTime, this.opts?.preaggregateSlotDistance);
224+
this.attestationPool = new AttestationPool(
225+
this.config,
226+
clock,
227+
preAggregateCutOffTime,
228+
this.opts?.preaggregateSlotDistance
229+
);
230+
this.aggregatedAttestationPool = new AggregatedAttestationPool(this.config);
225231
this.syncCommitteeMessagePool = new SyncCommitteeMessagePool(
226232
clock,
227233
preAggregateCutOffTime,

packages/beacon-node/src/chain/opPools/aggregatedAttestationPool.ts

+35-6
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import {
3131
} from "@lodestar/state-transition";
3232
import {IForkChoice, EpochDifference} from "@lodestar/fork-choice";
3333
import {toHex, MapDef, assert} from "@lodestar/utils";
34+
import {ChainForkConfig} from "@lodestar/config";
3435
import {intersectUint8Arrays, IntersectResult} from "../../util/bitArray.js";
3536
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
3637
import {InsertOutcome} from "./types.js";
@@ -102,6 +103,8 @@ export class AggregatedAttestationPool {
102103
>(() => new Map<DataRootHex, Map<CommitteeIndex, MatchingDataAttestationGroup>>());
103104
private lowestPermissibleSlot = 0;
104105

106+
constructor(private readonly config: ChainForkConfig) {}
107+
105108
/** For metrics to track size of the pool */
106109
getAttestationCount(): {attestationCount: number; attestationDataCount: number} {
107110
let attestationCount = 0;
@@ -124,6 +127,7 @@ export class AggregatedAttestationPool {
124127
committee: Uint32Array
125128
): InsertOutcome {
126129
const slot = attestation.data.slot;
130+
const fork = this.config.getForkSeq(slot);
127131
const lowestPermissibleSlot = this.lowestPermissibleSlot;
128132

129133
// Reject any attestations that are too old.
@@ -137,10 +141,22 @@ export class AggregatedAttestationPool {
137141
attestationGroupByIndex = new Map<CommitteeIndex, MatchingDataAttestationGroup>();
138142
attestationGroupByIndexByDataHash.set(dataRootHex, attestationGroupByIndex);
139143
}
140-
const committeeIndex = isElectraAttestation(attestation)
141-
? // this attestation is added to pool after validation
142-
attestation.committeeBits.getSingleTrueBit()
143-
: attestation.data.index;
144+
145+
let committeeIndex;
146+
147+
if (fork >= ForkSeq.electra) {
148+
if (isElectraAttestation(attestation)) {
149+
committeeIndex = attestation.committeeBits.getSingleTrueBit();
150+
} else {
151+
throw new Error("");
152+
}
153+
} else {
154+
if (!isElectraAttestation(attestation)) {
155+
committeeIndex = attestation.data.index;
156+
} else {
157+
throw new Error("");
158+
}
159+
}
144160
// this should not happen because attestation should be validated before reaching this
145161
assert.notNull(committeeIndex, "Committee index should not be null in aggregated attestation pool");
146162
let attestationGroup = attestationGroupByIndex.get(committeeIndex);
@@ -391,6 +407,10 @@ export class AggregatedAttestationPool {
391407

392408
/**
393409
* Get all attestations optionally filtered by `attestation.data.slot`
410+
* Note this function is not fork aware and can potentially return a mix
411+
* of phase0.Attestations and electra.Attestations.
412+
* Caller of this function is expected to filtered result if they desire
413+
* a homogenous array.
394414
* @param bySlot slot to filter, `bySlot === attestation.data.slot`
395415
*/
396416
getAll(bySlot?: Slot): Attestation[] {
@@ -506,7 +526,16 @@ export class MatchingDataAttestationGroup {
506526
getAttestationsForBlock(fork: ForkName, notSeenAttestingIndices: Set<number>): AttestationNonParticipant[] {
507527
const attestations: AttestationNonParticipant[] = [];
508528
const forkSeq = ForkSeq[fork];
529+
const isAfterElectra = forkSeq >= ForkSeq.electra;
509530
for (const {attestation} of this.attestations) {
531+
if (
532+
(isAfterElectra && !isElectraAttestation(attestation)) ||
533+
(!isAfterElectra && isElectraAttestation(attestation))
534+
) {
535+
// TODO Electra: log warning
536+
continue;
537+
}
538+
510539
let notSeenAttesterCount = 0;
511540
const {aggregationBits} = attestation;
512541
for (const notSeenIndex of notSeenAttestingIndices) {
@@ -516,12 +545,12 @@ export class MatchingDataAttestationGroup {
516545
}
517546

518547
// if fork >= electra, should return electra-only attestations
519-
if (notSeenAttesterCount > 0 && (forkSeq < ForkSeq.electra || isElectraAttestation(attestation))) {
548+
if (notSeenAttesterCount > 0) {
520549
attestations.push({attestation, notSeenAttesterCount});
521550
}
522551
}
523552

524-
const maxAttestation = forkSeq >= ForkSeq.electra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
553+
const maxAttestation = isAfterElectra ? MAX_ATTESTATIONS_PER_GROUP_ELECTRA : MAX_ATTESTATIONS_PER_GROUP;
525554
if (attestations.length <= maxAttestation) {
526555
return attestations;
527556
} else {

packages/beacon-node/src/chain/opPools/attestationPool.ts

+27-7
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ import bls from "@chainsafe/bls";
33
import {BitArray} from "@chainsafe/ssz";
44
import {Slot, RootHex, isElectraAttestation, Attestation} from "@lodestar/types";
55
import {MapDef, assert} from "@lodestar/utils";
6+
import {ForkSeq} from "@lodestar/params";
7+
import {ChainForkConfig} from "@lodestar/config";
68
import {IClock} from "../../util/clock.js";
79
import {InsertOutcome, OpPoolError, OpPoolErrorCode} from "./types.js";
8-
import {pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
10+
import {isElectraAggregate, pruneBySlot, signatureFromBytesNoCheck} from "./utils.js";
911

1012
/**
1113
* The number of slots that will be stored in the pool.
@@ -29,14 +31,14 @@ type AggregateFastPhase0 = {
2931
signature: Signature;
3032
};
3133

32-
type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};
34+
export type AggregateFastElectra = AggregateFastPhase0 & {committeeBits: BitArray};
3335

34-
type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
36+
export type AggregateFast = AggregateFastPhase0 | AggregateFastElectra;
3537

3638
/** Hex string of DataRoot `TODO` */
3739
type DataRootHex = string;
3840

39-
type CommitteeIndex = number;
41+
type CommitteeIndex = number | null;
4042

4143
/**
4244
* A pool of `Attestation` that is specially designed to store "unaggregated" attestations from
@@ -69,6 +71,7 @@ export class AttestationPool {
6971
private lowestPermissibleSlot = 0;
7072

7173
constructor(
74+
private readonly config: ChainForkConfig,
7275
private readonly clock: IClock,
7376
private readonly cutOffSecFromSlot: number,
7477
private readonly preaggregateSlotDistance = 0
@@ -104,6 +107,7 @@ export class AttestationPool {
104107
*/
105108
add(committeeIndex: CommitteeIndex, attestation: Attestation, attDataRootHex: RootHex): InsertOutcome {
106109
const slot = attestation.data.slot;
110+
const fork = this.config.getForkSeq(slot);
107111
const lowestPermissibleSlot = this.lowestPermissibleSlot;
108112

109113
// Reject any attestations that are too old.
@@ -122,8 +126,15 @@ export class AttestationPool {
122126
throw new OpPoolError({code: OpPoolErrorCode.REACHED_MAX_PER_SLOT});
123127
}
124128

125-
// this should not happen because attestation should be validated before reaching this
126-
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool");
129+
// TODO Electra: Use `isForkElectra` after the other PR is merged
130+
if (fork >= ForkSeq.electra) {
131+
// Electra only: this should not happen because attestation should be validated before reaching this
132+
assert.notNull(committeeIndex, "Committee index should not be null in attestation pool post-electra");
133+
assert.true(isElectraAttestation(attestation), "Attestation should be type electra.Attestation");
134+
} else {
135+
assert.true(!isElectraAttestation(attestation), "Attestation should be type phase0.Attestation");
136+
committeeIndex = null; // For pre-electra, committee index info is encoded in attDataRootIndex
137+
}
127138

128139
// Pre-aggregate the contribution with existing items
129140
let aggregateByIndex = aggregateByRoot.get(attDataRootHex);
@@ -145,14 +156,23 @@ export class AttestationPool {
145156
/**
146157
* For validator API to get an aggregate
147158
*/
148-
// TODO Electra: Change attestation pool to accomodate pre-electra request
149159
getAggregate(slot: Slot, committeeIndex: CommitteeIndex, dataRootHex: RootHex): Attestation | null {
160+
const fork = this.config.getForkSeq(slot);
161+
const isAfterElectra = fork >= ForkSeq.electra;
162+
committeeIndex = isAfterElectra ? committeeIndex : null;
163+
150164
const aggregate = this.aggregateByIndexByRootBySlot.get(slot)?.get(dataRootHex)?.get(committeeIndex);
151165
if (!aggregate) {
152166
// TODO: Add metric for missing aggregates
153167
return null;
154168
}
155169

170+
if (isAfterElectra) {
171+
assert.true(isElectraAggregate(aggregate), "Aggregate should be type AggregateFastElectra");
172+
} else {
173+
assert.true(!isElectraAggregate(aggregate), "Aggregate should be type AggregateFastPhase0");
174+
}
175+
156176
return fastToAttestation(aggregate);
157177
}
158178

packages/beacon-node/src/chain/opPools/utils.ts

+5
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {CoordType, Signature} from "@chainsafe/bls/types";
33
import {BLS_WITHDRAWAL_PREFIX} from "@lodestar/params";
44
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
55
import {Slot, capella} from "@lodestar/types";
6+
import {AggregateFast, AggregateFastElectra} from "./attestationPool.js";
67

78
/**
89
* Prune a Map indexed by slot to keep the most recent slots, up to `slotsRetained`
@@ -59,3 +60,7 @@ export function isValidBlsToExecutionChangeForBlockInclusion(
5960

6061
return true;
6162
}
63+
64+
export function isElectraAggregate(aggregate: AggregateFast): aggregate is AggregateFastElectra {
65+
return (aggregate as AggregateFastElectra).committeeBits !== undefined;
66+
}

packages/beacon-node/src/network/processor/gossipQueues/index.ts

+3-4
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import {mapValues} from "@lodestar/utils";
2-
import {ForkSeq} from "@lodestar/params";
32
import {GossipType} from "../../gossip/interface.js";
43
import {PendingGossipsubMessage} from "../types.js";
5-
import {getSeenAttDataKey} from "../../../util/sszBytes.js";
4+
import {getGossipAttestationIndex} from "../../../util/sszBytes.js";
65
import {LinearGossipQueue} from "./linear.js";
76
import {
87
DropType,
@@ -88,8 +87,8 @@ const indexedGossipQueueOpts: {
8887
[GossipType.beacon_attestation]: {
8988
maxLength: 24576,
9089
indexFn: (item: PendingGossipsubMessage) => {
91-
const {topic, msg} = item;
92-
return getSeenAttDataKey(ForkSeq[topic.fork], msg.data);
90+
// Note indexFn is fork agnostic despite changes introduced in Electra
91+
return getGossipAttestationIndex(item.msg.data);
9392
},
9493
minChunkSize: MIN_SIGNATURE_SETS_TO_BATCH_VERIFY,
9594
maxChunkSize: MAX_GOSSIP_ATTESTATION_BATCH_SIZE,

packages/beacon-node/src/util/sszBytes.ts

+7
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,13 @@ export function getSeenAttDataKeyPhase0(data: Uint8Array): AttDataBase64 | null
110110
return toBase64(data.slice(VARIABLE_FIELD_OFFSET, VARIABLE_FIELD_OFFSET + ATTESTATION_DATA_SIZE));
111111
}
112112

113+
/**
114+
* Alias of `getSeenAttDataKeyPhase0` specifically for batch handling indexing in gossip queue
115+
*/
116+
export function getGossipAttestationIndex(data: Uint8Array): AttDataBase64 | null {
117+
return getSeenAttDataKeyPhase0(data);
118+
}
119+
113120
/**
114121
* Extract aggregation bits from attestation serialized bytes.
115122
* Return null if data is not long enough to extract aggregation bits.

packages/beacon-node/test/mocks/mockedBeaconChain.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ vi.mock("../../src/chain/chain.js", async (importActual) => {
123123
// @ts-expect-error
124124
eth1: new Eth1ForBlockProduction(),
125125
opPool: new OpPool(),
126-
aggregatedAttestationPool: new AggregatedAttestationPool(),
126+
aggregatedAttestationPool: new AggregatedAttestationPool(config),
127127
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
128128
// @ts-expect-error
129129
beaconProposerCache: new BeaconProposerCache(),

packages/beacon-node/test/perf/chain/opPools/aggregatedAttestationPool.test.ts

+7-3
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,9 @@ import {
1010
import {HISTORICAL_ROOTS_LIMIT, SLOTS_PER_EPOCH} from "@lodestar/params";
1111
import {ExecutionStatus, ForkChoice, IForkChoiceStore, ProtoArray, DataAvailabilityStatus} from "@lodestar/fork-choice";
1212
import {ssz} from "@lodestar/types";
13-
// eslint-disable-next-line import/no-relative-packages
14-
import {generatePerfTestCachedStateAltair} from "../../../../../state-transition/test/perf/util.js";
13+
14+
import {createChainForkConfig, defaultChainConfig} from "@lodestar/config";
15+
import {generatePerfTestCachedStateAltair} from "@lodestar/state-transition/test/perf/util.js";
1516
import {AggregatedAttestationPool} from "../../../../src/chain/opPools/aggregatedAttestationPool.js";
1617
import {computeAnchorCheckpoint} from "../../../../src/chain/initState.js";
1718

@@ -230,7 +231,10 @@ function getAggregatedAttestationPool(
230231
numMissedVotes: number,
231232
numBadVotes: number
232233
): AggregatedAttestationPool {
233-
const pool = new AggregatedAttestationPool();
234+
const config = createChainForkConfig({
235+
...defaultChainConfig,
236+
});
237+
const pool = new AggregatedAttestationPool(config);
234238
for (let epochSlot = 0; epochSlot < SLOTS_PER_EPOCH; epochSlot++) {
235239
const slot = state.slot - 1 - epochSlot;
236240
const epoch = computeEpochAtSlot(slot);

packages/beacon-node/test/unit/chain/opPools/aggregatedAttestationPool.test.ts

+5-1
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import {
1212
} from "@lodestar/params";
1313
import {ssz, phase0} from "@lodestar/types";
1414
import {CachedBeaconStateAltair} from "@lodestar/state-transition/src/types.js";
15+
import {createChainForkConfig, defaultChainConfig} from "@lodestar/config";
1516
import {MockedForkChoice, getMockedForkChoice} from "../../../mocks/mockedBeaconChain.js";
1617
import {
1718
aggregateConsolidation,
@@ -37,6 +38,9 @@ const validSignature = fromHexString(
3738
describe("AggregatedAttestationPool", function () {
3839
let pool: AggregatedAttestationPool;
3940
const fork = ForkName.altair;
41+
const config = createChainForkConfig({
42+
...defaultChainConfig,
43+
});
4044
const altairForkEpoch = 2020;
4145
const currentEpoch = altairForkEpoch + 10;
4246
const currentSlot = SLOTS_PER_EPOCH * currentEpoch;
@@ -80,7 +84,7 @@ describe("AggregatedAttestationPool", function () {
8084
let forkchoiceStub: MockedForkChoice;
8185

8286
beforeEach(() => {
83-
pool = new AggregatedAttestationPool();
87+
pool = new AggregatedAttestationPool(config);
8488
altairState = originalState.clone();
8589
forkchoiceStub = getMockedForkChoice();
8690
});

0 commit comments

Comments
 (0)