From d70c760e6f313de4fc81dce14df7f691670e50d0 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Thu, 18 Nov 2021 21:53:41 +0100 Subject: [PATCH 1/6] initial implementation of forkchoice update call --- beacon/validator/build.gradle | 1 + .../ValidatorApiHandlerIntegrationTest.java | 3 +++ .../coordinator/ValidatorApiHandler.java | 10 +++++-- .../coordinator/ValidatorApiHandlerTest.java | 4 +++ .../validator/PostPrepareBeaconProposer.java | 20 +++++--------- .../teku/api/ValidatorDataProvider.java | 12 +++++++++ .../merge/BeaconPreparableProposer.java | 13 +++++----- .../ExecutionEngineChannelImpl.java | 2 +- .../merge}/BeaconPreparableProposer.java | 2 +- .../ForkChoiceUpdatedResult.java | 9 +++++++ .../beaconchain/BeaconChainController.java | 26 +++++++++++++++++++ .../services/beaconchain/SlotProcessor.java | 7 +++++ .../beaconchain/SlotProcessorTest.java | 4 +++ services/powchain/build.gradle | 1 + .../validator/api/ValidatorApiChannel.java | 1 + .../MetricRecordingValidatorApiChannel.java | 2 +- .../client/BeaconProposerPreparer.java | 2 +- .../client/BeaconProposerPreparerTest.java | 2 +- .../MultiPublishingValidatorApiChannel.java | 2 +- .../remote/RemoteValidatorApiHandler.java | 2 +- 20 files changed, 95 insertions(+), 30 deletions(-) rename {validator/api/src/main/java/tech/pegasys/teku/validator/api => ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/operations/versions/merge}/BeaconPreparableProposer.java (96%) diff --git a/beacon/validator/build.gradle b/beacon/validator/build.gradle index db5367c029a..a5f2c84cfed 100644 --- a/beacon/validator/build.gradle +++ b/beacon/validator/build.gradle @@ -15,6 +15,7 @@ dependencies { implementation project(':util') implementation project(':infrastructure:logging') implementation project(':pow') + implementation project(':services:powchain') implementation project(':ssz') implementation project(':sync') implementation project(':data:serializer') diff --git a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java index c20d6ed5f48..df9e5969ea6 100644 --- a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java +++ b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java @@ -29,6 +29,7 @@ import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; +import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; @@ -73,6 +74,7 @@ public class ValidatorApiHandlerIntegrationTest { private final BlockGossipChannel blockGossipChannel = mock(BlockGossipChannel.class); private final ChainDataProvider chainDataProvider = mock(ChainDataProvider.class); private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + private final ForkChoiceNotifier forkChoiceNotifier = mock(ForkChoiceNotifier.class); private final ChainUpdater chainUpdater = storageSystem.chainUpdater(); private final SyncCommitteeMessagePool syncCommitteeMessagePool = @@ -97,6 +99,7 @@ public class ValidatorApiHandlerIntegrationTest { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java index ca1079b71ad..d2c995302c0 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java @@ -43,6 +43,7 @@ import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; +import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecVersion; import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation; @@ -57,6 +58,7 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.ValidateableSyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.spec.datastructures.util.AttestationProcessingResult; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; @@ -76,7 +78,6 @@ import tech.pegasys.teku.sync.events.SyncStateProvider; import tech.pegasys.teku.validator.api.AttesterDuties; import tech.pegasys.teku.validator.api.AttesterDuty; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.NodeSyncingException; import tech.pegasys.teku.validator.api.ProposerDuties; @@ -116,6 +117,7 @@ public class ValidatorApiHandler implements ValidatorApiChannel { private final SyncCommitteeMessagePool syncCommitteeMessagePool; private final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager; private final SyncCommitteeContributionPool syncCommitteeContributionPool; + private final ForkChoiceNotifier forkChoiceNotifier; public ValidatorApiHandler( final ChainDataProvider chainDataProvider, @@ -132,6 +134,7 @@ public ValidatorApiHandler( final PerformanceTracker performanceTracker, final Spec spec, final ForkChoiceTrigger forkChoiceTrigger, + final ForkChoiceNotifier forkChoiceNotifier, final SyncCommitteeMessagePool syncCommitteeMessagePool, final SyncCommitteeContributionPool syncCommitteeContributionPool, final SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager) { @@ -152,6 +155,7 @@ public ValidatorApiHandler( this.syncCommitteeMessagePool = syncCommitteeMessagePool; this.syncCommitteeContributionPool = syncCommitteeContributionPool; this.syncCommitteeSubscriptionManager = syncCommitteeSubscriptionManager; + this.forkChoiceNotifier = forkChoiceNotifier; } @Override @@ -593,7 +597,9 @@ public SafeFuture sendSignedContributionAndProofs( @Override public void prepareBeaconProposer( - Collection beaconPreparableProposers) {} + Collection beaconPreparableProposers) { + forkChoiceNotifier.onUpdatePreparableProposers(beaconPreparableProposers); + } private Optional fromInternalValidationResult( final InternalValidationResult internalValidationResult, final int resultIndex) { diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java index 9f3946a4366..2f6b1b238e9 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java @@ -50,6 +50,7 @@ import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; +import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.config.SpecConfig; @@ -119,6 +120,7 @@ class ValidatorApiHandlerTest { private final ChainDataProvider chainDataProvider = mock(ChainDataProvider.class); private final DutyMetrics dutyMetrics = mock(DutyMetrics.class); private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + private final ForkChoiceNotifier forkChoiceNotifier = mock(ForkChoiceNotifier.class); private final SyncCommitteeMessagePool syncCommitteeMessagePool = mock(SyncCommitteeMessagePool.class); private final SyncCommitteeContributionPool syncCommitteeContributionPool = @@ -142,6 +144,7 @@ class ValidatorApiHandlerTest { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); @@ -387,6 +390,7 @@ void getSyncCommitteeDuties_shouldNotUseEpochPriorToFork() { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); diff --git a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java index bd841cd4cfa..a0463589616 100644 --- a/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java +++ b/data/beaconrestapi/src/main/java/tech/pegasys/teku/beaconrestapi/handlers/v1/validator/PostPrepareBeaconProposer.java @@ -27,11 +27,7 @@ import io.javalin.plugin.openapi.annotations.OpenApiContent; import io.javalin.plugin.openapi.annotations.OpenApiRequestBody; import io.javalin.plugin.openapi.annotations.OpenApiResponse; -import java.util.Arrays; -import java.util.function.Supplier; -import java.util.stream.Collectors; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; +import java.util.List; import org.jetbrains.annotations.NotNull; import tech.pegasys.teku.api.DataProvider; import tech.pegasys.teku.api.ValidatorDataProvider; @@ -43,15 +39,17 @@ public class PostPrepareBeaconProposer extends AbstractHandler implements Handler { public static final String ROUTE = "/eth/v1/validator/prepare_beacon_proposer"; - private static final Logger LOG = LogManager.getLogger(); + + private final ValidatorDataProvider validatorDataProvider; public PostPrepareBeaconProposer(final DataProvider provider, final JsonProvider jsonProvider) { this(provider.getValidatorDataProvider(), jsonProvider); } public PostPrepareBeaconProposer( - final ValidatorDataProvider provider, final JsonProvider jsonProvider) { + final ValidatorDataProvider validatorDataProvider, final JsonProvider jsonProvider) { super(jsonProvider); + this.validatorDataProvider = validatorDataProvider; } @OpenApi( @@ -78,13 +76,7 @@ public void handle(@NotNull final Context ctx) throws Exception { final BeaconPreparableProposer[] request = parseRequestBody(ctx.body(), BeaconPreparableProposer[].class); - LOG.trace( - "received: {}", - (Supplier) - () -> - Arrays.stream(request) - .map(BeaconPreparableProposer::toString) - .collect(Collectors.joining(","))); + validatorDataProvider.prepareBeaconProposer(List.of(request)); ctx.status(SC_OK); } catch (final IllegalArgumentException e) { diff --git a/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java b/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java index af294d9c202..8cab852cae0 100644 --- a/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java +++ b/data/provider/src/main/java/tech/pegasys/teku/api/ValidatorDataProvider.java @@ -46,6 +46,7 @@ import tech.pegasys.teku.api.schema.altair.SignedContributionAndProof; import tech.pegasys.teku.api.schema.altair.SyncCommitteeMessage; import tech.pegasys.teku.api.schema.altair.SyncCommitteeSubnetSubscription; +import tech.pegasys.teku.api.schema.merge.BeaconPreparableProposer; import tech.pegasys.teku.api.schema.merge.SignedBeaconBlockMerge; import tech.pegasys.teku.api.schema.phase0.SignedBeaconBlockPhase0; import tech.pegasys.teku.infrastructure.async.SafeFuture; @@ -368,6 +369,17 @@ public SafeFuture sendContributionAndProofs( .collect(toList())); } + public void prepareBeaconProposer( + Collection beaconPreparableProposers) { + List + internalBeaconPreparableProposer = + beaconPreparableProposers.stream() + .map(BeaconPreparableProposer::asInternalBeaconPreparableProposer) + .collect(Collectors.toUnmodifiableList()); + + validatorApiChannel.prepareBeaconProposer(internalBeaconPreparableProposer); + } + public boolean isPhase0Slot(final UInt64 slot) { return spec.atSlot(slot).getMilestone() == SpecMilestone.PHASE0; } diff --git a/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java b/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java index 97faebd6ffc..cfa8d447ec8 100644 --- a/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java +++ b/data/serializer/src/main/java/tech/pegasys/teku/api/schema/merge/BeaconPreparableProposer.java @@ -19,7 +19,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; -import com.google.common.base.MoreObjects; import io.swagger.v3.oas.annotations.media.Schema; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.ssz.type.Bytes20; @@ -45,11 +44,11 @@ public BeaconPreparableProposer( this.fee_recipient = fee_recipient; } - @Override - public String toString() { - return MoreObjects.toStringHelper(this) - .add("validator_index", validator_index) - .add("fee_recipient", fee_recipient) - .toString(); + public static tech.pegasys.teku.spec.datastructures.operations.versions.merge + .BeaconPreparableProposer + asInternalBeaconPreparableProposer(BeaconPreparableProposer beaconPreparableProposer) { + return new tech.pegasys.teku.spec.datastructures.operations.versions.merge + .BeaconPreparableProposer( + beaconPreparableProposer.validator_index, beaconPreparableProposer.fee_recipient); } } diff --git a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java index 75952078703..fc5c98b7a36 100644 --- a/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java +++ b/ethereum/executionlayer/src/main/java/tech/pegasys/teku/ethereum/executionlayer/ExecutionEngineChannelImpl.java @@ -50,7 +50,7 @@ public static ExecutionEngineChannelImpl create(String eeEndpoint, Spec spec) { return new ExecutionEngineChannelImpl(new Web3JExecutionEngineClient(eeEndpoint), spec); } - public ExecutionEngineChannelImpl(ExecutionEngineClient executionEngineClient, Spec spec) { + private ExecutionEngineChannelImpl(ExecutionEngineClient executionEngineClient, Spec spec) { this.spec = spec; this.executionEngineClient = executionEngineClient; } diff --git a/validator/api/src/main/java/tech/pegasys/teku/validator/api/BeaconPreparableProposer.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/operations/versions/merge/BeaconPreparableProposer.java similarity index 96% rename from validator/api/src/main/java/tech/pegasys/teku/validator/api/BeaconPreparableProposer.java rename to ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/operations/versions/merge/BeaconPreparableProposer.java index 1b7ac8724c5..db7945de00a 100644 --- a/validator/api/src/main/java/tech/pegasys/teku/validator/api/BeaconPreparableProposer.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/datastructures/operations/versions/merge/BeaconPreparableProposer.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. */ -package tech.pegasys.teku.validator.api; +package tech.pegasys.teku.spec.datastructures.operations.versions.merge; import com.google.common.base.MoreObjects; import java.util.Objects; diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java index 543ed558287..9ab601a8f3c 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceUpdatedResult.java @@ -13,6 +13,7 @@ package tech.pegasys.teku.spec.executionengine; +import com.google.common.base.MoreObjects; import java.util.Objects; import java.util.Optional; import tech.pegasys.teku.ssz.type.Bytes8; @@ -46,4 +47,12 @@ public boolean equals(final Object o) { public int hashCode() { return Objects.hash(status, payloadId); } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("status", status) + .add("payloadId", payloadId) + .toString(); + } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 80d38e6a74e..127ef2f3998 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -60,8 +60,11 @@ import tech.pegasys.teku.protoarray.ProtoArrayStorageChannel; import tech.pegasys.teku.service.serviceutils.Service; import tech.pegasys.teku.service.serviceutils.ServiceConfig; +import tech.pegasys.teku.services.executionengine.ExecutionEngineChannelImpl; +import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.services.timer.TimeTickChannel; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBodySchema; @@ -182,6 +185,7 @@ public class BeaconChainController extends Service implements TimeTickChannel { private volatile ActiveValidatorTracker activeValidatorTracker; private volatile AttestationTopicSubscriber attestationTopicSubscriber; private volatile SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager; + private volatile ForkChoiceNotifier forkChoiceNotifier; private UInt64 genesisTimeTracker = ZERO; private BlockManager blockManager; @@ -321,6 +325,7 @@ public void initAll() { initSyncCommitteePools(); initP2PNetwork(); initSyncService(); + initForkChoiceNotifier(); initSlotProcessor(); initMetrics(); initAttestationTopicSubscriber(); @@ -511,6 +516,7 @@ public void initValidatorApiHandler() { performanceTracker, spec, forkChoiceTrigger, + forkChoiceNotifier, syncCommitteeMessagePool, syncCommitteeContributionPool, syncCommitteeSubscriptionManager); @@ -703,6 +709,7 @@ private void initSlotProcessor() { recentChainData, syncService.getForwardSync(), forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannelPublisher, new EpochCachePrimer(spec, recentChainData)); @@ -821,6 +828,25 @@ private void initOperationsReOrgManager() { eventChannels.subscribe(ChainHeadChannel.class, operationsReOrgManager); } + private void initForkChoiceNotifier() { + LOG.debug("BeaconChainController.initForkChoiceNotifier()"); + ExecutionEngineChannel executionEngineChannelHandler = ExecutionEngineChannel.NOOP; + ; + + if (spec.isMilestoneSupported(SpecMilestone.MERGE)) { + if (beaconConfig.executionEngineConfiguration().isEnabled()) { + executionEngineChannelHandler = + ExecutionEngineChannelImpl.create( + beaconConfig.executionEngineConfiguration().getEndpoints().get(0), spec); + } else { + LOG.warn("Execution engine endpoint should be specified"); + } + } + + forkChoiceNotifier = + ForkChoiceNotifier.create(recentChainData, executionEngineChannelHandler, spec); + } + private void setupInitialState(final RecentChainData client) { final Optional initialAnchor = wsInitializer.loadInitialAnchorPoint( diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java index 15cfabafbb6..b4879b40458 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java @@ -21,6 +21,7 @@ import tech.pegasys.teku.infrastructure.logging.EventLogger; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork; +import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blocks.NodeSlot; import tech.pegasys.teku.statetransition.EpochCachePrimer; @@ -34,6 +35,7 @@ public class SlotProcessor { private final RecentChainData recentChainData; private final ForwardSync syncService; private final ForkChoiceTrigger forkChoiceTrigger; + private final ForkChoiceNotifier forkChoiceNotifier; private final Eth2P2PNetwork p2pNetwork; private final SlotEventsChannel slotEventsChannelPublisher; private final NodeSlot nodeSlot = new NodeSlot(ZERO); @@ -50,6 +52,7 @@ public class SlotProcessor { final RecentChainData recentChainData, final ForwardSync syncService, final ForkChoiceTrigger forkChoiceTrigger, + final ForkChoiceNotifier forkChoiceNotifier, final Eth2P2PNetwork p2pNetwork, final SlotEventsChannel slotEventsChannelPublisher, final EpochCachePrimer epochCachePrimer, @@ -58,6 +61,7 @@ public class SlotProcessor { this.recentChainData = recentChainData; this.syncService = syncService; this.forkChoiceTrigger = forkChoiceTrigger; + this.forkChoiceNotifier = forkChoiceNotifier; this.p2pNetwork = p2pNetwork; this.slotEventsChannelPublisher = slotEventsChannelPublisher; this.epochCachePrimer = epochCachePrimer; @@ -69,6 +73,7 @@ public SlotProcessor( final RecentChainData recentChainData, final ForwardSync syncService, final ForkChoiceTrigger forkChoiceTrigger, + final ForkChoiceNotifier forkChoiceNotifier, final Eth2P2PNetwork p2pNetwork, final SlotEventsChannel slotEventsChannelPublisher, final EpochCachePrimer epochCachePrimer) { @@ -77,6 +82,7 @@ public SlotProcessor( recentChainData, syncService, forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannelPublisher, epochCachePrimer, @@ -117,6 +123,7 @@ public void onTick(final UInt64 currentTime) { } if (isSlotAttestationDue(calculatedSlot, currentTime, nodeSlotStartTime)) { processSlotAttestation(epoch); + forkChoiceNotifier.onOneThirdOfSlot(nodeSlot.getValue()); nodeSlot.inc(); } diff --git a/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java b/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java index 120c2ffc4ac..2417369fa3d 100644 --- a/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java +++ b/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java @@ -35,6 +35,7 @@ import tech.pegasys.teku.infrastructure.logging.EventLogger; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork; +import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; @@ -62,6 +63,7 @@ public class SlotProcessorTest { private final ForwardSync syncService = mock(ForwardSync.class); private final ForkChoiceTrigger forkChoiceTrigger = mock(ForkChoiceTrigger.class); + private final ForkChoiceNotifier forkChoiceNotifier = mock(ForkChoiceNotifier.class); private final Eth2P2PNetwork p2pNetwork = mock(Eth2P2PNetwork.class); private final SlotEventsChannel slotEventsChannel = mock(SlotEventsChannel.class); private final EpochCachePrimer epochCachePrimer = mock(EpochCachePrimer.class); @@ -71,6 +73,7 @@ public class SlotProcessorTest { recentChainData, syncService, forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannel, epochCachePrimer, @@ -287,6 +290,7 @@ void shouldPrecomputeEpochTransitionJustBeforeFirstSlotOfNextEpoch() { recentChainData, syncService, forkChoiceTrigger, + forkChoiceNotifier, p2pNetwork, slotEventsChannel, epochCachePrimer, diff --git a/services/powchain/build.gradle b/services/powchain/build.gradle index 6ae0020d375..e958ad59103 100644 --- a/services/powchain/build.gradle +++ b/services/powchain/build.gradle @@ -9,6 +9,7 @@ dependencies { implementation project(':infrastructure:metrics') implementation project(':infrastructure:version') implementation project(':pow') + implementation project(':storage') implementation project(':storage:api') implementation project(':infrastructure:serviceutils') implementation project(':util') diff --git a/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java b/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java index 644c3539a3c..6ed065e9ef7 100644 --- a/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java +++ b/validator/api/src/main/java/tech/pegasys/teku/validator/api/ValidatorApiChannel.java @@ -34,6 +34,7 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; public interface ValidatorApiChannel extends ChannelInterface { diff --git a/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java b/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java index 8ec6791e73d..cef1613c70e 100644 --- a/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java +++ b/validator/beaconnode/src/main/java/tech/pegasys/teku/validator/beaconnode/metrics/MetricRecordingValidatorApiChannel.java @@ -36,9 +36,9 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; import tech.pegasys.teku.validator.api.AttesterDuties; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.ProposerDuties; import tech.pegasys.teku.validator.api.SendSignedBlockResult; diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java index 42b84431ea4..ede4ae667ea 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java @@ -20,8 +20,8 @@ import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.ssz.type.Bytes20; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.ValidatorApiChannel; import tech.pegasys.teku.validator.api.ValidatorTimingChannel; import tech.pegasys.teku.validator.client.loader.OwnedValidators; diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java index 62baa812d38..c68e58356b8 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java @@ -35,8 +35,8 @@ import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.TestSpecContext; import tech.pegasys.teku.spec.TestSpecInvocationContextProvider.SpecContext; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.ssz.type.Bytes20; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.ValidatorApiChannel; import tech.pegasys.teku.validator.client.loader.OwnedValidators; diff --git a/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java b/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java index f81636dccb4..7437bac9939 100644 --- a/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java +++ b/validator/relaypublisher/src/main/java/tech/pegasys/teku/validator/relaypublisher/MultiPublishingValidatorApiChannel.java @@ -40,9 +40,9 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; import tech.pegasys.teku.validator.api.AttesterDuties; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.ProposerDuties; import tech.pegasys.teku.validator.api.SendSignedBlockResult; diff --git a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java index c1b82303674..5d28c920625 100644 --- a/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java +++ b/validator/remote/src/main/java/tech/pegasys/teku/validator/remote/RemoteValidatorApiHandler.java @@ -55,10 +55,10 @@ import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution; import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.validator.SubnetSubscription; import tech.pegasys.teku.validator.api.AttesterDuties; import tech.pegasys.teku.validator.api.AttesterDuty; -import tech.pegasys.teku.validator.api.BeaconPreparableProposer; import tech.pegasys.teku.validator.api.CommitteeSubscriptionRequest; import tech.pegasys.teku.validator.api.ProposerDuties; import tech.pegasys.teku.validator.api.ProposerDuty; From 2957f450e18ed6b0c473e429ba4a1e91eb16a9ee Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Fri, 19 Nov 2021 10:52:07 +1000 Subject: [PATCH 2/6] Rearrange and simplify a bit. --- beacon/validator/build.gradle | 1 - .../ValidatorApiHandlerIntegrationTest.java | 2 +- .../coordinator/ValidatorApiHandler.java | 2 +- .../coordinator/ValidatorApiHandlerTest.java | 2 +- .../beaconchain/BeaconChainController.java | 28 ++++++------------- .../services/beaconchain/SlotProcessor.java | 4 +-- .../beaconchain/SlotProcessorTest.java | 2 +- 7 files changed, 15 insertions(+), 26 deletions(-) diff --git a/beacon/validator/build.gradle b/beacon/validator/build.gradle index a5f2c84cfed..db5367c029a 100644 --- a/beacon/validator/build.gradle +++ b/beacon/validator/build.gradle @@ -15,7 +15,6 @@ dependencies { implementation project(':util') implementation project(':infrastructure:logging') implementation project(':pow') - implementation project(':services:powchain') implementation project(':ssz') implementation project(':sync') implementation project(':data:serializer') diff --git a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java index df9e5969ea6..9ef842ac60a 100644 --- a/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java +++ b/beacon/validator/src/integrationTest/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerIntegrationTest.java @@ -29,7 +29,6 @@ import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; -import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; @@ -38,6 +37,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeMessagePool; diff --git a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java index d2c995302c0..942edcb375e 100644 --- a/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java +++ b/beacon/validator/src/main/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandler.java @@ -43,7 +43,6 @@ import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; -import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.SpecVersion; import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation; @@ -70,6 +69,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeMessagePool; diff --git a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java index 2f6b1b238e9..92e597deebd 100644 --- a/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java +++ b/beacon/validator/src/test/java/tech/pegasys/teku/validator/coordinator/ValidatorApiHandlerTest.java @@ -50,7 +50,6 @@ import tech.pegasys.teku.networking.eth2.gossip.BlockGossipChannel; import tech.pegasys.teku.networking.eth2.gossip.subnets.AttestationTopicSubscriber; import tech.pegasys.teku.networking.eth2.gossip.subnets.SyncCommitteeSubscriptionManager; -import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.config.SpecConfig; @@ -76,6 +75,7 @@ import tech.pegasys.teku.statetransition.attestation.AggregatingAttestationPool; import tech.pegasys.teku.statetransition.attestation.AttestationManager; import tech.pegasys.teku.statetransition.block.BlockImportChannel; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeContributionPool; import tech.pegasys.teku.statetransition.synccommittee.SyncCommitteeMessagePool; diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 127ef2f3998..e224849b259 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -60,11 +60,8 @@ import tech.pegasys.teku.protoarray.ProtoArrayStorageChannel; import tech.pegasys.teku.service.serviceutils.Service; import tech.pegasys.teku.service.serviceutils.ServiceConfig; -import tech.pegasys.teku.services.executionengine.ExecutionEngineChannelImpl; -import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.services.timer.TimeTickChannel; import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.SpecMilestone; import tech.pegasys.teku.spec.datastructures.attestation.ValidateableAttestation; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; import tech.pegasys.teku.spec.datastructures.blocks.blockbody.BeaconBlockBodySchema; @@ -87,6 +84,7 @@ import tech.pegasys.teku.statetransition.block.BlockImporter; import tech.pegasys.teku.statetransition.block.BlockManager; import tech.pegasys.teku.statetransition.forkchoice.ForkChoice; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.statetransition.genesis.GenesisHandler; import tech.pegasys.teku.statetransition.synccommittee.SignedContributionAndProofValidator; @@ -186,6 +184,7 @@ public class BeaconChainController extends Service implements TimeTickChannel { private volatile AttestationTopicSubscriber attestationTopicSubscriber; private volatile SyncCommitteeSubscriptionManager syncCommitteeSubscriptionManager; private volatile ForkChoiceNotifier forkChoiceNotifier; + private volatile ExecutionEngineChannel executionEngine; private UInt64 genesisTimeTracker = ZERO; private BlockManager blockManager; @@ -308,6 +307,7 @@ private SafeFuture initialize() { } public void initAll() { + initExecutionEngine(); initForkChoice(); initBlockImporter(); initCombinedChainDataClient(); @@ -336,6 +336,10 @@ public void initAll() { initOperationsReOrgManager(); } + private void initExecutionEngine() { + executionEngine = eventChannels.getPublisher(ExecutionEngineChannel.class, beaconAsyncRunner); + } + private void initPendingBlocks() { LOG.debug("BeaconChainController.initPendingBlocks()"); pendingBlocks = PendingPool.createForBlocks(spec); @@ -775,7 +779,7 @@ public void initBlockImporter() { recentChainData, forkChoice, weakSubjectivityValidator, - ExecutionEngineChannel.NOOP); + executionEngine); } public void initBlockManager() { @@ -830,21 +834,7 @@ private void initOperationsReOrgManager() { private void initForkChoiceNotifier() { LOG.debug("BeaconChainController.initForkChoiceNotifier()"); - ExecutionEngineChannel executionEngineChannelHandler = ExecutionEngineChannel.NOOP; - ; - - if (spec.isMilestoneSupported(SpecMilestone.MERGE)) { - if (beaconConfig.executionEngineConfiguration().isEnabled()) { - executionEngineChannelHandler = - ExecutionEngineChannelImpl.create( - beaconConfig.executionEngineConfiguration().getEndpoints().get(0), spec); - } else { - LOG.warn("Execution engine endpoint should be specified"); - } - } - - forkChoiceNotifier = - ForkChoiceNotifier.create(recentChainData, executionEngineChannelHandler, spec); + forkChoiceNotifier = new ForkChoiceNotifier(recentChainData, executionEngine, spec); } private void setupInitialState(final RecentChainData client) { diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java index b4879b40458..a9cc266f172 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/SlotProcessor.java @@ -21,10 +21,10 @@ import tech.pegasys.teku.infrastructure.logging.EventLogger; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork; -import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.datastructures.blocks.NodeSlot; import tech.pegasys.teku.statetransition.EpochCachePrimer; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.sync.forward.ForwardSync; @@ -123,7 +123,6 @@ public void onTick(final UInt64 currentTime) { } if (isSlotAttestationDue(calculatedSlot, currentTime, nodeSlotStartTime)) { processSlotAttestation(epoch); - forkChoiceNotifier.onOneThirdOfSlot(nodeSlot.getValue()); nodeSlot.inc(); } @@ -215,6 +214,7 @@ private void processSlotStart(final UInt64 nodeEpoch) { private void processSlotAttestation(final UInt64 nodeEpoch) { onTickSlotAttestation = nodeSlot.getValue(); forkChoiceTrigger.onAttestationsDueForSlot(onTickSlotAttestation); + forkChoiceNotifier.onAttestationsDue(onTickSlotAttestation); recentChainData .getChainHead() .ifPresent( diff --git a/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java b/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java index 2417369fa3d..6ea85156a31 100644 --- a/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java +++ b/services/beaconchain/src/test/java/tech/pegasys/teku/services/beaconchain/SlotProcessorTest.java @@ -35,7 +35,6 @@ import tech.pegasys.teku.infrastructure.logging.EventLogger; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.networking.eth2.Eth2P2PNetwork; -import tech.pegasys.teku.services.executionengine.ForkChoiceNotifier; import tech.pegasys.teku.spec.Spec; import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBeaconBlock; @@ -43,6 +42,7 @@ import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.statetransition.EpochCachePrimer; +import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceNotifier; import tech.pegasys.teku.statetransition.forkchoice.ForkChoiceTrigger; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.server.StateStorageMode; From 875e7ecc5743fa8b67fc4364bf474108774451e1 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Fri, 19 Nov 2021 21:21:30 +0100 Subject: [PATCH 3/6] updates --- .../java/tech/pegasys/teku/spec/Spec.java | 4 + .../spec/executionengine/ForkChoiceState.java | 16 ++ .../logic/common/helpers/MiscHelpers.java | 5 + .../forkchoice/ForkChoiceNotifierTest.java | 138 ++++++++++++++++++ 4 files changed, 163 insertions(+) create mode 100644 ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java index e6c6e1172d4..1a9f7db0a15 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java @@ -292,6 +292,10 @@ public UInt64 computeEpochAtSlot(final UInt64 slot) { return atSlot(slot).miscHelpers().computeEpochAtSlot(slot); } + public UInt64 computeTimeAtSlot(BeaconState state, UInt64 slot) { + return atSlot(slot).miscHelpers().computeTimeAtSlot(state, slot); + } + public Bytes computeSigningRoot(BeaconBlock block, Bytes32 domain) { return atBlock(block).miscHelpers().computeSigningRoot(block, domain); } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java index ecf46d93cfa..422a89d7510 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/ForkChoiceState.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.spec.executionengine; import com.google.common.base.MoreObjects; +import com.google.common.base.Objects; import java.util.Optional; import org.apache.tuweni.bytes.Bytes32; @@ -54,4 +55,19 @@ public String toString() { .add("finalizedBlockHash", finalizedBlockHash) .toString(); } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof ForkChoiceState)) return false; + ForkChoiceState that = (ForkChoiceState) o; + return headBlockHash == that.headBlockHash + && safeBlockHash == that.safeBlockHash + && finalizedBlockHash == that.finalizedBlockHash; + } + + @Override + public int hashCode() { + return Objects.hashCode(headBlockHash, safeBlockHash, finalizedBlockHash); + } } diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java index 946ebc19e5f..8cb4e3cc29c 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/logic/common/helpers/MiscHelpers.java @@ -103,6 +103,11 @@ public UInt64 computeStartSlotAtEpoch(UInt64 epoch) { return epoch.times(specConfig.getSlotsPerEpoch()); } + public UInt64 computeTimeAtSlot(BeaconState state, UInt64 slot) { + UInt64 slotsSinceGenesis = slot.minus(SpecConfig.GENESIS_SLOT); + return state.getGenesis_time().plus(slotsSinceGenesis.times(specConfig.getSecondsPerSlot())); + } + public boolean isSlotAtNthEpochBoundary( final UInt64 blockSlot, final UInt64 parentSlot, final int n) { checkArgument(n > 0, "Parameter n must be greater than 0"); diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java new file mode 100644 index 00000000000..f38251c0cb3 --- /dev/null +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java @@ -0,0 +1,138 @@ +/* + * Copyright 2021 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.forkchoice; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.tuweni.bytes.Bytes32; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.mockito.ArgumentCaptor; +import tech.pegasys.teku.bls.BLSKeyGenerator; +import tech.pegasys.teku.bls.BLSKeyPair; +import tech.pegasys.teku.core.ChainBuilder; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.TestSpecContext; +import tech.pegasys.teku.spec.TestSpecInvocationContextProvider; +import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; +import tech.pegasys.teku.spec.executionengine.ExecutionEngineChannel; +import tech.pegasys.teku.spec.executionengine.ForkChoiceState; +import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedResult; +import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedStatus; +import tech.pegasys.teku.spec.executionengine.PayloadAttributes; +import tech.pegasys.teku.spec.logic.common.block.AbstractBlockProcessor; +import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.storage.client.ChainUpdater; +import tech.pegasys.teku.storage.client.RecentChainData; +import tech.pegasys.teku.storage.server.StateStorageMode; +import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; +import tech.pegasys.teku.storage.storageSystem.StorageSystem; + +@TestSpecContext() +public class ForkChoiceNotifierTest { + private static final List VALIDATOR_KEYS = BLSKeyGenerator.generateKeyPairs(64); + + final ArgumentCaptor forkChoiceStateCaptor = + ArgumentCaptor.forClass(ForkChoiceState.class); + + @SuppressWarnings("unchecked") + final ArgumentCaptor> payloadAttributesCaptor = + ArgumentCaptor.forClass(Optional.class); + + ChainBuilder chainBuilder; + StorageSystem storageSystem; + ChainUpdater chainUpdater; + RecentChainData recentChainData; + ForkChoiceNotifier forkChoiceNotifier; + DataStructureUtil dataStructureUtil; + + ExecutionEngineChannel executionEngineChannel = mock(ExecutionEngineChannel.class); + + @BeforeAll + public static void init() { + AbstractBlockProcessor.BLS_VERIFY_DEPOSIT = false; + } + + @AfterAll + public static void reset() { + AbstractBlockProcessor.BLS_VERIFY_DEPOSIT = true; + } + + @BeforeEach + void setUp(TestSpecInvocationContextProvider.SpecContext specContext) { + dataStructureUtil = specContext.getDataStructureUtil(); + Spec spec = specContext.getSpec(); + chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS); + storageSystem = InMemoryStorageSystemBuilder.buildDefault(StateStorageMode.ARCHIVE); + recentChainData = storageSystem.recentChainData(); + chainUpdater = new ChainUpdater(storageSystem.recentChainData(), chainBuilder); + chainUpdater.initializeGenesis(false); + + forkChoiceNotifier = new ForkChoiceNotifier(recentChainData, executionEngineChannel, spec); + + when(executionEngineChannel.forkChoiceUpdated(any(), any())) + .thenReturn( + SafeFuture.completedFuture( + new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.empty()))); + } + + @TestTemplate + void shouldCallForkChoiceWithAttributes() { + prepareAllValidators(); + + SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); + + Bytes32 root = signedBlockAndState.getRoot(); + Bytes32 executionHeadRoot = dataStructureUtil.randomBytes32(); + ForkChoiceState forkChoiceState = + new ForkChoiceState( + executionHeadRoot, + dataStructureUtil.randomBytes32(), + dataStructureUtil.randomBytes32()); + + forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); + + assertThat(forkChoiceStateCaptor.getValue()).isEqualToComparingFieldByField(forkChoiceState); + assertThat(payloadAttributesCaptor.getValue()).isNotEmpty(); + } + + private void prepareAllValidators() { + Collection proposers = + IntStream.range(0, VALIDATOR_KEYS.size()) + .mapToObj( + index -> + new BeaconPreparableProposer( + UInt64.valueOf(index), dataStructureUtil.randomBytes20())) + .collect(Collectors.toList()); + + forkChoiceNotifier.onUpdatePreparableProposers(proposers); + } +} From 3e5ff307915ae87e7d1d29ad7098d6ae7fd76871 Mon Sep 17 00:00:00 2001 From: Enrico Del Fante Date: Sun, 21 Nov 2021 20:30:37 +0100 Subject: [PATCH 4/6] improvements --- .../forkchoice/ForkChoiceNotifier.java | 289 ++++++++++++++++++ .../forkchoice/ForkChoiceNotifierTest.java | 192 ++++++++++-- .../client/BeaconProposerPreparer.java | 4 +- 3 files changed, 464 insertions(+), 21 deletions(-) create mode 100644 ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java new file mode 100644 index 00000000000..7fcddd30012 --- /dev/null +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java @@ -0,0 +1,289 @@ +/* + * Copyright 2021 ConsenSys AG. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package tech.pegasys.teku.statetransition.forkchoice; + +import com.google.common.annotations.VisibleForTesting; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary; +import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.executionengine.ExecutionEngineChannel; +import tech.pegasys.teku.spec.executionengine.ForkChoiceState; +import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedResult; +import tech.pegasys.teku.spec.executionengine.PayloadAttributes; +import tech.pegasys.teku.ssz.type.Bytes20; +import tech.pegasys.teku.storage.client.RecentChainData; + +public class ForkChoiceNotifier { + private static final int MAX_PROPOSER_SEEN_EPOCHS = 2; + private static final Logger LOG = LogManager.getLogger(); + + private final ExecutionEngineChannel executionEngineChannel; + private final Spec spec; + private final RecentChainData recentChainData; + + private final CurrentForkChoiceUpdatedInfo currentForkChoiceUpdatedInfo = + new CurrentForkChoiceUpdatedInfo(); + + private Map + proposerIndexLastSeenSlotAndFeeRecipient = new HashMap<>(); + + private Optional maybeLastAttestationDueSlot = Optional.empty(); + + public ForkChoiceNotifier( + final RecentChainData recentChainData, + final ExecutionEngineChannel executionEngineChannel, + final Spec spec) { + this.recentChainData = recentChainData; + this.executionEngineChannel = executionEngineChannel; + this.spec = spec; + } + + public void onUpdatePreparableProposers( + Collection beaconPreparableProposers) { + LOG.debug("onUpdatePreparableProposers {}", beaconPreparableProposers); + + recentChainData + .getCurrentSlot() + .ifPresent( + currentSlot -> + updatePreparableProposers( + beaconPreparableProposers, + attestationDueHasBeenCalledForSlot(currentSlot) + ? currentSlot.plus(1) + : currentSlot)); + } + + private boolean attestationDueHasBeenCalledForSlot(UInt64 slot) { + return maybeLastAttestationDueSlot + .map(lastAttestationDueSlot -> lastAttestationDueSlot.isGreaterThanOrEqualTo(slot)) + .orElse(false); + } + + public void onForkChoiceUpdated( + Bytes32 optimisticHeadBeaconBlockRoot, ForkChoiceState forkChoiceState) { + LOG.debug("onForkChoiceUpdated {}, {}", optimisticHeadBeaconBlockRoot, forkChoiceState); + + Optional currentValidatedHead = + recentChainData.getChainHead().map(StateAndBlockSummary::getRoot); + boolean isForkChoiceStateOnValidatedHead = + currentValidatedHead.isPresent() + && optimisticHeadBeaconBlockRoot.equals(currentValidatedHead.get()); + + currentForkChoiceUpdatedInfo.forkChoiceState = Optional.of(forkChoiceState); + currentForkChoiceUpdatedInfo.isOnValidatedHead = isForkChoiceStateOnValidatedHead; + + recentChainData + .getCurrentSlot() + .ifPresent( + (slot) -> { + UInt64 targetSlot = slot.plus(1); + callExecutionEngineForkChoiceUpdated(getPayloadAttributes(targetSlot), targetSlot); + }); + } + + public void onAttestationsDue(UInt64 slot) { + LOG.debug("onAttestationsDue {}", slot); + + maybeLastAttestationDueSlot = Optional.of(slot); + + UInt64 targetSlot = slot.plus(1); + + if (currentForkChoiceUpdatedInfo.hasNotBeenCalledForSlot(targetSlot)) { + + Optional payloadAttributes = getPayloadAttributes(targetSlot); + + if (payloadAttributes.isPresent()) { + callExecutionEngineForkChoiceUpdated(payloadAttributes, targetSlot); + } + } + } + + public CurrentForkChoiceUpdatedInfo getCurrentForkChoiceUpdatedInfo() { + return currentForkChoiceUpdatedInfo; + } + + private void updatePreparableProposers( + Collection beaconPreparableProposers, UInt64 currentSlot) { + + final int maxProposerSeenSlot = spec.getSlotsPerEpoch(currentSlot) * MAX_PROPOSER_SEEN_EPOCHS; + boolean newProposerIndexAdded = false; + + // remove expired proposers + proposerIndexLastSeenSlotAndFeeRecipient = + proposerIndexLastSeenSlotAndFeeRecipient.entrySet().stream() + .filter(p -> p.getValue().lastSeenSlot.compareTo(currentSlot) < maxProposerSeenSlot) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + // update or add new proposers + for (BeaconPreparableProposer beaconPreparableProposer : beaconPreparableProposers) { + ProposerLastSeenSlotAndFeeRecipient current = + proposerIndexLastSeenSlotAndFeeRecipient.get( + beaconPreparableProposer.getValidatorIndex()); + if (current == null) { + proposerIndexLastSeenSlotAndFeeRecipient.put( + beaconPreparableProposer.getValidatorIndex(), + new ProposerLastSeenSlotAndFeeRecipient( + currentSlot, beaconPreparableProposer.getFeeRecipient())); + + newProposerIndexAdded = true; + + } else { + current.lastSeenSlot = currentSlot; + } + } + + if (newProposerIndexAdded) { + final Optional payloadAttributes = getPayloadAttributes(currentSlot); + + if (payloadAttributes.isPresent()) + callExecutionEngineForkChoiceUpdated(payloadAttributes, currentSlot); + } + } + + /** + * checks proposal preconditions and if we have a prepared proposer required to produce a block + * for the targetSlot + * + *

if conditions are met, builds a {@link PayloadAttributes} gathering data from current chain + * head state. + * + * @param targetSlot at which we want to produce a block + * @return optionally payload attributes for the block proposer due + */ + private Optional getPayloadAttributes(UInt64 targetSlot) { + if (!currentForkChoiceUpdatedInfo.isPossibleToPropose()) { + return Optional.empty(); + } + + return recentChainData + .getChainHead() + .map( + stateAndBlockSummary -> { + UInt64 proposerIndex = + UInt64.valueOf( + spec.getBeaconProposerIndex(stateAndBlockSummary.getState(), targetSlot)); + + ProposerLastSeenSlotAndFeeRecipient maybePreparedProposer = + proposerIndexLastSeenSlotAndFeeRecipient.get(proposerIndex); + + if (maybePreparedProposer != null) { + + final BeaconState state = stateAndBlockSummary.getState(); + final UInt64 epoch = spec.computeEpochAtSlot(targetSlot); + + return new PayloadAttributes( + spec.computeTimeAtSlot(state, targetSlot), + spec.atEpoch(epoch).beaconStateAccessors().getRandaoMix(state, epoch), + maybePreparedProposer.feeRecipient); + } + return null; + }); + } + + private void callExecutionEngineForkChoiceUpdated( + final Optional payloadAttributes, + final UInt64 executionPayloadTargetSlot) { + + currentForkChoiceUpdatedInfo.calledAtSlot = Optional.of(executionPayloadTargetSlot); + currentForkChoiceUpdatedInfo.payloadAttributes = payloadAttributes; + + ForkChoiceState forkChoiceState = + currentForkChoiceUpdatedInfo.forkChoiceState.orElseThrow( + () -> new IllegalStateException("A current ForkChoiceState is expected")); + + // do we need to .join() or ordering will be respected at this point? + // I think it is currently synchronous up to the actual call + // or maybe if it is now a real channel is not the case. + executionEngineChannel + .forkChoiceUpdated(forkChoiceState, payloadAttributes) + .finish( + result -> { + currentForkChoiceUpdatedInfo.result = Optional.of(result); + LOG.info("forkChoiceUpdated result: {}", result); + }, + error -> + LOG.error( + "Error while calling forkChoiceUpdated. Message: {}", error.getMessage())); + } + + @VisibleForTesting + Bytes20 getProposerIndexFeeRecipient(UInt64 proposerIndex) { + ProposerLastSeenSlotAndFeeRecipient lastSeenSlotAndFeeRecipient = + proposerIndexLastSeenSlotAndFeeRecipient.get(proposerIndex); + return lastSeenSlotAndFeeRecipient != null ? lastSeenSlotAndFeeRecipient.feeRecipient : null; + } + + @VisibleForTesting + UInt64 getProposerIndexLastSeenSlot(UInt64 proposerIndex) { + ProposerLastSeenSlotAndFeeRecipient lastSeenSlotAndFeeRecipient = + proposerIndexLastSeenSlotAndFeeRecipient.get(proposerIndex); + return lastSeenSlotAndFeeRecipient != null ? lastSeenSlotAndFeeRecipient.lastSeenSlot : null; + } + + private static class ProposerLastSeenSlotAndFeeRecipient { + UInt64 lastSeenSlot; + Bytes20 feeRecipient; + + public ProposerLastSeenSlotAndFeeRecipient(UInt64 lastSeenSlot, Bytes20 feeRecipient) { + this.lastSeenSlot = lastSeenSlot; + this.feeRecipient = feeRecipient; + } + } + + public static class CurrentForkChoiceUpdatedInfo { + private Optional forkChoiceState = Optional.empty(); + private Boolean isOnValidatedHead = false; + private Optional calledAtSlot = Optional.empty(); + private Optional payloadAttributes = Optional.empty(); + private Optional result = Optional.empty(); + + private boolean isPossibleToPropose() { + return forkChoiceState.isPresent() && isOnValidatedHead; + } + + private boolean hasNotBeenCalledForSlot(UInt64 slot) { + return calledAtSlot.map(lastSlot -> lastSlot.compareTo(slot) < 0).orElse(true); + } + + public Optional getCalledAtSlot() { + return calledAtSlot; + } + + public Optional getForkChoiceState() { + return forkChoiceState; + } + + public Boolean getOnValidatedHead() { + return isOnValidatedHead; + } + + public Optional getPayloadAttributes() { + return payloadAttributes; + } + + public Optional getResult() { + return result; + } + } +} diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java index f38251c0cb3..4c9aac668ce 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java @@ -16,6 +16,8 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -47,6 +49,7 @@ import tech.pegasys.teku.spec.executionengine.PayloadAttributes; import tech.pegasys.teku.spec.logic.common.block.AbstractBlockProcessor; import tech.pegasys.teku.spec.util.DataStructureUtil; +import tech.pegasys.teku.ssz.type.Bytes20; import tech.pegasys.teku.storage.client.ChainUpdater; import tech.pegasys.teku.storage.client.RecentChainData; import tech.pegasys.teku.storage.server.StateStorageMode; @@ -70,23 +73,26 @@ public class ForkChoiceNotifierTest { RecentChainData recentChainData; ForkChoiceNotifier forkChoiceNotifier; DataStructureUtil dataStructureUtil; + ForkChoiceState forkChoiceState; + Bytes32 root; + Spec spec; ExecutionEngineChannel executionEngineChannel = mock(ExecutionEngineChannel.class); @BeforeAll - public static void init() { + public static void initSession() { AbstractBlockProcessor.BLS_VERIFY_DEPOSIT = false; } @AfterAll - public static void reset() { + public static void resetSession() { AbstractBlockProcessor.BLS_VERIFY_DEPOSIT = true; } @BeforeEach void setUp(TestSpecInvocationContextProvider.SpecContext specContext) { dataStructureUtil = specContext.getDataStructureUtil(); - Spec spec = specContext.getSpec(); + spec = specContext.getSpec(); chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS); storageSystem = InMemoryStorageSystemBuilder.buildDefault(StateStorageMode.ARCHIVE); recentChainData = storageSystem.recentChainData(); @@ -95,33 +101,107 @@ void setUp(TestSpecInvocationContextProvider.SpecContext specContext) { forkChoiceNotifier = new ForkChoiceNotifier(recentChainData, executionEngineChannel, spec); - when(executionEngineChannel.forkChoiceUpdated(any(), any())) - .thenReturn( - SafeFuture.completedFuture( - new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.empty()))); + resetExecutionEngineChannelMock(); } @TestTemplate - void shouldCallForkChoiceWithAttributes() { + void onForkChoiceShouldCallForkChoiceUpdatedWithAttributesWhenProposerIsPrepared() { prepareAllValidators(); - SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); + setRootAndForkChoiceState(); - Bytes32 root = signedBlockAndState.getRoot(); - Bytes32 executionHeadRoot = dataStructureUtil.randomBytes32(); - ForkChoiceState forkChoiceState = - new ForkChoiceState( - executionHeadRoot, - dataStructureUtil.randomBytes32(), - dataStructureUtil.randomBytes32()); + forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + + validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + } + + @TestTemplate + void onForkChoiceShouldCallForkChoiceUpdatedWithoutAttributesWhenProposerIsNotPrepared() { + setRootAndForkChoiceState(); forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); - verify(executionEngineChannel) - .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); + validateForkChoiceUpdatedWithoutPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + } - assertThat(forkChoiceStateCaptor.getValue()).isEqualToComparingFieldByField(forkChoiceState); - assertThat(payloadAttributesCaptor.getValue()).isNotEmpty(); + @TestTemplate + void onUpdatePreparableProposersShouldNotCallForkChoiceUpdatedWithNotForkChoiceState() { + prepareAllValidators(); + + validateForkChoiceUpdatedHasNotBeenCalled(); + } + + @TestTemplate + void onUpdatePreparableProposersShouldNotCallForkChoiceUpdatedWhenNotPreparingProposer() { + setRootAndForkChoiceState(); + + forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + + resetExecutionEngineChannelMock(); + + prepareWithNonProposingValidators(); + + validateForkChoiceUpdatedHasNotBeenCalled(); + } + + @TestTemplate + void onUpdatePreparableProposersShouldCallForkChoiceUpdatedOnCurrentSlotWhenPreparingProposer() { + setRootAndForkChoiceState(); + + forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + + resetExecutionEngineChannelMock(); + + prepareAllValidators(); + + validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot()); + } + + @TestTemplate + void + onUpdatePreparableProposersShouldCallForkChoiceUpdatedOnNextSlotWhenPreparingProposerAfterAttestationDue() { + setRootAndForkChoiceState(); + + forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + forkChoiceNotifier.onAttestationsDue(chainBuilder.getLatestSlot()); + + resetExecutionEngineChannelMock(); + + prepareAllValidators(); + + validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + } + + @TestTemplate + void complexScenario1() { + setRootAndForkChoiceState(); + + // we begin with no proposers + forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + + validateForkChoiceUpdatedWithoutPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + + resetExecutionEngineChannelMock(); + + // then, before attestationDue, we prepare validator + prepareAllValidators(); + + validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot()); + + resetExecutionEngineChannelMock(); + + // then forkchoice state changes before attestationDue + forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + + validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + + resetExecutionEngineChannelMock(); + + // attestationDue arrives + + forkChoiceNotifier.onAttestationsDue(chainBuilder.getLatestSlot()); + + validateForkChoiceUpdatedHasNotBeenCalled(); } private void prepareAllValidators() { @@ -135,4 +215,76 @@ private void prepareAllValidators() { forkChoiceNotifier.onUpdatePreparableProposers(proposers); } + + private void prepareWithNonProposingValidators() { + Collection proposers = + IntStream.range(VALIDATOR_KEYS.size() + 1, VALIDATOR_KEYS.size() + 5) + .mapToObj( + index -> + new BeaconPreparableProposer( + UInt64.valueOf(index), dataStructureUtil.randomBytes20())) + .collect(Collectors.toList()); + + forkChoiceNotifier.onUpdatePreparableProposers(proposers); + } + + private void validateForkChoiceUpdatedHasNotBeenCalled() { + verify(executionEngineChannel, never()) + .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); + } + + private void validateForkChoiceUpdatedWithPayloadAttributes(UInt64 targetSlot) { + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); + + assertThat(forkChoiceStateCaptor.getValue()).isEqualToComparingFieldByField(forkChoiceState); + + SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); + UInt64 expectedProposerIndex = + UInt64.valueOf(spec.getBeaconProposerIndex(signedBlockAndState.getState(), targetSlot)); + Bytes20 proposerLastSeenSlotAndFeeRecipient = + forkChoiceNotifier.getProposerIndexFeeRecipient(expectedProposerIndex); + assertThat(proposerLastSeenSlotAndFeeRecipient).isNotNull(); + + Optional payloadAttributes = payloadAttributesCaptor.getValue(); + assertThat(payloadAttributes).isNotEmpty(); + assertThat(payloadAttributes.get().getFeeRecipient()) + .isEqualTo(proposerLastSeenSlotAndFeeRecipient); + } + + private void validateForkChoiceUpdatedWithoutPayloadAttributes(UInt64 targetSlot) { + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); + + assertThat(forkChoiceStateCaptor.getValue()).isEqualToComparingFieldByField(forkChoiceState); + + SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); + UInt64 expectedProposerIndex = + UInt64.valueOf(spec.getBeaconProposerIndex(signedBlockAndState.getState(), targetSlot)); + Bytes20 proposerLastSeenSlotAndFeeRecipient = + forkChoiceNotifier.getProposerIndexFeeRecipient(expectedProposerIndex); + assertThat(proposerLastSeenSlotAndFeeRecipient).isNull(); + + Optional payloadAttributes = payloadAttributesCaptor.getValue(); + assertThat(payloadAttributes).isEmpty(); + } + + private void resetExecutionEngineChannelMock() { + reset(executionEngineChannel); + when(executionEngineChannel.forkChoiceUpdated(any(), any())) + .thenReturn( + SafeFuture.completedFuture( + new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.empty()))); + } + + private void setRootAndForkChoiceState() { + SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); + root = signedBlockAndState.getRoot(); + Bytes32 executionHeadRoot = dataStructureUtil.randomBytes32(); + forkChoiceState = + new ForkChoiceState( + executionHeadRoot, + dataStructureUtil.randomBytes32(), + dataStructureUtil.randomBytes32()); + } } diff --git a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java index ede4ae667ea..e0118b53656 100644 --- a/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java +++ b/validator/client/src/main/java/tech/pegasys/teku/validator/client/BeaconProposerPreparer.java @@ -32,6 +32,7 @@ public class BeaconProposerPreparer implements ValidatorTimingChannel { private final OwnedValidators validators; private final Spec spec; private final Bytes20 feeRecipient; + private boolean firstCallDone = false; public BeaconProposerPreparer( ValidatorApiChannel validatorApiChannel, @@ -48,7 +49,8 @@ public BeaconProposerPreparer( @Override public void onSlot(UInt64 slot) { - if (slot.mod(spec.getSlotsPerEpoch(slot)).isZero()) { + if (slot.mod(spec.getSlotsPerEpoch(slot)).isZero() || !firstCallDone) { + firstCallDone = true; validatorIndexProvider .getValidatorIndices(validators.getPublicKeys()) .thenApply( From f32f0f81ef61eb1ec92a3fdaaa30871dd4d2c7e7 Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Mon, 22 Nov 2021 12:08:13 +1000 Subject: [PATCH 5/6] Thread safe ForkChoiceNotifier --- .../java/tech/pegasys/teku/spec/Spec.java | 4 + .../executionengine/PayloadAttributes.java | 20 + .../forkchoice/ForkChoiceNotifier.java | 374 +++++++++--------- .../forkchoice/ForkChoiceNotifierTest.java | 347 ++++++++-------- .../beaconchain/BeaconChainController.java | 3 +- services/powchain/build.gradle | 1 - 6 files changed, 360 insertions(+), 389 deletions(-) diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java index 1a9f7db0a15..607f184503a 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/Spec.java @@ -332,6 +332,10 @@ public Bytes32 getDomain( .getDomain(domainType, epoch, fork, genesisValidatorsRoot); } + public Bytes32 getRandaoMix(final BeaconState state, final UInt64 epoch) { + return atEpoch(epoch).beaconStateAccessors().getRandaoMix(state, epoch); + } + public boolean verifyProposerSlashingSignature( BeaconState state, ProposerSlashing proposerSlashing, diff --git a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java index cf79194a26d..68e1d359f64 100644 --- a/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java +++ b/ethereum/spec/src/main/java/tech/pegasys/teku/spec/executionengine/PayloadAttributes.java @@ -14,6 +14,7 @@ package tech.pegasys.teku.spec.executionengine; import com.google.common.base.MoreObjects; +import java.util.Objects; import org.apache.tuweni.bytes.Bytes32; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.ssz.type.Bytes20; @@ -41,6 +42,25 @@ public Bytes20 getFeeRecipient() { return feeRecipient; } + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final PayloadAttributes that = (PayloadAttributes) o; + return Objects.equals(timestamp, that.timestamp) + && Objects.equals(random, that.random) + && Objects.equals(feeRecipient, that.feeRecipient); + } + + @Override + public int hashCode() { + return Objects.hash(timestamp, random, feeRecipient); + } + @Override public String toString() { return MoreObjects.toStringHelper(this) diff --git a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java index 7fcddd30012..410592b251d 100644 --- a/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java +++ b/ethereum/statetransition/src/main/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifier.java @@ -13,17 +13,21 @@ package tech.pegasys.teku.statetransition.forkchoice; -import com.google.common.annotations.VisibleForTesting; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.tuweni.bytes.Bytes32; +import tech.pegasys.teku.infrastructure.async.AsyncRunnerFactory; +import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.eventthread.AsyncRunnerEventThread; +import tech.pegasys.teku.infrastructure.async.eventthread.EventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; import tech.pegasys.teku.spec.Spec; +import tech.pegasys.teku.spec.config.SpecConfig; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; import tech.pegasys.teku.spec.datastructures.blocks.StateAndBlockSummary; import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; @@ -32,258 +36,232 @@ import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedResult; import tech.pegasys.teku.spec.executionengine.PayloadAttributes; import tech.pegasys.teku.ssz.type.Bytes20; +import tech.pegasys.teku.ssz.type.Bytes8; import tech.pegasys.teku.storage.client.RecentChainData; public class ForkChoiceNotifier { - private static final int MAX_PROPOSER_SEEN_EPOCHS = 2; + private static final long MAX_PROPOSER_SEEN_EPOCHS = 2; private static final Logger LOG = LogManager.getLogger(); - private final ExecutionEngineChannel executionEngineChannel; + private final EventThread eventThread; private final Spec spec; + private final ExecutionEngineChannel executionEngineChannel; private final RecentChainData recentChainData; - private final CurrentForkChoiceUpdatedInfo currentForkChoiceUpdatedInfo = - new CurrentForkChoiceUpdatedInfo(); + private final Map proposerInfoByValidatorIndex = new HashMap<>(); - private Map - proposerIndexLastSeenSlotAndFeeRecipient = new HashMap<>(); + private Optional forkChoiceState = Optional.empty(); + private Optional payloadAttributes = Optional.empty(); - private Optional maybeLastAttestationDueSlot = Optional.empty(); + private Optional lastSentForkChoiceState = Optional.empty(); + private Optional lastSentPayloadAttributes = Optional.empty(); + private Optional lastPayloadId = Optional.empty(); - public ForkChoiceNotifier( - final RecentChainData recentChainData, + ForkChoiceNotifier( + final EventThread eventThread, + final Spec spec, final ExecutionEngineChannel executionEngineChannel, - final Spec spec) { - this.recentChainData = recentChainData; - this.executionEngineChannel = executionEngineChannel; + final RecentChainData recentChainData) { + this.eventThread = eventThread; this.spec = spec; + this.executionEngineChannel = executionEngineChannel; + this.recentChainData = recentChainData; } - public void onUpdatePreparableProposers( - Collection beaconPreparableProposers) { - LOG.debug("onUpdatePreparableProposers {}", beaconPreparableProposers); - - recentChainData - .getCurrentSlot() - .ifPresent( - currentSlot -> - updatePreparableProposers( - beaconPreparableProposers, - attestationDueHasBeenCalledForSlot(currentSlot) - ? currentSlot.plus(1) - : currentSlot)); + public static ForkChoiceNotifier create( + final AsyncRunnerFactory asyncRunnerFactory, + final Spec spec, + final ExecutionEngineChannel executionEngineChannel, + final RecentChainData recentChainData) { + final AsyncRunnerEventThread eventThread = + new AsyncRunnerEventThread("forkChoiceNotifier", asyncRunnerFactory); + return new ForkChoiceNotifier(eventThread, spec, executionEngineChannel, recentChainData); } - private boolean attestationDueHasBeenCalledForSlot(UInt64 slot) { - return maybeLastAttestationDueSlot - .map(lastAttestationDueSlot -> lastAttestationDueSlot.isGreaterThanOrEqualTo(slot)) - .orElse(false); + public void onUpdatePreparableProposers(final Collection proposers) { + eventThread.execute(() -> internalUpdatePreparableProposers(proposers)); } - public void onForkChoiceUpdated( - Bytes32 optimisticHeadBeaconBlockRoot, ForkChoiceState forkChoiceState) { - LOG.debug("onForkChoiceUpdated {}, {}", optimisticHeadBeaconBlockRoot, forkChoiceState); - - Optional currentValidatedHead = - recentChainData.getChainHead().map(StateAndBlockSummary::getRoot); - boolean isForkChoiceStateOnValidatedHead = - currentValidatedHead.isPresent() - && optimisticHeadBeaconBlockRoot.equals(currentValidatedHead.get()); - - currentForkChoiceUpdatedInfo.forkChoiceState = Optional.of(forkChoiceState); - currentForkChoiceUpdatedInfo.isOnValidatedHead = isForkChoiceStateOnValidatedHead; - - recentChainData - .getCurrentSlot() - .ifPresent( - (slot) -> { - UInt64 targetSlot = slot.plus(1); - callExecutionEngineForkChoiceUpdated(getPayloadAttributes(targetSlot), targetSlot); - }); + public void onForkChoiceUpdated(final ForkChoiceState forkChoiceState) { + eventThread.execute(() -> internalForkChoiceUpdated(forkChoiceState)); } - public void onAttestationsDue(UInt64 slot) { - LOG.debug("onAttestationsDue {}", slot); - - maybeLastAttestationDueSlot = Optional.of(slot); - - UInt64 targetSlot = slot.plus(1); - - if (currentForkChoiceUpdatedInfo.hasNotBeenCalledForSlot(targetSlot)) { - - Optional payloadAttributes = getPayloadAttributes(targetSlot); - - if (payloadAttributes.isPresent()) { - callExecutionEngineForkChoiceUpdated(payloadAttributes, targetSlot); - } - } + public void onAttestationsDue(final UInt64 slot) { + eventThread.execute(() -> internalAttestationsDue(slot)); } - public CurrentForkChoiceUpdatedInfo getCurrentForkChoiceUpdatedInfo() { - return currentForkChoiceUpdatedInfo; + public SafeFuture> getPayloadId() { + return eventThread.execute(() -> lastPayloadId); } - private void updatePreparableProposers( - Collection beaconPreparableProposers, UInt64 currentSlot) { - - final int maxProposerSeenSlot = spec.getSlotsPerEpoch(currentSlot) * MAX_PROPOSER_SEEN_EPOCHS; - boolean newProposerIndexAdded = false; - - // remove expired proposers - proposerIndexLastSeenSlotAndFeeRecipient = - proposerIndexLastSeenSlotAndFeeRecipient.entrySet().stream() - .filter(p -> p.getValue().lastSeenSlot.compareTo(currentSlot) < maxProposerSeenSlot) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + private void internalUpdatePreparableProposers( + final Collection proposers) { + eventThread.checkOnEventThread(); + // Default to the genesis slot if we're pre-genesis. + final UInt64 currentSlot = recentChainData.getCurrentSlot().orElse(SpecConfig.GENESIS_SLOT); + + // Remove expired validators + proposerInfoByValidatorIndex.values().removeIf(info -> info.hasExpired(currentSlot)); + + // Update validators + final UInt64 expirySlot = + currentSlot.plus(spec.getSlotsPerEpoch(currentSlot) * MAX_PROPOSER_SEEN_EPOCHS); + for (BeaconPreparableProposer proposer : proposers) { + proposerInfoByValidatorIndex.put( + proposer.getValidatorIndex(), new ProposerInfo(expirySlot, proposer.getFeeRecipient())); + } - // update or add new proposers - for (BeaconPreparableProposer beaconPreparableProposer : beaconPreparableProposers) { - ProposerLastSeenSlotAndFeeRecipient current = - proposerIndexLastSeenSlotAndFeeRecipient.get( - beaconPreparableProposer.getValidatorIndex()); - if (current == null) { - proposerIndexLastSeenSlotAndFeeRecipient.put( - beaconPreparableProposer.getValidatorIndex(), - new ProposerLastSeenSlotAndFeeRecipient( - currentSlot, beaconPreparableProposer.getFeeRecipient())); + // Update payload attributes in case we now need to propose the next block + updatePayloadAttributes(currentSlot.plus(1)); + } - newProposerIndexAdded = true; + private void internalForkChoiceUpdated(final ForkChoiceState forkChoiceState) { + eventThread.checkOnEventThread(); - } else { - current.lastSeenSlot = currentSlot; - } + if (this.forkChoiceState.isPresent() && this.forkChoiceState.get().equals(forkChoiceState)) { + // No change required. + return; } - if (newProposerIndexAdded) { - final Optional payloadAttributes = getPayloadAttributes(currentSlot); + this.forkChoiceState = Optional.of(forkChoiceState); + recentChainData + .getCurrentSlot() + .ifPresent(currentSlot -> updatePayloadAttributes(currentSlot.plus(1))); + sendForkChoiceUpdated(); + } - if (payloadAttributes.isPresent()) - callExecutionEngineForkChoiceUpdated(payloadAttributes, currentSlot); - } + private void internalAttestationsDue(final UInt64 slot) { + eventThread.checkOnEventThread(); + // Assume `slot` is empty and check if we need to prepare to propose in the next slot + updatePayloadAttributes(slot.plus(1)); } - /** - * checks proposal preconditions and if we have a prepared proposer required to produce a block - * for the targetSlot - * - *

if conditions are met, builds a {@link PayloadAttributes} gathering data from current chain - * head state. - * - * @param targetSlot at which we want to produce a block - * @return optionally payload attributes for the block proposer due - */ - private Optional getPayloadAttributes(UInt64 targetSlot) { - if (!currentForkChoiceUpdatedInfo.isPossibleToPropose()) { - return Optional.empty(); + private void sendForkChoiceUpdated() { + if (lastSentForkChoiceState.equals(forkChoiceState) + && lastSentPayloadAttributes.equals(payloadAttributes)) { + // No change to previously sent values so no need to resend + return; } - - return recentChainData - .getChainHead() - .map( - stateAndBlockSummary -> { - UInt64 proposerIndex = - UInt64.valueOf( - spec.getBeaconProposerIndex(stateAndBlockSummary.getState(), targetSlot)); - - ProposerLastSeenSlotAndFeeRecipient maybePreparedProposer = - proposerIndexLastSeenSlotAndFeeRecipient.get(proposerIndex); - - if (maybePreparedProposer != null) { - - final BeaconState state = stateAndBlockSummary.getState(); - final UInt64 epoch = spec.computeEpochAtSlot(targetSlot); - - return new PayloadAttributes( - spec.computeTimeAtSlot(state, targetSlot), - spec.atEpoch(epoch).beaconStateAccessors().getRandaoMix(state, epoch), - maybePreparedProposer.feeRecipient); - } - return null; - }); + forkChoiceState.ifPresentOrElse( + forkChoiceState -> { + lastSentForkChoiceState = this.forkChoiceState; + lastSentPayloadAttributes = payloadAttributes; + // Previous payload is no longer useful as we've moved on to prepping the next block + lastPayloadId = Optional.empty(); + executionEngineChannel + .forkChoiceUpdated(forkChoiceState, payloadAttributes) + .thenAcceptAsync( + result -> handleForkChoiceResult(forkChoiceState, result), eventThread) + .finish(error -> LOG.error("Failed to notify EL of fork choice update", error)); + }, + () -> + LOG.warn( + "Could not notify EL of fork choice update because fork choice state is not yet known")); } - private void callExecutionEngineForkChoiceUpdated( - final Optional payloadAttributes, - final UInt64 executionPayloadTargetSlot) { - - currentForkChoiceUpdatedInfo.calledAtSlot = Optional.of(executionPayloadTargetSlot); - currentForkChoiceUpdatedInfo.payloadAttributes = payloadAttributes; - - ForkChoiceState forkChoiceState = - currentForkChoiceUpdatedInfo.forkChoiceState.orElseThrow( - () -> new IllegalStateException("A current ForkChoiceState is expected")); - - // do we need to .join() or ordering will be respected at this point? - // I think it is currently synchronous up to the actual call - // or maybe if it is now a real channel is not the case. - executionEngineChannel - .forkChoiceUpdated(forkChoiceState, payloadAttributes) + private void updatePayloadAttributes(final UInt64 blockSlot) { + calculatePayloadAttributes(blockSlot) + .thenAcceptAsync( + newPayloadAttributes -> updatePayloadAttributes(blockSlot, newPayloadAttributes), + eventThread) .finish( - result -> { - currentForkChoiceUpdatedInfo.result = Optional.of(result); - LOG.info("forkChoiceUpdated result: {}", result); - }, error -> - LOG.error( - "Error while calling forkChoiceUpdated. Message: {}", error.getMessage())); + LOG.error("Failed to calculate payload attributes for slot {}", blockSlot, error)); } - @VisibleForTesting - Bytes20 getProposerIndexFeeRecipient(UInt64 proposerIndex) { - ProposerLastSeenSlotAndFeeRecipient lastSeenSlotAndFeeRecipient = - proposerIndexLastSeenSlotAndFeeRecipient.get(proposerIndex); - return lastSeenSlotAndFeeRecipient != null ? lastSeenSlotAndFeeRecipient.feeRecipient : null; + private void updatePayloadAttributes( + final UInt64 blockSlot, final Optional newPayloadAttributes) { + eventThread.checkOnEventThread(); + if (payloadAttributes.equals(newPayloadAttributes)) { + // No change, nothing to do. + return; + } + final UInt64 currentSlot = recentChainData.getCurrentSlot().orElse(UInt64.ZERO); + if (currentSlot.isGreaterThanOrEqualTo(blockSlot)) { + // Slot has already progressed so this update is too late, just drop it. + LOG.warn( + "Payload attribute calculation for slot {} took too long. Slot was already {}", + blockSlot, + currentSlot); + return; + } + payloadAttributes = newPayloadAttributes; + sendForkChoiceUpdated(); } - @VisibleForTesting - UInt64 getProposerIndexLastSeenSlot(UInt64 proposerIndex) { - ProposerLastSeenSlotAndFeeRecipient lastSeenSlotAndFeeRecipient = - proposerIndexLastSeenSlotAndFeeRecipient.get(proposerIndex); - return lastSeenSlotAndFeeRecipient != null ? lastSeenSlotAndFeeRecipient.lastSeenSlot : null; + private void handleForkChoiceResult( + final ForkChoiceState forkChoiceState, final ForkChoiceUpdatedResult result) { + eventThread.checkOnEventThread(); + if (lastSentForkChoiceState.isEmpty() + || !lastSentForkChoiceState.get().equals(forkChoiceState)) { + // Debug level because this is quite likely to happen when syncing + LOG.debug("Execution engine did not return payload ID in time, discarding"); + return; + } + lastPayloadId = result.getPayloadId(); } - private static class ProposerLastSeenSlotAndFeeRecipient { - UInt64 lastSeenSlot; - Bytes20 feeRecipient; - - public ProposerLastSeenSlotAndFeeRecipient(UInt64 lastSeenSlot, Bytes20 feeRecipient) { - this.lastSeenSlot = lastSeenSlot; - this.feeRecipient = feeRecipient; + private SafeFuture> calculatePayloadAttributes( + final UInt64 blockSlot) { + eventThread.checkOnEventThread(); + if (forkChoiceState.isEmpty()) { + // No known fork choice state so no point calculating payload attributes + return SafeFuture.completedFuture(Optional.empty()); } + final UInt64 epoch = spec.computeEpochAtSlot(blockSlot); + // TODO: Return empty if chain head is not same as optimistic chain head + // TODO: Alternatively just limit how many epochs of empty slots we'll process to avoid burning + // CPU pointlessly during optimistic sync + return getStateInEpoch(epoch) + .thenApplyAsync( + maybeState -> calculatePayloadAttributes(blockSlot, epoch, maybeState), eventThread); } - public static class CurrentForkChoiceUpdatedInfo { - private Optional forkChoiceState = Optional.empty(); - private Boolean isOnValidatedHead = false; - private Optional calledAtSlot = Optional.empty(); - private Optional payloadAttributes = Optional.empty(); - private Optional result = Optional.empty(); - - private boolean isPossibleToPropose() { - return forkChoiceState.isPresent() && isOnValidatedHead; + private Optional calculatePayloadAttributes( + final UInt64 blockSlot, final UInt64 epoch, final Optional maybeState) { + eventThread.checkOnEventThread(); + if (maybeState.isEmpty()) { + return Optional.empty(); } - - private boolean hasNotBeenCalledForSlot(UInt64 slot) { - return calledAtSlot.map(lastSlot -> lastSlot.compareTo(slot) < 0).orElse(true); + final BeaconState state = maybeState.get(); + final UInt64 proposerIndex = UInt64.valueOf(spec.getBeaconProposerIndex(state, blockSlot)); + final ProposerInfo proposerInfo = proposerInfoByValidatorIndex.get(proposerIndex); + if (proposerInfo == null) { + // Proposer is not one of our validators. No need to propose a block. + return Optional.empty(); } + final UInt64 timestamp = spec.computeTimeAtSlot(state, blockSlot); + final Bytes32 random = spec.getRandaoMix(state, epoch); + return Optional.of(new PayloadAttributes(timestamp, random, proposerInfo.feeRecipient)); + } - public Optional getCalledAtSlot() { - return calledAtSlot; + private SafeFuture> getStateInEpoch(final UInt64 requiredEpoch) { + final Optional chainHead = recentChainData.getChainHead(); + if (chainHead.isEmpty()) { + return SafeFuture.completedFuture(Optional.empty()); } - - public Optional getForkChoiceState() { - return forkChoiceState; + final StateAndBlockSummary head = chainHead.get(); + if (spec.computeEpochAtSlot(head.getSlot()).equals(requiredEpoch)) { + return SafeFuture.completedFuture(Optional.of(head.getState())); + } else { + // TODO: Chain head is from a prior epoch, we want to avoid processing a lot of empty slots if + // we're using optimistic sync as that would just waste CPU. + return recentChainData.retrieveStateAtSlot( + new SlotAndBlockRoot(spec.computeStartSlotAtEpoch(requiredEpoch), head.getRoot())); } + } - public Boolean getOnValidatedHead() { - return isOnValidatedHead; - } + private static class ProposerInfo { + UInt64 expirySlot; + Bytes20 feeRecipient; - public Optional getPayloadAttributes() { - return payloadAttributes; + public ProposerInfo(UInt64 expirySlot, Bytes20 feeRecipient) { + this.expirySlot = expirySlot; + this.feeRecipient = feeRecipient; } - public Optional getResult() { - return result; + public boolean hasExpired(final UInt64 currentSlot) { + return currentSlot.isGreaterThanOrEqualTo(expirySlot); } } } diff --git a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java index 4c9aac668ce..8b5da1621d9 100644 --- a/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java +++ b/ethereum/statetransition/src/test/java/tech/pegasys/teku/statetransition/forkchoice/ForkChoiceNotifierTest.java @@ -16,33 +16,31 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.reset; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import static tech.pegasys.teku.infrastructure.async.SafeFutureAssert.assertThatSafeFuture; -import java.util.Collection; import java.util.List; import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.apache.tuweni.bytes.Bytes32; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.TestTemplate; -import org.mockito.ArgumentCaptor; -import tech.pegasys.teku.bls.BLSKeyGenerator; -import tech.pegasys.teku.bls.BLSKeyPair; -import tech.pegasys.teku.core.ChainBuilder; +import org.junit.jupiter.api.Test; import tech.pegasys.teku.infrastructure.async.SafeFuture; +import tech.pegasys.teku.infrastructure.async.eventthread.InlineEventThread; import tech.pegasys.teku.infrastructure.unsigned.UInt64; +import tech.pegasys.teku.protoarray.ForkChoiceStrategy; import tech.pegasys.teku.spec.Spec; -import tech.pegasys.teku.spec.TestSpecContext; -import tech.pegasys.teku.spec.TestSpecInvocationContextProvider; +import tech.pegasys.teku.spec.TestSpecFactory; import tech.pegasys.teku.spec.datastructures.blocks.SignedBlockAndState; +import tech.pegasys.teku.spec.datastructures.blocks.SlotAndBlockRoot; import tech.pegasys.teku.spec.datastructures.operations.versions.merge.BeaconPreparableProposer; +import tech.pegasys.teku.spec.datastructures.state.beaconstate.BeaconState; +import tech.pegasys.teku.spec.executionengine.ExecutePayloadResult; import tech.pegasys.teku.spec.executionengine.ExecutionEngineChannel; +import tech.pegasys.teku.spec.executionengine.ExecutionPayloadStatus; import tech.pegasys.teku.spec.executionengine.ForkChoiceState; import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedResult; import tech.pegasys.teku.spec.executionengine.ForkChoiceUpdatedStatus; @@ -50,34 +48,25 @@ import tech.pegasys.teku.spec.logic.common.block.AbstractBlockProcessor; import tech.pegasys.teku.spec.util.DataStructureUtil; import tech.pegasys.teku.ssz.type.Bytes20; -import tech.pegasys.teku.storage.client.ChainUpdater; +import tech.pegasys.teku.ssz.type.Bytes8; import tech.pegasys.teku.storage.client.RecentChainData; -import tech.pegasys.teku.storage.server.StateStorageMode; import tech.pegasys.teku.storage.storageSystem.InMemoryStorageSystemBuilder; import tech.pegasys.teku.storage.storageSystem.StorageSystem; -@TestSpecContext() -public class ForkChoiceNotifierTest { - private static final List VALIDATOR_KEYS = BLSKeyGenerator.generateKeyPairs(64); +class ForkChoiceNotifierTest { - final ArgumentCaptor forkChoiceStateCaptor = - ArgumentCaptor.forClass(ForkChoiceState.class); + private final InlineEventThread eventThread = new InlineEventThread(); + private final Spec spec = TestSpecFactory.createMinimalMerge(); + private final DataStructureUtil dataStructureUtil = new DataStructureUtil(spec); - @SuppressWarnings("unchecked") - final ArgumentCaptor> payloadAttributesCaptor = - ArgumentCaptor.forClass(Optional.class); + private final StorageSystem storageSystem = InMemoryStorageSystemBuilder.buildDefault(spec); + private final RecentChainData recentChainData = storageSystem.recentChainData(); + private ForkChoiceStrategy forkChoiceStrategy; - ChainBuilder chainBuilder; - StorageSystem storageSystem; - ChainUpdater chainUpdater; - RecentChainData recentChainData; - ForkChoiceNotifier forkChoiceNotifier; - DataStructureUtil dataStructureUtil; - ForkChoiceState forkChoiceState; - Bytes32 root; - Spec spec; + private final ExecutionEngineChannel executionEngineChannel = mock(ExecutionEngineChannel.class); - ExecutionEngineChannel executionEngineChannel = mock(ExecutionEngineChannel.class); + private final ForkChoiceNotifier notifier = + new ForkChoiceNotifier(eventThread, spec, executionEngineChannel, recentChainData); @BeforeAll public static void initSession() { @@ -90,201 +79,181 @@ public static void resetSession() { } @BeforeEach - void setUp(TestSpecInvocationContextProvider.SpecContext specContext) { - dataStructureUtil = specContext.getDataStructureUtil(); - spec = specContext.getSpec(); - chainBuilder = ChainBuilder.create(spec, VALIDATOR_KEYS); - storageSystem = InMemoryStorageSystemBuilder.buildDefault(StateStorageMode.ARCHIVE); - recentChainData = storageSystem.recentChainData(); - chainUpdater = new ChainUpdater(storageSystem.recentChainData(), chainBuilder); - chainUpdater.initializeGenesis(false); - - forkChoiceNotifier = new ForkChoiceNotifier(recentChainData, executionEngineChannel, spec); - - resetExecutionEngineChannelMock(); + void setUp() { + storageSystem.chainUpdater().initializeGenesis(false); + forkChoiceStrategy = recentChainData.getForkChoiceStrategy().orElseThrow(); + when(executionEngineChannel.executePayload(any())) + .thenReturn( + SafeFuture.completedFuture( + new ExecutePayloadResult( + ExecutionPayloadStatus.VALID, Optional.empty(), Optional.empty()))); + when(executionEngineChannel.forkChoiceUpdated(any(), any())) + .thenReturn( + SafeFuture.completedFuture( + new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.empty()))); } - @TestTemplate - void onForkChoiceShouldCallForkChoiceUpdatedWithAttributesWhenProposerIsPrepared() { - prepareAllValidators(); - - setRootAndForkChoiceState(); - - forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); - - validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + @Test + void onForkChoiceUpdated_shouldSendNotificationToExecutionEngine() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); } - @TestTemplate - void onForkChoiceShouldCallForkChoiceUpdatedWithoutAttributesWhenProposerIsNotPrepared() { - setRootAndForkChoiceState(); + @Test + void onForkChoiceUpdated_shouldSendNotificationWithPayloadAttributesForNextProposer() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); - forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); - - validateForkChoiceUpdatedWithoutPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes)); } - @TestTemplate - void onUpdatePreparableProposersShouldNotCallForkChoiceUpdatedWithNotForkChoiceState() { - prepareAllValidators(); - - validateForkChoiceUpdatedHasNotBeenCalled(); + @Test + void onForkChoiceUpdated_shouldSendNotificationWithoutPayloadAttributesWhenNotProposingNext() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; + final int notTheNextProposer = spec.getBeaconProposerIndex(headState, blockSlot) + 1; + notifier.onUpdatePreparableProposers( + List.of( + new BeaconPreparableProposer( + UInt64.valueOf(notTheNextProposer), dataStructureUtil.randomBytes20()))); + + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); } - @TestTemplate - void onUpdatePreparableProposersShouldNotCallForkChoiceUpdatedWhenNotPreparingProposer() { - setRootAndForkChoiceState(); - - forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); - - resetExecutionEngineChannelMock(); + @Test + void onAttestationsDue_shouldNotSendUpdateIfNotChanged() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); - prepareWithNonProposingValidators(); + notifier.onAttestationsDue(UInt64.ZERO); + verifyNoMoreInteractions(executionEngineChannel); - validateForkChoiceUpdatedHasNotBeenCalled(); + notifier.onAttestationsDue(UInt64.ONE); + verifyNoMoreInteractions(executionEngineChannel); } - @TestTemplate - void onUpdatePreparableProposersShouldCallForkChoiceUpdatedOnCurrentSlotWhenPreparingProposer() { - setRootAndForkChoiceState(); + @Test + void onAttestationsDue_shouldSendUpdateIfWeAreDueToProposeNext() { + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.valueOf(2); + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); - forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + notifier.onForkChoiceUpdated(forkChoiceState); + // Not proposing block 1 so no payload attributes + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); - resetExecutionEngineChannelMock(); + notifier.onAttestationsDue(UInt64.ZERO); + verifyNoMoreInteractions(executionEngineChannel); - prepareAllValidators(); - - validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot()); + // Slot 1 is now assumed empty so prepare to propose in slot 2 + notifier.onAttestationsDue(UInt64.ONE); + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes)); } - @TestTemplate - void - onUpdatePreparableProposersShouldCallForkChoiceUpdatedOnNextSlotWhenPreparingProposerAfterAttestationDue() { - setRootAndForkChoiceState(); - - forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); - forkChoiceNotifier.onAttestationsDue(chainBuilder.getLatestSlot()); + @Test + void shouldUseStateFromCorrectEpochToCalculateBlockProposer() { + final int firstSlotOfNextEpoch = spec.getSlotsPerEpoch(UInt64.ZERO); + final UInt64 blockSlot = UInt64.valueOf(firstSlotOfNextEpoch); + final UInt64 slotBeforeBlock = blockSlot.minus(1); + final SignedBlockAndState headBlockAndState = + storageSystem.chainUpdater().advanceChainUntil(firstSlotOfNextEpoch - 1); + storageSystem.chainUpdater().updateBestBlock(headBlockAndState); + final BeaconState headState = headBlockAndState.getState(); + storageSystem.chainUpdater().setTime(spec.computeTimeAtSlot(headState, slotBeforeBlock)); + assertThat(recentChainData.getCurrentSlot()).contains(slotBeforeBlock); - resetExecutionEngineChannelMock(); + final PayloadAttributes payloadAttributes = withProposerForSlot(blockSlot); - prepareAllValidators(); + notifier.onForkChoiceUpdated(getCurrentForkChoiceState()); - validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); + verify(executionEngineChannel) + .forkChoiceUpdated(getCurrentForkChoiceState(), Optional.of(payloadAttributes)); } - @TestTemplate - void complexScenario1() { - setRootAndForkChoiceState(); - - // we begin with no proposers - forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); - - validateForkChoiceUpdatedWithoutPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); - - resetExecutionEngineChannelMock(); - - // then, before attestationDue, we prepare validator - prepareAllValidators(); + @Test + void onUpdatePreparableProposers_shouldSendNewNotificationWhenProposerAdded() { + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; - validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot()); + notifier.onForkChoiceUpdated(forkChoiceState); + verify(executionEngineChannel).forkChoiceUpdated(forkChoiceState, Optional.empty()); - resetExecutionEngineChannelMock(); - - // then forkchoice state changes before attestationDue - forkChoiceNotifier.onForkChoiceUpdated(root, forkChoiceState); - - validateForkChoiceUpdatedWithPayloadAttributes(chainBuilder.getLatestSlot().plus(1)); - - resetExecutionEngineChannelMock(); - - // attestationDue arrives - - forkChoiceNotifier.onAttestationsDue(chainBuilder.getLatestSlot()); - - validateForkChoiceUpdatedHasNotBeenCalled(); + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); + verify(executionEngineChannel) + .forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes)); } - private void prepareAllValidators() { - Collection proposers = - IntStream.range(0, VALIDATOR_KEYS.size()) - .mapToObj( - index -> - new BeaconPreparableProposer( - UInt64.valueOf(index), dataStructureUtil.randomBytes20())) - .collect(Collectors.toList()); + @Test + void getPayloadId_shouldReturnLatestPayloadId() { + final Bytes8 payloadId = dataStructureUtil.randomBytes8(); + final ForkChoiceState forkChoiceState = getCurrentForkChoiceState(); + final BeaconState headState = recentChainData.getBestState().orElseThrow(); + final UInt64 blockSlot = UInt64.ONE; + final PayloadAttributes payloadAttributes = withProposerForSlot(headState, blockSlot); - forkChoiceNotifier.onUpdatePreparableProposers(proposers); - } + final SafeFuture responseFuture = new SafeFuture<>(); + when(executionEngineChannel.forkChoiceUpdated(forkChoiceState, Optional.of(payloadAttributes))) + .thenReturn(responseFuture); - private void prepareWithNonProposingValidators() { - Collection proposers = - IntStream.range(VALIDATOR_KEYS.size() + 1, VALIDATOR_KEYS.size() + 5) - .mapToObj( - index -> - new BeaconPreparableProposer( - UInt64.valueOf(index), dataStructureUtil.randomBytes20())) - .collect(Collectors.toList()); + notifier.onForkChoiceUpdated(forkChoiceState); - forkChoiceNotifier.onUpdatePreparableProposers(proposers); - } + // Initially has no payload ID. + assertThatSafeFuture(notifier.getPayloadId()).isCompletedWithEmptyOptional(); - private void validateForkChoiceUpdatedHasNotBeenCalled() { - verify(executionEngineChannel, never()) - .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); + // But becomes available once we receive the response + responseFuture.complete( + new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.of(payloadId))); + assertThatSafeFuture(notifier.getPayloadId()).isCompletedWithOptionalContaining(payloadId); } - private void validateForkChoiceUpdatedWithPayloadAttributes(UInt64 targetSlot) { - verify(executionEngineChannel) - .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); - - assertThat(forkChoiceStateCaptor.getValue()).isEqualToComparingFieldByField(forkChoiceState); - - SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); - UInt64 expectedProposerIndex = - UInt64.valueOf(spec.getBeaconProposerIndex(signedBlockAndState.getState(), targetSlot)); - Bytes20 proposerLastSeenSlotAndFeeRecipient = - forkChoiceNotifier.getProposerIndexFeeRecipient(expectedProposerIndex); - assertThat(proposerLastSeenSlotAndFeeRecipient).isNotNull(); - - Optional payloadAttributes = payloadAttributesCaptor.getValue(); - assertThat(payloadAttributes).isNotEmpty(); - assertThat(payloadAttributes.get().getFeeRecipient()) - .isEqualTo(proposerLastSeenSlotAndFeeRecipient); + private PayloadAttributes withProposerForSlot(final UInt64 blockSlot) { + final Bytes32 bestBlockRoot = recentChainData.getBestBlockRoot().orElseThrow(); + final BeaconState state = + recentChainData + .retrieveStateAtSlot(new SlotAndBlockRoot(blockSlot, bestBlockRoot)) + .join() + .orElseThrow(); + return withProposerForSlot(state, blockSlot); } - private void validateForkChoiceUpdatedWithoutPayloadAttributes(UInt64 targetSlot) { - verify(executionEngineChannel) - .forkChoiceUpdated(forkChoiceStateCaptor.capture(), payloadAttributesCaptor.capture()); - - assertThat(forkChoiceStateCaptor.getValue()).isEqualToComparingFieldByField(forkChoiceState); - - SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); - UInt64 expectedProposerIndex = - UInt64.valueOf(spec.getBeaconProposerIndex(signedBlockAndState.getState(), targetSlot)); - Bytes20 proposerLastSeenSlotAndFeeRecipient = - forkChoiceNotifier.getProposerIndexFeeRecipient(expectedProposerIndex); - assertThat(proposerLastSeenSlotAndFeeRecipient).isNull(); - - Optional payloadAttributes = payloadAttributesCaptor.getValue(); - assertThat(payloadAttributes).isEmpty(); + private PayloadAttributes withProposerForSlot( + final BeaconState headState, final UInt64 blockSlot) { + final int block2Proposer = spec.getBeaconProposerIndex(headState, blockSlot); + final PayloadAttributes payloadAttributes = getExpectedPayloadAttributes(headState, blockSlot); + notifier.onUpdatePreparableProposers( + List.of( + new BeaconPreparableProposer( + UInt64.valueOf(block2Proposer), payloadAttributes.getFeeRecipient()))); + return payloadAttributes; } - private void resetExecutionEngineChannelMock() { - reset(executionEngineChannel); - when(executionEngineChannel.forkChoiceUpdated(any(), any())) - .thenReturn( - SafeFuture.completedFuture( - new ForkChoiceUpdatedResult(ForkChoiceUpdatedStatus.SUCCESS, Optional.empty()))); + private PayloadAttributes getExpectedPayloadAttributes( + final BeaconState headState, final UInt64 blockSlot) { + final Bytes20 feeRecipient = dataStructureUtil.randomBytes20(); + final UInt64 timestamp = spec.computeTimeAtSlot(headState, blockSlot); + final Bytes32 random = spec.getRandaoMix(headState, UInt64.ZERO); + return new PayloadAttributes(timestamp, random, feeRecipient); } - private void setRootAndForkChoiceState() { - SignedBlockAndState signedBlockAndState = chainBuilder.getLatestBlockAndState(); - root = signedBlockAndState.getRoot(); - Bytes32 executionHeadRoot = dataStructureUtil.randomBytes32(); - forkChoiceState = - new ForkChoiceState( - executionHeadRoot, - dataStructureUtil.randomBytes32(), - dataStructureUtil.randomBytes32()); + private ForkChoiceState getCurrentForkChoiceState() { + final Bytes32 headBlockRoot = recentChainData.getBestBlockRoot().orElseThrow(); + final Bytes32 headExecutionHash = + forkChoiceStrategy.executionBlockHash(headBlockRoot).orElseThrow(); + final Bytes32 finalizedRoot = recentChainData.getFinalizedCheckpoint().orElseThrow().getRoot(); + final Bytes32 finalizedExecutionHash = + forkChoiceStrategy.executionBlockHash(finalizedRoot).orElseThrow(); + + return new ForkChoiceState(headExecutionHash, headExecutionHash, finalizedExecutionHash); } } diff --git a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java index 8239d55801f..80516d5b145 100644 --- a/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java +++ b/services/beaconchain/src/main/java/tech/pegasys/teku/services/beaconchain/BeaconChainController.java @@ -832,7 +832,8 @@ private void initOperationsReOrgManager() { private void initForkChoiceNotifier() { LOG.debug("BeaconChainController.initForkChoiceNotifier()"); - forkChoiceNotifier = new ForkChoiceNotifier(recentChainData, executionEngine, spec); + forkChoiceNotifier = + ForkChoiceNotifier.create(asyncRunnerFactory, spec, executionEngine, recentChainData); } private void setupInitialState(final RecentChainData client) { diff --git a/services/powchain/build.gradle b/services/powchain/build.gradle index e958ad59103..6ae0020d375 100644 --- a/services/powchain/build.gradle +++ b/services/powchain/build.gradle @@ -9,7 +9,6 @@ dependencies { implementation project(':infrastructure:metrics') implementation project(':infrastructure:version') implementation project(':pow') - implementation project(':storage') implementation project(':storage:api') implementation project(':infrastructure:serviceutils') implementation project(':util') From a805134f153eee8d561f6b36f7afd049318d976e Mon Sep 17 00:00:00 2001 From: Adrian Sutton Date: Mon, 22 Nov 2021 15:44:04 +1000 Subject: [PATCH 6/6] Update test. --- .../teku/validator/client/BeaconProposerPreparerTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java b/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java index c68e58356b8..6a517820da9 100644 --- a/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java +++ b/validator/client/src/test/java/tech/pegasys/teku/validator/client/BeaconProposerPreparerTest.java @@ -17,9 +17,9 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import java.util.Collection; @@ -102,8 +102,11 @@ void should_callPrepareBeaconProposerAtBeginningOfEpoch(SpecContext specContext) @TestTemplate void should_notCallPrepareBeaconProposerAfterFirstSlotOfEpoch() { + beaconProposerPreparer.onSlot(UInt64.ZERO); + verify(validatorApiChannel).prepareBeaconProposer(any()); + beaconProposerPreparer.onSlot(UInt64.ONE); - verify(validatorApiChannel, never()).prepareBeaconProposer(any()); + verifyNoMoreInteractions(validatorApiChannel); } @TestTemplate