diff --git a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java index c20d6ed5f48..9ef842ac60a 100644 --- a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java +++ b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java @@ -37,6 +37,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeMessagePool; @@ -73,6 +74,7 @@ public class ValidatorApiHandlerIntegrationTest { private final BlockGossipChannel blockGossipChannel = mock(BlockGossipChannel.class); private final ChainDataProvider chainDataProvider = mock(ChainDataProvider.class); private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + private final ForkChoiceNotifier forkChoiceNotifier = mock(ForkChoiceNotifier.class); private final ChainUpdater chainUpdater = storageSystem.chainUpdater(); private final SyncCommitteeMessagePool syncCommitteeMessagePool = @@ -97,6 +99,7 @@ public class ValidatorApiHandlerIntegrationTest { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java index ca1079b71ad..942edcb375e 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java @@ -57,6 +57,7 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.ValidateableSyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.spec.datastructures.util.AttestationProcessingResult; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; @@ -68,6 +69,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeMessagePool; @@ -76,7 +78,6 @@ import tech.pegasys.teku.sync.events.SyncStateProvider; import tech.pegasys.teku.validator.api.AttesterDuties; import tech.pegasys.teku.validator.api.AttesterDuty; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.NodeSyncingException; import tech.pegasys.teku.validator.api.ProposerDuties; @@ -116,6 +117,7 @@ public class ValidatorApiHandler implements ValidatorApiChannel { private final SyncCommitteeMessagePool syncCommitteeMessagePool; private final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager; private final SyncCommitteeContributionPool syncCommitteeContributionPool; + private final ForkChoiceNotifier forkChoiceNotifier; public ValidatorApiHandler( final ChainDataProvider chainDataProvider, @@ -132,6 +134,7 @@ public ValidatorApiHandler( final PerformanceTracker performanceTracker, final Spec spec, final ForkChoiceTrigger forkChoiceTrigger, + final ForkChoiceNotifier forkChoiceNotifier, final SyncCommitteeMessagePool syncCommitteeMessagePool, final SyncCommitteeContributionPool syncCommitteeContributionPool, final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager) { @@ -152,6 +155,7 @@ public ValidatorApiHandler( this.syncCommitteeMessagePool = syncCommitteeMessagePool; this.syncCommitteeContributionPool = syncCommitteeContributionPool; this.syncCommitteeSubscriptionManager = syncCommitteeSubscriptionManager; + this.forkChoiceNotifier = forkChoiceNotifier; } @Override @@ -593,7 +597,9 @@ public SafeFuture sendSignedContributionAndProofs( @Override public void prepareBeaconProposer( - Collection beaconPreparableProposers) {} + Collection beaconPreparableProposers) { + forkChoiceNotifier.onUpdatePreparableProposers(beaconPreparableProposers); + } private Optional fromInternalValidationResult( final InternalValidationResult internalValidationResult, final int resultIndex) { diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java index 9f3946a4366..92e597deebd 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java @@ -75,6 +75,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeMessagePool; @@ -119,6 +120,7 @@ class ValidatorApiHandlerTest { private final ChainDataProvider chainDataProvider = mock(ChainDataProvider.class); private final DutyMetrics dutyMetrics = mock(DutyMetrics.class); private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + private final ForkChoiceNotifier forkChoiceNotifier = mock(ForkChoiceNotifier.class); private final SyncCommitteeMessagePool syncCommitteeMessagePool = mock(SyncCommitteeMessagePool.class); private final SyncCommitteeContributionPool syncCommitteeContributionPool = @@ -142,6 +144,7 @@ class ValidatorApiHandlerTest { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); @@ -387,6 +390,7 @@ void getSyncCommitteeDuties_shouldNotUseEpochPriorToFork() { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); diff --git a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java index bd841cd4cfa..a0463589616 100644 --- a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java +++ b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java @@ -27,11 +27,7 @@ import io.javalin.plugin.openapi.annotations.OpenApiContent; import io.javalin.plugin.openapi.annotations.OpenApiRequestBody; import io.javalin.plugin.openapi.annotations.OpenApiResponse; -import java.util.Arrays; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import java.util.List; import org.jetbrains.annotations.NotNull; import tech.pegasys.teku.api.DataProvider; import tech.pegasys.teku.api.ValidatorDataProvider; @@ -43,15 +39,17 @@ public class PostPrepareBeaconProposer extends AbstractHandler implements Handler { public static final String ROUTE = "/eth/v1/validator/prepare_beacon_proposer"; - private static final Logger LOG = LogManager.getLogger(); + + private final ValidatorDataProvider validatorDataProvider; public PostPrepareBeaconProposer(final DataProvider provider, final JsonProvider jsonProvider) { this(provider.getValidatorDataProvider(), jsonProvider); } public PostPrepareBeaconProposer( - final ValidatorDataProvider provider, final JsonProvider jsonProvider) { + final ValidatorDataProvider validatorDataProvider, final JsonProvider jsonProvider) { super(jsonProvider); + this.validatorDataProvider = validatorDataProvider; } @OpenApi( @@ -78,13 +76,7 @@ public void handle(@NotNull final Context ctx) throws Exception { final BeaconPreparableProposer[] request = parseRequestBody(ctx.body(), BeaconPreparableProposer[].class); - LOG.trace( - "received: {}", - (Supplier) - () -> - Arrays.stream(request) - .map(BeaconPreparableProposer::toString) - .collect(Collectors.joining(","))); + validatorDataProvider.prepareBeaconProposer(List.of(request)); ctx.status(SC_OK); } catch (final IllegalArgumentException e) { diff --git a/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java b/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java index af294d9c202..8cab852cae0 100644 --- a/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java +++ b/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java @@ -46,6 +46,7 @@ import tech.pegasys.teku.api.schema.altair.SignedContributionAndProof; import tech.pegasys.teku.api.schema.altair.SyncCommitteeMessage; import tech.pegasys.teku.api.schema.altair.SyncCommitteeSubnetSubscription; +import tech.pegasys.teku.api.schema.merge.BeaconPreparableProposer; import tech.pegasys.teku.api.schema.merge.SignedBeaconBlockMerge; import tech.pegasys.teku.api.schema.phase0.SignedBeaconBlockPhase0; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -368,6 +369,17 @@ public SafeFuture sendContributionAndProofs( .collect(toList())); } + public void prepareBeaconProposer( + Collection beaconPreparableProposers) { + List + internalBeaconPreparableProposer = + beaconPreparableProposers.stream() + .map(BeaconPreparableProposer::asInternalBeaconPreparableProposer) + .collect(Collectors.toUnmodifiableList()); + + validatorApiChannel.prepareBeaconProposer(internalBeaconPreparableProposer); + } + public boolean isPhase0Slot(final UInt64 slot) { return spec.atSlot(slot).getMilestone() == SpecMilestone.PHASE0; } diff --git a/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java b/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java index 97faebd6ffc..cfa8d447ec8 100644 --- a/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java +++ b/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.MoreObjects; import io.swagger.v3.oas.annotations.media.Schema; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.ssz.type.Bytes20; @@ -45,11 +44,11 @@ public BeaconPreparableProposer( this.fee_recipient = fee_recipient; } - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("validator_index", validator_index) - .add("fee_recipient", fee_recipient) - .toString(); + public static tech.pegasys.teku.spec.datastructures.operations.versions.merge + .BeaconPreparableProposer + asInternalBeaconPreparableProposer(BeaconPreparableProposer beaconPreparableProposer) { + return new tech.pegasys.teku.spec.datastructures.operations.versions.merge + .BeaconPreparableProposer( + beaconPreparableProposer.validator_index, beaconPreparableProposer.fee_recipient); } } diff --git a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java index 75952078703..fc5c98b7a36 100644 --- a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java +++ b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java @@ -50,7 +50,7 @@ public static ExecutionEngineChannelImpl create(String eeEndpoint, Spec spec) { return new ExecutionEngineChannelImpl(new Web3JExecutionEngineClient(eeEndpoint), spec); } - public ExecutionEngineChannelImpl(ExecutionEngineClient executionEngineClient, Spec spec) { + private ExecutionEngineChannelImpl(ExecutionEngineClient executionEngineClient, Spec spec) { this.spec = spec; this.executionEngineClient = executionEngineClient; } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java index e6c6e1172d4..607f184503a 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java @@ -292,6 +292,10 @@ public UInt64 computeEpochAtSlot(final UInt64 slot) { return atSlot(slot).miscHelpers().computeEpochAtSlot(slot); } + public UInt64 computeTimeAtSlot(BeaconState state, UInt64 slot) { + return atSlot(slot).miscHelpers().computeTimeAtSlot(state, slot); + } + public Bytes computeSigningRoot(BeaconBlock block, Bytes32 domain) { return atBlock(block).miscHelpers().computeSigningRoot(block, domain); } @@ -328,6 +332,10 @@ public Bytes32 getDomain( .getDomain(domainType, epoch, fork, genesisValidatorsRoot); } + public Bytes32 getRandaoMix(final BeaconState state, final UInt64 epoch) { + return atEpoch(epoch).beaconStateAccessors().getRandaoMix(state, epoch); + } + public boolean verifyProposerSlashingSignature( BeaconState state, ProposerSlashing proposerSlashing, diff --git a/validator/api/src/main/java/tech/pegasys/teku/validator/api/BeaconPreparableProposer.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/operations/versions/merge/BeaconPreparableProposer.java similarity index 96% rename from validator/api/src/main/java/tech/pegasys/teku/validator/api/BeaconPreparableProposer.java rename to ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/operations/versions/merge/BeaconPreparableProposer.java index 1b7ac8724c5..db7945de00a 100644 --- a/validator/api/src/main/java/tech/pegasys/teku/validator/api/BeaconPreparableProposer.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/operations/versions/merge/BeaconPreparableProposer.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. */ -package tech.pegasys.teku.validator.api; +package tech.pegasys.teku.spec.datastructures.operations.versions.merge; import com.google.common.base.MoreObjects; import java.util.Objects; diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java index ecf46d93cfa..422a89d7510 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.spec.executionengine; import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; @@ -54,4 +55,19 @@ public String toString() { .add("finalizedBlockHash", finalizedBlockHash) .toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ForkChoiceState)) return false; + ForkChoiceState that = (ForkChoiceState) o; + return headBlockHash == that.headBlockHash + && safeBlockHash == that.safeBlockHash + && finalizedBlockHash == that.finalizedBlockHash; + } + + @Override + public int hashCode() { + return Objects.hashCode(headBlockHash, safeBlockHash, finalizedBlockHash); + } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java index 543ed558287..9ab601a8f3c 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.spec.executionengine; +import com.google.common.base.MoreObjects; import java.util.Objects; import java.util.Optional; import tech.pegasys.teku.ssz.type.Bytes8; @@ -46,4 +47,12 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(status, payloadId); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("status", status) + .add("payloadId", payloadId) + .toString(); + } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java index cf79194a26d..68e1d359f64 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.spec.executionengine; import com.google.common.base.MoreObjects; +import java.util.Objects; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.ssz.type.Bytes20; @@ -41,6 +42,25 @@ public Bytes20 getFeeRecipient() { return feeRecipient; } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PayloadAttributes that = (PayloadAttributes) o; + return Objects.equals(timestamp, that.timestamp) + && Objects.equals(random, that.random) + && Objects.equals(feeRecipient, that.feeRecipient); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, random, feeRecipient); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java index 946ebc19e5f..8cb4e3cc29c 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java @@ -103,6 +103,11 @@ public UInt64 computeStartSlotAtEpoch(UInt64 epoch) { return epoch.times(specConfig.getSlotsPerEpoch()); } + public UInt64 computeTimeAtSlot(BeaconState state, UInt64 slot) { + UInt64 slotsSinceGenesis = slot.minus(SpecConfig.GENESIS_SLOT); + return state.getGenesis_time().plus(slotsSinceGenesis.times(specConfig.getSecondsPerSlot())); + } + public boolean isSlotAtNthEpochBoundary( final UInt64 blockSlot, final UInt64 parentSlot, final int n) { checkArgument(n > 0, "Parameter n must be greater than 0"); diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java new file mode 100644 index 00000000000..410592b251d --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java @@ -0,0 +1,267 @@ +/* + * Copyright 2021 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.forkchoice; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.eventthread.AsyncRunnerEventThread; +import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.config.SpecConfig; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.executionengine.ExecutionEngineChannel; +import tech.pegasys.teku.spec.executionengine.ForkChoiceState; +import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedResult; +import tech.pegasys.teku.spec.executionengine.PayloadAttributes; +import tech.pegasys.teku.ssz.type.Bytes20; +import tech.pegasys.teku.ssz.type.Bytes8; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class ForkChoiceNotifier { + private static final long MAX_PROPOSER_SEEN_EPOCHS = 2; + private static final Logger LOG = LogManager.getLogger(); + + private final EventThread eventThread; + private final Spec spec; + private final ExecutionEngineChannel executionEngineChannel; + private final RecentChainData recentChainData; + + private final Map proposerInfoByValidatorIndex = new HashMap<>(); + + private Optional forkChoiceState = Optional.empty(); + private Optional payloadAttributes = Optional.empty(); + + private Optional lastSentForkChoiceState = Optional.empty(); + private Optional lastSentPayloadAttributes = Optional.empty(); + private Optional lastPayloadId = Optional.empty(); + + ForkChoiceNotifier( + final EventThread eventThread, + final Spec spec, + final ExecutionEngineChannel executionEngineChannel, + final RecentChainData recentChainData) { + this.eventThread = eventThread; + this.spec = spec; + this.executionEngineChannel = executionEngineChannel; + this.recentChainData = recentChainData; + } + + public static ForkChoiceNotifier create( + final AsyncRunnerFactory asyncRunnerFactory, + final Spec spec, + final ExecutionEngineChannel executionEngineChannel, + final RecentChainData recentChainData) { + final AsyncRunnerEventThread eventThread = + new AsyncRunnerEventThread("forkChoiceNotifier", asyncRunnerFactory); + return new ForkChoiceNotifier(eventThread, spec, executionEngineChannel, recentChainData); + } + + public void onUpdatePreparableProposers(final Collection proposers) { + eventThread.execute(() -> internalUpdatePreparableProposers(proposers)); + } + + public void onForkChoiceUpdated(final ForkChoiceState forkChoiceState) { + eventThread.execute(() -> internalForkChoiceUpdated(forkChoiceState)); + } + + public void onAttestationsDue(final UInt64 slot) { + eventThread.execute(() -> internalAttestationsDue(slot)); + } + + public SafeFuture> getPayloadId() { + return eventThread.execute(() -> lastPayloadId); + } + + private void internalUpdatePreparableProposers( + final Collection proposers) { + eventThread.checkOnEventThread(); + // Default to the genesis slot if we're pre-genesis. + final UInt64 currentSlot = recentChainData.getCurrentSlot().orElse(SpecConfig.GENESIS_SLOT); + + // Remove expired validators + proposerInfoByValidatorIndex.values().removeIf(info -> info.hasExpired(currentSlot)); + + // Update validators + final UInt64 expirySlot = + currentSlot.plus(spec.getSlotsPerEpoch(currentSlot) * MAX_PROPOSER_SEEN_EPOCHS); + for (BeaconPreparableProposer proposer : proposers) { + proposerInfoByValidatorIndex.put( + proposer.getValidatorIndex(), new ProposerInfo(expirySlot, proposer.getFeeRecipient())); + } + + // Update payload attributes in case we now need to propose the next block + updatePayloadAttributes(currentSlot.plus(1)); + } + + private void internalForkChoiceUpdated(final ForkChoiceState forkChoiceState) { + eventThread.checkOnEventThread(); + + if (this.forkChoiceState.isPresent() && this.forkChoiceState.get().equals(forkChoiceState)) { + // No change required. + return; + } + + this.forkChoiceState = Optional.of(forkChoiceState); + recentChainData + .getCurrentSlot() + .ifPresent(currentSlot -> updatePayloadAttributes(currentSlot.plus(1))); + sendForkChoiceUpdated(); + } + + private void internalAttestationsDue(final UInt64 slot) { + eventThread.checkOnEventThread(); + // Assume `slot` is empty and check if we need to prepare to propose in the next slot + updatePayloadAttributes(slot.plus(1)); + } + + private void sendForkChoiceUpdated() { + if (lastSentForkChoiceState.equals(forkChoiceState) + && lastSentPayloadAttributes.equals(payloadAttributes)) { + // No change to previously sent values so no need to resend + return; + } + forkChoiceState.ifPresentOrElse( + forkChoiceState -> { + lastSentForkChoiceState = this.forkChoiceState; + lastSentPayloadAttributes = payloadAttributes; + // Previous payload is no longer useful as we've moved on to prepping the next block + lastPayloadId = Optional.empty(); + executionEngineChannel + .forkChoiceUpdated(forkChoiceState, payloadAttributes) + .thenAcceptAsync( + result -> handleForkChoiceResult(forkChoiceState, result), eventThread) + .finish(error -> LOG.error("Failed to notify EL of fork choice update", error)); + }, + () -> + LOG.warn( + "Could not notify EL of fork choice update because fork choice state is not yet known")); + } + + private void updatePayloadAttributes(final UInt64 blockSlot) { + calculatePayloadAttributes(blockSlot) + .thenAcceptAsync( + newPayloadAttributes -> updatePayloadAttributes(blockSlot, newPayloadAttributes), + eventThread) + .finish( + error -> + LOG.error("Failed to calculate payload attributes for slot {}", blockSlot, error)); + } + + private void updatePayloadAttributes( + final UInt64 blockSlot, final Optional newPayloadAttributes) { + eventThread.checkOnEventThread(); + if (payloadAttributes.equals(newPayloadAttributes)) { + // No change, nothing to do. + return; + } + final UInt64 currentSlot = recentChainData.getCurrentSlot().orElse(UInt64.ZERO); + if (currentSlot.isGreaterThanOrEqualTo(blockSlot)) { + // Slot has already progressed so this update is too late, just drop it. + LOG.warn( + "Payload attribute calculation for slot {} took too long. Slot was already {}", + blockSlot, + currentSlot); + return; + } + payloadAttributes = newPayloadAttributes; + sendForkChoiceUpdated(); + } + + private void handleForkChoiceResult( + final ForkChoiceState forkChoiceState, final ForkChoiceUpdatedResult result) { + eventThread.checkOnEventThread(); + if (lastSentForkChoiceState.isEmpty() + || !lastSentForkChoiceState.get().equals(forkChoiceState)) { + // Debug level because this is quite likely to happen when syncing + LOG.debug("Execution engine did not return payload ID in time, discarding"); + return; + } + lastPayloadId = result.getPayloadId(); + } + + private SafeFuture> calculatePayloadAttributes( + final UInt64 blockSlot) { + eventThread.checkOnEventThread(); + if (forkChoiceState.isEmpty()) { + // No known fork choice state so no point calculating payload attributes + return SafeFuture.completedFuture(Optional.empty()); + } + final UInt64 epoch = spec.computeEpochAtSlot(blockSlot); + // TODO: Return empty if chain head is not same as optimistic chain head + // TODO: Alternatively just limit how many epochs of empty slots we'll process to avoid burning + // CPU pointlessly during optimistic sync + return getStateInEpoch(epoch) + .thenApplyAsync( + maybeState -> calculatePayloadAttributes(blockSlot, epoch, maybeState), eventThread); + } + + private Optional calculatePayloadAttributes( + final UInt64 blockSlot, final UInt64 epoch, final Optional maybeState) { + eventThread.checkOnEventThread(); + if (maybeState.isEmpty()) { + return Optional.empty(); + } + final BeaconState state = maybeState.get(); + final UInt64 proposerIndex = UInt64.valueOf(spec.getBeaconProposerIndex(state, blockSlot)); + final ProposerInfo proposerInfo = proposerInfoByValidatorIndex.get(proposerIndex); + if (proposerInfo == null) { + // Proposer is not one of our validators. No need to propose a block. + return Optional.empty(); + } + final UInt64 timestamp = spec.computeTimeAtSlot(state, blockSlot); + final Bytes32 random = spec.getRandaoMix(state, epoch); + return Optional.of(new PayloadAttributes(timestamp, random, proposerInfo.feeRecipient)); + } + + private SafeFuture> getStateInEpoch(final UInt64 requiredEpoch) { + final Optional chainHead = recentChainData.getChainHead(); + if (chainHead.isEmpty()) { + return SafeFuture.completedFuture(Optional.empty()); + } + final StateAndBlockSummary head = chainHead.get(); + if (spec.computeEpochAtSlot(head.getSlot()).equals(requiredEpoch)) { + return SafeFuture.completedFuture(Optional.of(head.getState())); + } else { + // TODO: Chain head is from a prior epoch, we want to avoid processing a lot of empty slots if + // we're using optimistic sync as that would just waste CPU. + return recentChainData.retrieveStateAtSlot( + new SlotAndBlockRoot(spec.computeStartSlotAtEpoch(requiredEpoch), head.getRoot())); + } + } + + private static class ProposerInfo { + UInt64 expirySlot; + Bytes20 feeRecipient; + + public ProposerInfo(UInt64 expirySlot, Bytes20 feeRecipient) { + this.expirySlot = expirySlot; + this.feeRecipient = feeRecipient; + } + + public boolean hasExpired(final UInt64 currentSlot) { + return currentSlot.isGreaterThanOrEqualTo(expirySlot); + } + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java new file mode 100644 index 00000000000..8b5da1621d9 --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java @@ -0,0 +1,259 @@ +/* + * Copyright 2021 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.forkchoice; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; + +import java.util.List; +import java.util.Optional; +import org.apache.tuweni.bytes.Bytes32; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.protoarray.ForkChoiceStrategy; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecFactory; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.executionengine.ExecutePayloadResult; +import tech.pegasys.teku.spec.executionengine.ExecutionEngineChannel; +import tech.pegasys.teku.spec.executionengine.ExecutionPayloadStatus; +import tech.pegasys.teku.spec.executionengine.ForkChoiceState; +import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedResult; +import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedStatus; +import tech.pegasys.teku.spec.executionengine.PayloadAttributes; +import tech.pegasys.teku.spec.logic.common.block.AbstractBlockProcessor; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.ssz.type.Bytes20; +import tech.pegasys.teku.ssz.type.Bytes8; +import tech.pegasys.teku.storage.client.RecentChainData; +import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; +import tech.pegasys.teku.storage.storageSystem.StorageSystem; + +class ForkChoiceNotifierTest { + + private final InlineEventThread eventThread = new InlineEventThread(); + private final Spec spec = TestSpecFactory.createMinimalMerge(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); + + private final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec); + private final RecentChainData recentChainData = storageSystem.recentChainData(); + private ForkChoiceStrategy forkChoiceStrategy; + + private final ExecutionEngineChannel executionEngineChannel = mock(ExecutionEngineChannel.class); + + private final ForkChoiceNotifier notifier = + new ForkChoiceNotifier(eventThread, spec, executionEngineChannel, recentChainData); + + @BeforeAll + public static void initSession() { + AbstractBlockProcessor.BLS_VERIFY_DEPOSIT = false; + } + + @AfterAll + public static void resetSession() { + AbstractBlockProcessor.BLS_VERIFY_DEPOSIT = true; + } + + @BeforeEach + void setUp() { + storageSystem.chainUpdater().initializeGenesis(false); + forkChoiceStrategy = recentChainData.getForkChoiceStrategy().orElseThrow(); + when(executionEngineChannel.executePayload(any())) + .thenReturn( + SafeFuture.completedFuture( + new ExecutePayloadResult( + ExecutionPayloadStatus.VALID, Optional.empty(), Optional.empty()))); + when(executionEngineChannel.forkChoiceUpdated(any(), any())) + .thenReturn( + SafeFuture.completedFuture( + new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.empty()))); + } + + @Test + void onForkChoiceUpdated_shouldSendNotificationToExecutionEngine() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); + } + + @Test + void onForkChoiceUpdated_shouldSendNotificationWithPayloadAttributesForNextProposer() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); + + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes)); + } + + @Test + void onForkChoiceUpdated_shouldSendNotificationWithoutPayloadAttributesWhenNotProposingNext() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; + final int notTheNextProposer = spec.getBeaconProposerIndex(headState, blockSlot) + 1; + notifier.onUpdatePreparableProposers( + List.of( + new BeaconPreparableProposer( + UInt64.valueOf(notTheNextProposer), dataStructureUtil.randomBytes20()))); + + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); + } + + @Test + void onAttestationsDue_shouldNotSendUpdateIfNotChanged() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); + + notifier.onAttestationsDue(UInt64.ZERO); + verifyNoMoreInteractions(executionEngineChannel); + + notifier.onAttestationsDue(UInt64.ONE); + verifyNoMoreInteractions(executionEngineChannel); + } + + @Test + void onAttestationsDue_shouldSendUpdateIfWeAreDueToProposeNext() { + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.valueOf(2); + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); + + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + notifier.onForkChoiceUpdated(forkChoiceState); + // Not proposing block 1 so no payload attributes + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); + + notifier.onAttestationsDue(UInt64.ZERO); + verifyNoMoreInteractions(executionEngineChannel); + + // Slot 1 is now assumed empty so prepare to propose in slot 2 + notifier.onAttestationsDue(UInt64.ONE); + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes)); + } + + @Test + void shouldUseStateFromCorrectEpochToCalculateBlockProposer() { + final int firstSlotOfNextEpoch = spec.getSlotsPerEpoch(UInt64.ZERO); + final UInt64 blockSlot = UInt64.valueOf(firstSlotOfNextEpoch); + final UInt64 slotBeforeBlock = blockSlot.minus(1); + final SignedBlockAndState headBlockAndState = + storageSystem.chainUpdater().advanceChainUntil(firstSlotOfNextEpoch - 1); + storageSystem.chainUpdater().updateBestBlock(headBlockAndState); + final BeaconState headState = headBlockAndState.getState(); + storageSystem.chainUpdater().setTime(spec.computeTimeAtSlot(headState, slotBeforeBlock)); + assertThat(recentChainData.getCurrentSlot()).contains(slotBeforeBlock); + + final PayloadAttributes payloadAttributes = withProposerForSlot(blockSlot); + + notifier.onForkChoiceUpdated(getCurrentForkChoiceState()); + + verify(executionEngineChannel) + .forkChoiceUpdated(getCurrentForkChoiceState(), Optional.of(payloadAttributes)); + } + + @Test + void onUpdatePreparableProposers_shouldSendNewNotificationWhenProposerAdded() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; + + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); + + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes)); + } + + @Test + void getPayloadId_shouldReturnLatestPayloadId() { + final Bytes8 payloadId = dataStructureUtil.randomBytes8(); + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); + + final SafeFuture responseFuture = new SafeFuture<>(); + when(executionEngineChannel.forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes))) + .thenReturn(responseFuture); + + notifier.onForkChoiceUpdated(forkChoiceState); + + // Initially has no payload ID. + assertThatSafeFuture(notifier.getPayloadId()).isCompletedWithEmptyOptional(); + + // But becomes available once we receive the response + responseFuture.complete( + new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.of(payloadId))); + assertThatSafeFuture(notifier.getPayloadId()).isCompletedWithOptionalContaining(payloadId); + } + + private PayloadAttributes withProposerForSlot(final UInt64 blockSlot) { + final Bytes32 bestBlockRoot = recentChainData.getBestBlockRoot().orElseThrow(); + final BeaconState state = + recentChainData + .retrieveStateAtSlot(new SlotAndBlockRoot(blockSlot, bestBlockRoot)) + .join() + .orElseThrow(); + return withProposerForSlot(state, blockSlot); + } + + private PayloadAttributes withProposerForSlot( + final BeaconState headState, final UInt64 blockSlot) { + final int block2Proposer = spec.getBeaconProposerIndex(headState, blockSlot); + final PayloadAttributes payloadAttributes = getExpectedPayloadAttributes(headState, blockSlot); + notifier.onUpdatePreparableProposers( + List.of( + new BeaconPreparableProposer( + UInt64.valueOf(block2Proposer), payloadAttributes.getFeeRecipient()))); + return payloadAttributes; + } + + private PayloadAttributes getExpectedPayloadAttributes( + final BeaconState headState, final UInt64 blockSlot) { + final Bytes20 feeRecipient = dataStructureUtil.randomBytes20(); + final UInt64 timestamp = spec.computeTimeAtSlot(headState, blockSlot); + final Bytes32 random = spec.getRandaoMix(headState, UInt64.ZERO); + return new PayloadAttributes(timestamp, random, feeRecipient); + } + + private ForkChoiceState getCurrentForkChoiceState() { + final Bytes32 headBlockRoot = recentChainData.getBestBlockRoot().orElseThrow(); + final Bytes32 headExecutionHash = + forkChoiceStrategy.executionBlockHash(headBlockRoot).orElseThrow(); + final Bytes32 finalizedRoot = recentChainData.getFinalizedCheckpoint().orElseThrow().getRoot(); + final Bytes32 finalizedExecutionHash = + forkChoiceStrategy.executionBlockHash(finalizedRoot).orElseThrow(); + + return new ForkChoiceState(headExecutionHash, headExecutionHash, finalizedExecutionHash); + } +} diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index d8d3c9c61ac..80516d5b145 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -83,6 +83,7 @@ import tech.pegasys.teku.statetransition.block.BlockImporter; import tech.pegasys.teku.statetransition.block.BlockManager; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.genesis.GenesisHandler; import tech.pegasys.teku.statetransition.synccommittee.SignedContributionAndProofValidator; @@ -181,6 +182,8 @@ public class BeaconChainController extends Service implements TimeTickChannel { private volatile ActiveValidatorTracker activeValidatorTracker; private volatile AttestationTopicSubscriber attestationTopicSubscriber; private volatile SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager; + private volatile ForkChoiceNotifier forkChoiceNotifier; + private volatile ExecutionEngineChannel executionEngine; private UInt64 genesisTimeTracker = ZERO; private BlockManager blockManager; @@ -302,6 +305,7 @@ private SafeFuture initialize() { } public void initAll() { + initExecutionEngine(); initForkChoice(); initBlockImporter(); initCombinedChainDataClient(); @@ -319,6 +323,7 @@ public void initAll() { initSyncCommitteePools(); initP2PNetwork(); initSyncService(); + initForkChoiceNotifier(); initSlotProcessor(); initMetrics(); initAttestationTopicSubscriber(); @@ -329,6 +334,10 @@ public void initAll() { initOperationsReOrgManager(); } + private void initExecutionEngine() { + executionEngine = eventChannels.getPublisher(ExecutionEngineChannel.class, beaconAsyncRunner); + } + private void initPendingBlocks() { LOG.debug("BeaconChainController.initPendingBlocks()"); pendingBlocks = PendingPool.createForBlocks(spec); @@ -509,6 +518,7 @@ public void initValidatorApiHandler() { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); @@ -701,6 +711,7 @@ private void initSlotProcessor() { recentChainData, syncService.getForwardSync(), forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannelPublisher, new EpochCachePrimer(spec, recentChainData)); @@ -766,7 +777,7 @@ public void initBlockImporter() { recentChainData, forkChoice, weakSubjectivityValidator, - ExecutionEngineChannel.NOOP); + executionEngine); } public void initBlockManager() { @@ -819,6 +830,12 @@ private void initOperationsReOrgManager() { eventChannels.subscribe(ChainHeadChannel.class, operationsReOrgManager); } + private void initForkChoiceNotifier() { + LOG.debug("BeaconChainController.initForkChoiceNotifier()"); + forkChoiceNotifier = + ForkChoiceNotifier.create(asyncRunnerFactory, spec, executionEngine, recentChainData); + } + private void setupInitialState(final RecentChainData client) { final Optional initialAnchor = wsInitializer.loadInitialAnchorPoint( diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java index 15cfabafbb6..a9cc266f172 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java @@ -24,6 +24,7 @@ import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blocks.NodeSlot; import tech.pegasys.teku.statetransition.EpochCachePrimer; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.sync.forward.ForwardSync; @@ -34,6 +35,7 @@ public class SlotProcessor { private final RecentChainData recentChainData; private final ForwardSync syncService; private final ForkChoiceTrigger forkChoiceTrigger; + private final ForkChoiceNotifier forkChoiceNotifier; private final Eth2P2PNetwork p2pNetwork; private final SlotEventsChannel slotEventsChannelPublisher; private final NodeSlot nodeSlot = new NodeSlot(ZERO); @@ -50,6 +52,7 @@ public class SlotProcessor { final RecentChainData recentChainData, final ForwardSync syncService, final ForkChoiceTrigger forkChoiceTrigger, + final ForkChoiceNotifier forkChoiceNotifier, final Eth2P2PNetwork p2pNetwork, final SlotEventsChannel slotEventsChannelPublisher, final EpochCachePrimer epochCachePrimer, @@ -58,6 +61,7 @@ public class SlotProcessor { this.recentChainData = recentChainData; this.syncService = syncService; this.forkChoiceTrigger = forkChoiceTrigger; + this.forkChoiceNotifier = forkChoiceNotifier; this.p2pNetwork = p2pNetwork; this.slotEventsChannelPublisher = slotEventsChannelPublisher; this.epochCachePrimer = epochCachePrimer; @@ -69,6 +73,7 @@ public SlotProcessor( final RecentChainData recentChainData, final ForwardSync syncService, final ForkChoiceTrigger forkChoiceTrigger, + final ForkChoiceNotifier forkChoiceNotifier, final Eth2P2PNetwork p2pNetwork, final SlotEventsChannel slotEventsChannelPublisher, final EpochCachePrimer epochCachePrimer) { @@ -77,6 +82,7 @@ public SlotProcessor( recentChainData, syncService, forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannelPublisher, epochCachePrimer, @@ -208,6 +214,7 @@ private void processSlotStart(final UInt64 nodeEpoch) { private void processSlotAttestation(final UInt64 nodeEpoch) { onTickSlotAttestation = nodeSlot.getValue(); forkChoiceTrigger.onAttestationsDueForSlot(onTickSlotAttestation); + forkChoiceNotifier.onAttestationsDue(onTickSlotAttestation); recentChainData .getChainHead() .ifPresent( diff --git a/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java b/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java index 120c2ffc4ac..6ea85156a31 100644 --- a/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java +++ b/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java @@ -42,6 +42,7 @@ import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.EpochCachePrimer; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.server.StateStorageMode; @@ -62,6 +63,7 @@ public class SlotProcessorTest { private final ForwardSync syncService = mock(ForwardSync.class); private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + private final ForkChoiceNotifier forkChoiceNotifier = mock(ForkChoiceNotifier.class); private final Eth2P2PNetwork p2pNetwork = mock(Eth2P2PNetwork.class); private final SlotEventsChannel slotEventsChannel = mock(SlotEventsChannel.class); private final EpochCachePrimer epochCachePrimer = mock(EpochCachePrimer.class); @@ -71,6 +73,7 @@ public class SlotProcessorTest { recentChainData, syncService, forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannel, epochCachePrimer, @@ -287,6 +290,7 @@ void shouldPrecomputeEpochTransitionJustBeforeFirstSlotOfNextEpoch() { recentChainData, syncService, forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannel, epochCachePrimer, diff --git a/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java b/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java index 644c3539a3c..6ed065e9ef7 100644 --- a/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java +++ b/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java @@ -34,6 +34,7 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; public interface ValidatorApiChannel extends ChannelInterface { diff --git a/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java b/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java index 8ec6791e73d..cef1613c70e 100644 --- a/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java +++ b/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java @@ -36,9 +36,9 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; import tech.pegasys.teku.validator.api.AttesterDuties; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.ProposerDuties; import tech.pegasys.teku.validator.api.SendSignedBlockResult; diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java index 42b84431ea4..e0118b53656 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java @@ -20,8 +20,8 @@ import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.ssz.type.Bytes20; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.ValidatorApiChannel; import tech.pegasys.teku.validator.api.ValidatorTimingChannel; import tech.pegasys.teku.validator.client.loader.OwnedValidators; @@ -32,6 +32,7 @@ public class BeaconProposerPreparer implements ValidatorTimingChannel { private final OwnedValidators validators; private final Spec spec; private final Bytes20 feeRecipient; + private boolean firstCallDone = false; public BeaconProposerPreparer( ValidatorApiChannel validatorApiChannel, @@ -48,7 +49,8 @@ public BeaconProposerPreparer( @Override public void onSlot(UInt64 slot) { - if (slot.mod(spec.getSlotsPerEpoch(slot)).isZero()) { + if (slot.mod(spec.getSlotsPerEpoch(slot)).isZero() || !firstCallDone) { + firstCallDone = true; validatorIndexProvider .getValidatorIndices(validators.getPublicKeys()) .thenApply( diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java index 62baa812d38..6a517820da9 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java @@ -17,9 +17,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.util.Collection; @@ -35,8 +35,8 @@ import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.TestSpecContext; import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.ssz.type.Bytes20; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.ValidatorApiChannel; import tech.pegasys.teku.validator.client.loader.OwnedValidators; @@ -102,8 +102,11 @@ void should_callPrepareBeaconProposerAtBeginningOfEpoch(SpecContext specContext) @TestTemplate void should_notCallPrepareBeaconProposerAfterFirstSlotOfEpoch() { + beaconProposerPreparer.onSlot(UInt64.ZERO); + verify(validatorApiChannel).prepareBeaconProposer(any()); + beaconProposerPreparer.onSlot(UInt64.ONE); - verify(validatorApiChannel, never()).prepareBeaconProposer(any()); + verifyNoMoreInteractions(validatorApiChannel); } @TestTemplate diff --git a/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java b/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java index f81636dccb4..7437bac9939 100644 --- a/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java +++ b/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java @@ -40,9 +40,9 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; import tech.pegasys.teku.validator.api.AttesterDuties; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.ProposerDuties; import tech.pegasys.teku.validator.api.SendSignedBlockResult; diff --git a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java index c1b82303674..5d28c920625 100644 --- a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java +++ b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java @@ -55,10 +55,10 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; import tech.pegasys.teku.validator.api.AttesterDuties; import tech.pegasys.teku.validator.api.AttesterDuty; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.ProposerDuties; import tech.pegasys.teku.validator.api.ProposerDuty;