Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

7311: Build new peer task system with proof of concept example task implementation #7602

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ee23926
7311: Build new peer task system with proof of concept example task i…
Matilda-Clerke Sep 10, 2024
f872c12
7311: Fix reference hash change
Matilda-Clerke Sep 10, 2024
92802c5
7311: Add @params to javadoc for new parameters
Matilda-Clerke Sep 11, 2024
40f79a2
7311: Remove excess logging, select peer with highest reputation inst…
Matilda-Clerke Sep 11, 2024
4871d27
Merge branch 'main' into spike-replace-peer-task-system
Matilda-Clerke Sep 11, 2024
069c6f9
Merge branch 'main' into spike-replace-peer-task-system
Matilda-Clerke Sep 11, 2024
2af6e14
7311: spotless
Matilda-Clerke Sep 11, 2024
b744ece
7311: Add PoS check when considering chain height of peer
Matilda-Clerke Sep 11, 2024
1430f48
7311: Fix CheckPointSyncChainDownloaderTest
Matilda-Clerke Sep 11, 2024
e0e48e3
Merge branch 'main' into spike-replace-peer-task-system
Matilda-Clerke Sep 11, 2024
09893fc
7311: Provide feedback to peer
Matilda-Clerke Sep 12, 2024
8970306
7311: Replace second usage of GetReceiptsFromPeerTask and remove obso…
Matilda-Clerke Sep 13, 2024
998de68
Merge branch 'main' into spike-replace-peer-task-system
Matilda-Clerke Sep 13, 2024
5b3b051
7311: Refactor some usages of GetHeaadersFromPeerByNumberTask
Matilda-Clerke Sep 16, 2024
c737d4b
Merge branch 'main' into spike-replace-peer-task-system
Matilda-Clerke Sep 16, 2024
38bd8c1
7311: spotless
Matilda-Clerke Sep 16, 2024
2b2d9b8
Merge remote-tracking branch 'origin/spike-replace-peer-task-system' …
Matilda-Clerke Sep 16, 2024
a88c1eb
7311: Add peer request timer to PeerTaskExecutor
Matilda-Clerke Sep 16, 2024
2276bfd
Merge branch 'main' into spike-replace-peer-task-system
Matilda-Clerke Sep 16, 2024
2332ceb
7311: Fix javadoc issue
Matilda-Clerke Sep 16, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.MonitoredExecutors;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.manager.peertask.RequestSender;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.CheckpointBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.ClassicForkPeerValidator;
Expand Down Expand Up @@ -655,6 +658,13 @@ public BesuController build() {
final boolean fullSyncDisabled = !SyncMode.isFullSync(syncConfig.getSyncMode());
final SyncState syncState = new SyncState(blockchain, ethPeers, fullSyncDisabled, checkpoint);

final PeerManager peerManager = new PeerManager();
ethPeers.streamAllPeers().forEach(peerManager::addPeer);

final PeerTaskExecutor peerTaskExecutor =
new PeerTaskExecutor(
peerManager, new RequestSender(), currentProtocolSpecSupplier, metricsSystem);

if (chainPrunerConfiguration.getChainPruningEnabled()) {
final ChainDataPruner chainDataPruner = createChainPruner(blockchainStorage);
blockchain.observeBlockAdded(chainDataPruner);
Expand All @@ -677,7 +687,8 @@ public BesuController build() {
besuComponent.map(BesuComponent::getBlobCache).orElse(new BlobCache()),
miningParameters);

final List<PeerValidator> peerValidators = createPeerValidators(protocolSchedule);
final List<PeerValidator> peerValidators =
createPeerValidators(protocolSchedule, peerTaskExecutor);

final EthProtocolManager ethProtocolManager =
createEthProtocolManager(
Expand All @@ -689,6 +700,7 @@ public BesuController build() {
ethContext,
ethMessages,
scheduler,
peerManager,
peerValidators,
Optional.empty(),
forkIdManager);
Expand All @@ -705,7 +717,8 @@ public BesuController build() {
ethContext,
syncState,
ethProtocolManager,
pivotBlockSelector);
pivotBlockSelector,
peerTaskExecutor);

ethPeers.setTrailingPeerRequirementsSupplier(synchronizer::calculateTrailingPeerRequirements);

Expand Down Expand Up @@ -828,6 +841,7 @@ private TrieLogPruner createTrieLogPruner(
* @param syncState the sync state
* @param ethProtocolManager the eth protocol manager
* @param pivotBlockSelector the pivot block selector
* @param peerTaskExecutor the peer task executor
* @return the synchronizer
*/
protected DefaultSynchronizer createSynchronizer(
Expand All @@ -837,7 +851,8 @@ protected DefaultSynchronizer createSynchronizer(
final EthContext ethContext,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
final PivotBlockSelector pivotBlockSelector,
final PeerTaskExecutor peerTaskExecutor) {

return new DefaultSynchronizer(
syncConfig,
Expand All @@ -852,7 +867,8 @@ protected DefaultSynchronizer createSynchronizer(
clock,
metricsSystem,
getFullSyncTerminationCondition(protocolContext.getBlockchain()),
pivotBlockSelector);
pivotBlockSelector,
peerTaskExecutor);
}

private PivotBlockSelector createPivotSelector(
Expand Down Expand Up @@ -1019,6 +1035,7 @@ protected String getSupportedProtocol() {
* @param ethContext the eth context
* @param ethMessages the eth messages
* @param scheduler the scheduler
* @param peerManager the peer manager
* @param peerValidators the peer validators
* @param mergePeerFilter the merge peer filter
* @param forkIdManager the fork id manager
Expand All @@ -1033,6 +1050,7 @@ protected EthProtocolManager createEthProtocolManager(
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final PeerManager peerManager,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
Expand All @@ -1049,6 +1067,7 @@ protected EthProtocolManager createEthProtocolManager(
mergePeerFilter,
synchronizerConfiguration,
scheduler,
peerManager,
forkIdManager);
}

Expand Down Expand Up @@ -1134,27 +1153,32 @@ private ChainDataPruner createChainPruner(final BlockchainStorage blockchainStor
* @param protocolSchedule the protocol schedule
* @return the list
*/
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
final List<PeerValidator> validators = new ArrayList<>();

final OptionalLong daoBlock = genesisConfigOptions.getDaoForkBlock();
if (daoBlock.isPresent()) {
// Setup dao validator
validators.add(
new DaoForkPeerValidator(protocolSchedule, metricsSystem, daoBlock.getAsLong()));
new DaoForkPeerValidator(protocolSchedule, peerTaskExecutor, daoBlock.getAsLong()));
}

final OptionalLong classicBlock = genesisConfigOptions.getClassicForkBlock();
// setup classic validator
if (classicBlock.isPresent()) {
validators.add(
new ClassicForkPeerValidator(protocolSchedule, metricsSystem, classicBlock.getAsLong()));
new ClassicForkPeerValidator(
protocolSchedule, peerTaskExecutor, classicBlock.getAsLong()));
}

for (final Map.Entry<Long, Hash> requiredBlock : requiredBlocks.entrySet()) {
validators.add(
new RequiredBlocksPeerValidator(
protocolSchedule, metricsSystem, requiredBlock.getKey(), requiredBlock.getValue()));
protocolSchedule,
peerTaskExecutor,
requiredBlock.getKey(),
requiredBlock.getValue()));
}

final CheckpointConfigOptions checkpointConfigOptions =
Expand All @@ -1163,7 +1187,7 @@ protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protoc
validators.add(
new CheckpointBlocksPeerValidator(
protocolSchedule,
metricsSystem,
peerTaskExecutor,
checkpointConfigOptions.getNumber().orElseThrow(),
checkpointConfigOptions.getHash().map(Hash::fromHexString).orElseThrow()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -241,6 +242,7 @@ protected EthProtocolManager createEthProtocolManager(
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final PeerManager peerManager,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
Expand All @@ -255,6 +257,7 @@ protected EthProtocolManager createEthProtocolManager(
ethContext,
ethMessages,
scheduler,
peerManager,
peerValidators,
mergePeerFilter,
forkIdManager);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.peervalidation.RequiredBlocksPeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.SynchronizerConfiguration;
Expand Down Expand Up @@ -97,6 +99,7 @@ protected EthProtocolManager createEthProtocolManager(
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final PeerManager peerManager,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
Expand Down Expand Up @@ -127,6 +130,7 @@ protected EthProtocolManager createEthProtocolManager(
ethContext,
ethMessages,
scheduler,
peerManager,
peerValidators,
filterToUse,
forkIdManager);
Expand Down Expand Up @@ -235,15 +239,16 @@ protected PluginServiceFactory createAdditionalPluginServices(
}

@Override
protected List<PeerValidator> createPeerValidators(final ProtocolSchedule protocolSchedule) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule);
protected List<PeerValidator> createPeerValidators(
final ProtocolSchedule protocolSchedule, final PeerTaskExecutor peerTaskExecutor) {
List<PeerValidator> retval = super.createPeerValidators(protocolSchedule, peerTaskExecutor);
final OptionalLong powTerminalBlockNumber = genesisConfigOptions.getTerminalBlockNumber();
final Optional<Hash> powTerminalBlockHash = genesisConfigOptions.getTerminalBlockHash();
if (powTerminalBlockHash.isPresent() && powTerminalBlockNumber.isPresent()) {
retval.add(
new RequiredBlocksPeerValidator(
protocolSchedule,
metricsSystem,
peerTaskExecutor,
powTerminalBlockNumber.getAsLong(),
powTerminalBlockHash.get(),
0));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.manager.MergePeerFilter;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerTaskExecutor;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer;
import org.hyperledger.besu.ethereum.eth.sync.PivotBlockSelector;
Expand Down Expand Up @@ -161,6 +163,7 @@ protected EthProtocolManager createEthProtocolManager(
final EthContext ethContext,
final EthMessages ethMessages,
final EthScheduler scheduler,
final PeerManager peerManager,
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final ForkIdManager forkIdManager) {
Expand All @@ -173,6 +176,7 @@ protected EthProtocolManager createEthProtocolManager(
ethContext,
ethMessages,
scheduler,
peerManager,
peerValidators,
mergePeerFilter,
forkIdManager);
Expand Down Expand Up @@ -227,7 +231,8 @@ protected DefaultSynchronizer createSynchronizer(
final EthContext ethContext,
final SyncState syncState,
final EthProtocolManager ethProtocolManager,
final PivotBlockSelector pivotBlockSelector) {
final PivotBlockSelector pivotBlockSelector,
final PeerTaskExecutor peerTaskExecutor) {

DefaultSynchronizer sync =
super.createSynchronizer(
Expand All @@ -237,7 +242,8 @@ protected DefaultSynchronizer createSynchronizer(
ethContext,
syncState,
ethProtocolManager,
pivotBlockSelector);
pivotBlockSelector,
peerTaskExecutor);

if (genesisConfigOptions.getTerminalTotalDifficulty().isPresent()) {
LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.hyperledger.besu.ethereum.core.Difficulty;
import org.hyperledger.besu.ethereum.eth.EthProtocol;
import org.hyperledger.besu.ethereum.eth.EthProtocolConfiguration;
import org.hyperledger.besu.ethereum.eth.manager.peertask.PeerManager;
import org.hyperledger.besu.ethereum.eth.messages.EthPV62;
import org.hyperledger.besu.ethereum.eth.messages.StatusMessage;
import org.hyperledger.besu.ethereum.eth.peervalidation.PeerValidator;
Expand Down Expand Up @@ -73,6 +74,7 @@ public class EthProtocolManager implements ProtocolManager, MinedBlockObserver {
private final EthPeers ethPeers;
private final EthMessages ethMessages;
private final EthContext ethContext;
private final PeerManager peerManager;
private final List<Capability> supportedCapabilities;
private final Blockchain blockchain;
private final BlockBroadcaster blockBroadcaster;
Expand All @@ -92,6 +94,7 @@ public EthProtocolManager(
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler,
final PeerManager peerManager,
final ForkIdManager forkIdManager) {
this.networkId = networkId;
this.peerValidators = peerValidators;
Expand All @@ -100,6 +103,7 @@ public EthProtocolManager(
this.mergePeerFilter = mergePeerFilter;
this.shutdown = new CountDownLatch(1);
this.genesisHash = blockchain.getBlockHashByNumber(0L).orElse(Hash.ZERO);
this.peerManager = peerManager;

this.forkIdManager = forkIdManager;

Expand Down Expand Up @@ -140,7 +144,8 @@ public EthProtocolManager(
final List<PeerValidator> peerValidators,
final Optional<MergePeerFilter> mergePeerFilter,
final SynchronizerConfiguration synchronizerConfiguration,
final EthScheduler scheduler) {
final EthScheduler scheduler,
final PeerManager peerManager) {
this(
blockchain,
networkId,
Expand All @@ -154,6 +159,7 @@ public EthProtocolManager(
mergePeerFilter,
synchronizerConfiguration,
scheduler,
peerManager,
new ForkIdManager(
blockchain,
Collections.emptyList(),
Expand Down Expand Up @@ -337,6 +343,7 @@ public void processMessage(final Capability cap, final Message message) {
public void handleNewConnection(final PeerConnection connection) {
ethPeers.registerNewConnection(connection, peerValidators);
final EthPeer peer = ethPeers.peer(connection);
peerManager.addPeer(peer);

final Capability cap = connection.capability(getSupportedProtocol());
final ForkId latestForkId =
Expand Down Expand Up @@ -369,6 +376,7 @@ public void handleDisconnect(
final DisconnectReason reason,
final boolean initiatedByPeer) {
final boolean wasActiveConnection = ethPeers.registerDisconnect(connection);
peerManager.removePeer(connection.getPeer());
LOG.atDebug()
.setMessage("Disconnect - active Connection? {} - {} - {} - {} {} - {} peers left")
.addArgument(wasActiveConnection)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.manager.peertask;

public class NoAvailablePeerException extends Exception {}
Loading
Loading