From 3a2aeabcda56f879b34cca42bc84e77c04243d35 Mon Sep 17 00:00:00 2001 From: Fabio Di Fabio Date: Thu, 18 Aug 2022 21:45:07 +0200 Subject: [PATCH] Refactor and fix retrying get block switching peer (#4256) * Refactor retrying peer task switching peers at every try RetryingGetBlockFromPeersTask had a problem that prevented to complete when all the peers were tried without success, and that also had the consequence to not removing the failed requested block for the internal caches in BlockPropagationManager, that could cause a stall since that block will to be tried to be retrieved again. Signed-off-by: Fabio Di Fabio --- CHANGELOG.md | 1 + .../task/AbstractRetryingPeerTask.java | 11 +- .../AbstractRetryingSwitchingPeerTask.java | 153 +++++++++++++++++ .../task/RetryingGetBlockFromPeersTask.java | 98 ++++------- .../eth/sync/BlockPropagationManager.java | 1 - .../eth/sync/backwardsync/SyncStepStep.java | 1 - .../ethtaskutils/AbstractMessageTaskTest.java | 4 +- .../ethtaskutils/RetryingMessageTaskTest.java | 10 +- .../RetryingSwitchingPeerMessageTaskTest.java | 162 ++++++++++++++++++ .../RetryingGetBlockFromPeersTaskTest.java | 72 ++++++++ .../AbstractBlockPropagationManagerTest.java | 35 ++++ 11 files changed, 469 insertions(+), 79 deletions(-) create mode 100644 ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java diff --git a/CHANGELOG.md b/CHANGELOG.md index 7a125cb2501..98ae3516e07 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ ### Bug Fixes - Fixes off-by-one error for mainnet TTD fallback [#4223](https://github.com/hyperledger/besu/pull/4223) - Fix off-by-one error in AbstractRetryingPeerTask [#4254](https://github.com/hyperledger/besu/pull/4254) +- Refactor and fix retrying get block switching peer [#4256](https://github.com/hyperledger/besu/pull/4256) - Fix encoding of key (short hex) in eth_getProof [#4261](https://github.com/hyperledger/besu/pull/4261) - Fix for post-merge networks fast-sync [#4224](https://github.com/hyperledger/besu/pull/4224), [#4276](https://github.com/hyperledger/besu/pull/4276) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java index 148c5282511..30033200355 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingPeerTask.java @@ -140,12 +140,13 @@ protected void handleTaskError(final Throwable error) { } protected boolean isRetryableError(final Throwable error) { - final boolean isPeerError = - error instanceof PeerBreachedProtocolException - || error instanceof PeerDisconnectedException - || error instanceof NoAvailablePeersException; + return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerFailure(error)); + } - return error instanceof TimeoutException || (!assignedPeer.isPresent() && isPeerError); + protected boolean isPeerFailure(final Throwable error) { + return error instanceof PeerBreachedProtocolException + || error instanceof PeerDisconnectedException + || error instanceof NoAvailablePeersException; } protected EthContext getEthContext() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java new file mode 100644 index 00000000000..d9724895e66 --- /dev/null +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/AbstractRetryingSwitchingPeerTask.java @@ -0,0 +1,153 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.task; + +import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.traceLambda; + +import org.hyperledger.besu.ethereum.eth.manager.EthContext; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthPeers; +import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; +import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; +import org.hyperledger.besu.plugin.services.MetricsSystem; + +import java.util.HashSet; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeoutException; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public abstract class AbstractRetryingSwitchingPeerTask extends AbstractRetryingPeerTask { + + private static final Logger LOG = + LoggerFactory.getLogger(AbstractRetryingSwitchingPeerTask.class); + + private final Set triedPeers = new HashSet<>(); + private final Set failedPeers = new HashSet<>(); + + protected AbstractRetryingSwitchingPeerTask( + final EthContext ethContext, + final MetricsSystem metricsSystem, + final Predicate isEmptyResponse, + final int maxRetries) { + super(ethContext, maxRetries, isEmptyResponse, metricsSystem); + } + + @Override + public void assignPeer(final EthPeer peer) { + super.assignPeer(peer); + triedPeers.add(peer); + } + + protected abstract CompletableFuture executeTaskOnCurrentPeer(final EthPeer peer); + + @Override + protected CompletableFuture executePeerTask(final Optional assignedPeer) { + + final Optional maybePeer = + assignedPeer + .filter(u -> getRetryCount() == 1) // first try with the assigned peer if present + .map(Optional::of) + .orElseGet(this::selectNextPeer); // otherwise select a new one from the pool + + if (maybePeer.isEmpty()) { + traceLambda( + LOG, + "No peer found to try to execute task at attempt {}, tried peers {}", + this::getRetryCount, + triedPeers::toString); + final var ex = new NoAvailablePeersException(); + return CompletableFuture.failedFuture(ex); + } + + final EthPeer peerToUse = maybePeer.get(); + assignPeer(peerToUse); + + traceLambda( + LOG, + "Trying to execute task on peer {}, attempt {}", + this::getAssignedPeer, + this::getRetryCount); + + return executeTaskOnCurrentPeer(peerToUse) + .thenApply( + peerResult -> { + traceLambda( + LOG, + "Got result {} from peer {}, attempt {}", + peerResult::toString, + peerToUse::toString, + this::getRetryCount); + result.complete(peerResult); + return peerResult; + }); + } + + @Override + protected void handleTaskError(final Throwable error) { + if (isPeerFailure(error)) { + getAssignedPeer().ifPresent(peer -> failedPeers.add(peer)); + } + super.handleTaskError(error); + } + + @Override + protected boolean isRetryableError(final Throwable error) { + return error instanceof TimeoutException || isPeerFailure(error); + } + + private Optional selectNextPeer() { + final Optional maybeNextPeer = remainingPeersToTry().findFirst(); + + if (maybeNextPeer.isEmpty()) { + // tried all the peers, restart from the best one but excluding the failed ones + refreshPeers(); + triedPeers.retainAll(failedPeers); + return remainingPeersToTry().findFirst(); + } + + return maybeNextPeer; + } + + private Stream remainingPeersToTry() { + return getEthContext() + .getEthPeers() + .streamBestPeers() + .filter(peer -> !peer.isDisconnected() && !triedPeers.contains(peer)); + } + + private void refreshPeers() { + final EthPeers peers = getEthContext().getEthPeers(); + // If we are at max connections, then refresh peers disconnecting one of the failed peers, + // or the least useful + if (peers.peerCount() >= peers.getMaxPeers()) { + failedPeers.stream() + .filter(peer -> !peer.isDisconnected()) + .findAny() + .or(() -> peers.streamAvailablePeers().sorted(peers.getBestChainComparator()).findFirst()) + .ifPresent( + peer -> { + debugLambda(LOG, "Refresh peers disconnecting peer {}", peer::toString); + peer.disconnect(DisconnectReason.USELESS_PEER); + }); + } + } +} diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java index 2f2fe3086c6..c1894189065 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTask.java @@ -14,135 +14,101 @@ */ package org.hyperledger.besu.ethereum.eth.manager.task; +import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda; + import org.hyperledger.besu.datatypes.Hash; -import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.core.Block; import org.hyperledger.besu.ethereum.eth.manager.EthContext; import org.hyperledger.besu.ethereum.eth.manager.EthPeer; import org.hyperledger.besu.ethereum.eth.manager.exceptions.IncompleteResultsException; -import org.hyperledger.besu.ethereum.eth.manager.exceptions.NoAvailablePeersException; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule; import org.hyperledger.besu.plugin.services.MetricsSystem; -import java.util.HashSet; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class RetryingGetBlockFromPeersTask - extends AbstractRetryingPeerTask> { + extends AbstractRetryingSwitchingPeerTask> { private static final Logger LOG = LoggerFactory.getLogger(RetryingGetBlockFromPeersTask.class); - private final ProtocolContext protocolContext; private final ProtocolSchedule protocolSchedule; - private final Optional blockHash; + private final Optional maybeBlockHash; private final long blockNumber; - private final Set triedPeers = new HashSet<>(); protected RetryingGetBlockFromPeersTask( - final ProtocolContext protocolContext, final EthContext ethContext, final ProtocolSchedule protocolSchedule, final MetricsSystem metricsSystem, final int maxRetries, - final Optional blockHash, + final Optional maybeBlockHash, final long blockNumber) { - super(ethContext, maxRetries, Objects::isNull, metricsSystem); - this.protocolContext = protocolContext; + super(ethContext, metricsSystem, Objects::isNull, maxRetries); this.protocolSchedule = protocolSchedule; - this.blockHash = blockHash; + this.maybeBlockHash = maybeBlockHash; this.blockNumber = blockNumber; } public static RetryingGetBlockFromPeersTask create( - final ProtocolContext protocolContext, final ProtocolSchedule protocolSchedule, final EthContext ethContext, final MetricsSystem metricsSystem, final int maxRetries, - final Optional hash, + final Optional maybeHash, final long blockNumber) { return new RetryingGetBlockFromPeersTask( - protocolContext, - ethContext, - protocolSchedule, - metricsSystem, - maxRetries, - hash, - blockNumber); - } - - @Override - public void assignPeer(final EthPeer peer) { - super.assignPeer(peer); - triedPeers.add(peer); + ethContext, protocolSchedule, metricsSystem, maxRetries, maybeHash, blockNumber); } @Override - protected CompletableFuture> executePeerTask( - final Optional assignedPeer) { - + protected CompletableFuture> executeTaskOnCurrentPeer( + final EthPeer currentPeer) { final GetBlockFromPeerTask getBlockTask = GetBlockFromPeerTask.create( - protocolSchedule, getEthContext(), blockHash, blockNumber, getMetricsSystem()); - - getBlockTask.assignPeer( - assignedPeer - .filter(unused -> getRetryCount() == 1) // first try with the assigned preferred peer - .orElseGet( // then selecting a new one from the pool - () -> { - assignPeer(selectNextPeer()); - return getAssignedPeer().get(); - })); - - LOG.debug( - "Getting block {} ({}) from peer {}, attempt {}", - blockNumber, - blockHash, - getAssignedPeer(), - getRetryCount()); + protocolSchedule, getEthContext(), maybeBlockHash, blockNumber, getMetricsSystem()); + getBlockTask.assignPeer(currentPeer); return executeSubTask(getBlockTask::run) .thenApply( peerResult -> { + debugLambda( + LOG, + "Got block {} from peer {}, attempt {}", + peerResult.getResult()::toLogString, + peerResult.getPeer()::toString, + this::getRetryCount); result.complete(peerResult); return peerResult; }); } - private EthPeer selectNextPeer() { - return getEthContext() - .getEthPeers() - .streamBestPeers() - .filter(peer -> !triedPeers.contains(peer)) - .findFirst() - .orElseThrow(NoAvailablePeersException::new); - } - @Override protected boolean isRetryableError(final Throwable error) { - return (blockNumber > protocolContext.getBlockchain().getChainHeadBlockNumber()) - && (super.isRetryableError(error) || error instanceof IncompleteResultsException); + return super.isRetryableError(error) || error instanceof IncompleteResultsException; } @Override protected void handleTaskError(final Throwable error) { if (getRetryCount() < getMaxRetries()) { - LOG.debug( - "Failed to get block {} ({}) from peer {}, attempt {}, retrying later", - blockNumber, - blockHash, - getAssignedPeer(), - getRetryCount()); + debugLambda( + LOG, + "Failed to get block {} from peer {}, attempt {}, retrying later", + this::logBlockNumberMaybeHash, + this::getAssignedPeer, + this::getRetryCount); } else { LOG.warn( - "Failed to get block {} ({}) after {} retries", blockNumber, blockHash, getRetryCount()); + "Failed to get block {} after {} retries", logBlockNumberMaybeHash(), getRetryCount()); } super.handleTaskError(error); } + + private String logBlockNumberMaybeHash() { + return blockNumber + maybeBlockHash.map(h -> " (" + h.toHexString() + ")").orElse(""); + } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java index e76b710f9d7..aa72844f6d4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/BlockPropagationManager.java @@ -400,7 +400,6 @@ private CompletableFuture getBlockFromPeers( final Optional blockHash) { final RetryingGetBlockFromPeersTask getBlockTask = RetryingGetBlockFromPeersTask.create( - protocolContext, protocolSchedule, ethContext, metricsSystem, diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java index d1dae073396..88ddc18b2c6 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/backwardsync/SyncStepStep.java @@ -50,7 +50,6 @@ public CompletableFuture executeAsync(final Hash hash) { private CompletableFuture requestBlock(final Hash targetHash) { final RetryingGetBlockFromPeersTask getBlockTask = RetryingGetBlockFromPeersTask.create( - context.getProtocolContext(), context.getProtocolSchedule(), context.getEthContext(), context.getMetricsSystem(), diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java index 81a4cb1c689..d1b787e3013 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/AbstractMessageTaskTest.java @@ -58,6 +58,7 @@ * @param The type of data returned from the network */ public abstract class AbstractMessageTaskTest { + protected static final int MAX_PEERS = 5; protected static Blockchain blockchain; protected static ProtocolSchedule protocolSchedule; protected static ProtocolContext protocolContext; @@ -77,7 +78,6 @@ public static void setup() { blockchain = blockchainSetupUtil.getBlockchain(); protocolSchedule = blockchainSetupUtil.getProtocolSchedule(); protocolContext = blockchainSetupUtil.getProtocolContext(); - assertThat(blockchainSetupUtil.getMaxBlockNumber()).isGreaterThanOrEqualTo(20L); } @@ -91,7 +91,7 @@ public void setupTest() { EthProtocol.NAME, TestClock.fixed(), metricsSystem, - 25, + MAX_PEERS, EthProtocolConfiguration.DEFAULT_MAX_MESSAGE_SIZE)); final EthMessages ethMessages = new EthMessages(); final EthScheduler ethScheduler = diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java index 5f383d66fd8..b69e316398e 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingMessageTaskTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import org.junit.Before; import org.junit.Test; /** @@ -34,11 +35,12 @@ * @param The type of data being requested from the network */ public abstract class RetryingMessageTaskTest extends AbstractMessageTaskTest { + protected static final int DEFAULT_MAX_RETRIES = 4; + protected int maxRetries; - protected final int maxRetries; - - protected RetryingMessageTaskTest() { - this.maxRetries = 4; + @Before + public void resetMaxRetries() { + this.maxRetries = DEFAULT_MAX_RETRIES; } @Override diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java new file mode 100644 index 00000000000..e7c953f0935 --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/ethtaskutils/RetryingSwitchingPeerMessageTaskTest.java @@ -0,0 +1,162 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.ethtaskutils; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManagerTestUtil; +import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; +import org.hyperledger.besu.ethereum.eth.manager.exceptions.MaxRetriesReachedException; +import org.hyperledger.besu.ethereum.eth.manager.task.EthTask; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import org.junit.Test; + +/** + * Tests ethTasks that request data from the network, and retry until all of the data is received. + * + * @param The type of data being requested from the network + */ +public abstract class RetryingSwitchingPeerMessageTaskTest extends RetryingMessageTaskTest { + protected Optional responsivePeer = Optional.empty(); + + @Override + protected void assertResultMatchesExpectation( + final T requestedData, final T response, final EthPeer respondingPeer) { + assertThat(response).isEqualTo(requestedData); + responsivePeer.ifPresent(rp -> assertThat(rp).isEqualByComparingTo(respondingPeer)); + } + + @Test + public void completesWhenBestPeerEmptyAndSecondPeerIsResponsive() + throws ExecutionException, InterruptedException { + // Setup first unresponsive peer + final RespondingEthPeer firstPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); + + // Setup second responsive peer + final RespondingEthPeer secondPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 9); + + // Execute task and wait for response + final T requestedData = generateDataToBeRequested(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + // First peer is not responsive + firstPeer.respond(RespondingEthPeer.emptyResponder()); + // Second peer is responsive + secondPeer.respondTimes( + RespondingEthPeer.blockchainResponder( + blockchain, protocolContext.getWorldStateArchive(), transactionPool), + 2); + + responsivePeer = Optional.of(secondPeer.getEthPeer()); + + assertThat(future.isDone()).isTrue(); + assertResultMatchesExpectation(requestedData, future.get(), secondPeer.getEthPeer()); + } + + @Test + public void completesWhenBestPeerTimeoutsAndSecondPeerIsResponsive() + throws ExecutionException, InterruptedException { + // Setup first unresponsive peer + final RespondingEthPeer firstPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); + + // Setup second responsive peer + final RespondingEthPeer secondPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 9); + + // Execute task and wait for response + final T requestedData = generateDataToBeRequested(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + // First peer timeouts + peerCountToTimeout.set(1); + firstPeer.respondTimes( + RespondingEthPeer.blockchainResponder( + blockchain, protocolContext.getWorldStateArchive(), transactionPool), + 2); + // Second peer is responsive + secondPeer.respondTimes( + RespondingEthPeer.blockchainResponder( + blockchain, protocolContext.getWorldStateArchive(), transactionPool), + 2); + + responsivePeer = Optional.of(secondPeer.getEthPeer()); + + assertThat(future.isDone()).isTrue(); + assertResultMatchesExpectation(requestedData, future.get(), secondPeer.getEthPeer()); + } + + @Test + public void failsWhenAllPeersFail() { + // Setup first unresponsive peer + final RespondingEthPeer firstPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 10); + + // Setup second unresponsive peer + final RespondingEthPeer secondPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 9); + + // Execute task and wait for response + final T requestedData = generateDataToBeRequested(); + final EthTask task = createTask(requestedData); + final CompletableFuture future = task.run(); + + for (int i = 0; i < maxRetries && !future.isDone(); i++) { + // First peer is unresponsive + firstPeer.respondWhile(RespondingEthPeer.emptyResponder(), firstPeer::hasOutstandingRequests); + // Second peer is unresponsive + secondPeer.respondWhile( + RespondingEthPeer.emptyResponder(), secondPeer::hasOutstandingRequests); + } + + assertThat(future.isDone()).isTrue(); + assertThat(future.isCompletedExceptionally()).isTrue(); + assertThatThrownBy(future::get).hasCauseInstanceOf(MaxRetriesReachedException.class); + } + + @Test + public void disconnectAPeerWhenAllPeersTried() { + maxRetries = MAX_PEERS + 1; + final int numPeers = MAX_PEERS; + final List respondingPeers = new ArrayList<>(numPeers); + for (int i = 0; i < numPeers; i++) { + respondingPeers.add(EthProtocolManagerTestUtil.createPeer(ethProtocolManager, numPeers - i)); + } + + // Execute task and wait for response + final T requestedData = generateDataToBeRequested(); + final EthTask task = createTask(requestedData); + task.run(); + + respondingPeers.forEach( + respondingPeer -> + respondingPeer.respondWhile( + RespondingEthPeer.emptyResponder(), respondingPeer::hasOutstandingRequests)); + + assertThat(respondingPeers.get(numPeers - 1).getEthPeer().isDisconnected()).isTrue(); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java new file mode 100644 index 00000000000..9e900287b1f --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/RetryingGetBlockFromPeersTaskTest.java @@ -0,0 +1,72 @@ +/* + * Copyright contributors to Hyperledger Besu + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.task; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.hyperledger.besu.ethereum.core.BlockHeader.GENESIS_BLOCK_NUMBER; +import static org.mockito.Mockito.mock; + +import org.hyperledger.besu.ethereum.core.Block; +import org.hyperledger.besu.ethereum.eth.manager.EthPeer; +import org.hyperledger.besu.ethereum.eth.manager.ethtaskutils.RetryingSwitchingPeerMessageTaskTest; +import org.hyperledger.besu.ethereum.eth.manager.task.AbstractPeerTask.PeerTaskResult; + +import java.util.Optional; +import java.util.concurrent.ExecutionException; + +import org.junit.Ignore; +import org.junit.Test; + +public class RetryingGetBlockFromPeersTaskTest + extends RetryingSwitchingPeerMessageTaskTest> { + + @Override + protected void assertResultMatchesExpectation( + final PeerTaskResult requestedData, + final PeerTaskResult response, + final EthPeer respondingPeer) { + assertThat(response.getResult()).isEqualTo(requestedData.getResult()); + } + + @Override + protected PeerTaskResult generateDataToBeRequested() { + final Block block = blockchain.getBlockByNumber(10).get(); + return new PeerTaskResult<>(mock(EthPeer.class), block); + } + + @Override + protected RetryingGetBlockFromPeersTask createTask(final PeerTaskResult requestedData) { + return RetryingGetBlockFromPeersTask.create( + protocolSchedule, + ethContext, + metricsSystem, + maxRetries, + Optional.of(requestedData.getResult().getHash()), + GENESIS_BLOCK_NUMBER); + } + + @Test + @Override + @Ignore("GetBlock could not return partial response") + public void failsWhenPeerReturnsPartialResultThenStops() {} + + @Override + @Test + @Ignore("GetBlock could not return partial response") + public void completesWhenPeerReturnsPartialResult() + throws ExecutionException, InterruptedException { + super.completesWhenPeerReturnsPartialResult(); + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java index ac6aa6550ad..3f15a68cd99 100644 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/sync/AbstractBlockPropagationManagerTest.java @@ -932,5 +932,40 @@ public void shouldNotListenToBlockAddedEventsWhenTTDReachedAndFinal() { verifyNoInteractions(pendingBlocksManager); } + @Test + public void shouldRequestBlockAgainIfFirstGetBlockFails() { + blockchainUtil.importFirstBlocks(2); + final Block nextBlock = blockchainUtil.getBlock(2); + + // Sanity check + assertThat(blockchain.contains(nextBlock.getHash())).isFalse(); + + blockPropagationManager.start(); + + final RespondingEthPeer firstPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); + final NewBlockHashesMessage nextAnnouncement = + NewBlockHashesMessage.create( + Collections.singletonList( + new NewBlockHashesMessage.NewBlockHash( + nextBlock.getHash(), nextBlock.getHeader().getNumber()))); + + // Broadcast message and peer fail to respond + EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, firstPeer, nextAnnouncement); + firstPeer.respondWhile(RespondingEthPeer.emptyResponder(), firstPeer::hasOutstandingRequests); + + assertThat(blockchain.contains(nextBlock.getHash())).isFalse(); + + // Re-broadcast the previous message and peer responds + final RespondingEthPeer secondPeer = + EthProtocolManagerTestUtil.createPeer(ethProtocolManager, 0); + EthProtocolManagerTestUtil.broadcastMessage(ethProtocolManager, secondPeer, nextAnnouncement); + final Responder goodResponder = RespondingEthPeer.blockchainResponder(getFullBlockchain()); + + secondPeer.respondWhile(goodResponder, secondPeer::hasOutstandingRequests); + + assertThat(blockchain.contains(nextBlock.getHash())).isTrue(); + } + public abstract Blockchain getFullBlockchain(); }