From 0e1b41f25593a67d057ad3acf7811a632ccca04e Mon Sep 17 00:00:00 2001 From: garyschulte Date: Wed, 28 Feb 2024 13:12:43 -0800 Subject: [PATCH 01/16] squash of snap server commits Signed-off-by: garyschulte --- .../controller/BesuControllerBuilder.java | 15 +- ethereum/core/build.gradle | 1 + .../proof/WorldStateProofProvider.java | 30 +- .../NoOpBonsaiCachedWorldStorageManager.java | 2 +- .../BonsaiWorldStateKeyValueStorage.java | 4 +- .../common/DiffBasedWorldStateProvider.java | 2 +- .../DiffBasedCachedWorldStorageManager.java | 59 +- .../DiffBasedWorldStateKeyValueStorage.java | 26 +- .../common/storage/flat/FlatDbStrategy.java | 97 ++- .../worldstate/DataStorageConfiguration.java | 12 + .../worldstate/WorldStateArchive.java | 3 - .../bonsai/BonsaiSnapshotIsolationTests.java | 8 +- .../eth/EthProtocolConfiguration.java | 2 +- .../eth/manager/snap/SnapProtocolManager.java | 8 +- .../ethereum/eth/manager/snap/SnapServer.java | 580 ++++++++++++++++- .../messages/snap/GetAccountRangeMessage.java | 3 + .../eth/sync/DefaultSynchronizer.java | 9 + .../ethereum/eth/sync/snapsync/StackTrie.java | 65 +- .../request/AccountRangeDataRequest.java | 6 + .../request/StorageRangeDataRequest.java | 7 + ...torageFlatDatabaseHealingRangeRequest.java | 2 +- .../eth/manager/snap/SnapServerTest.java | 604 ++++++++++++++++++ .../task/SnapProtocolManagerTestUtil.java | 61 -- .../besu/ethereum/trie/CompactEncoding.java | 32 + .../kvstore/InMemoryKeyValueStorage.java | 10 +- .../kvstore/LayeredKeyValueStorage.java | 123 +++- .../SegmentedInMemoryKeyValueStorage.java | 52 +- 27 files changed, 1594 insertions(+), 229 deletions(-) create mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServerTest.java delete mode 100644 ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/SnapProtocolManagerTestUtil.java diff --git a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java index d70590c37d5..8eb7138c5a9 100644 --- a/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/controller/BesuControllerBuilder.java @@ -652,9 +652,6 @@ public BesuController build() { peerValidators, Optional.empty()); - final Optional maybeSnapProtocolManager = - createSnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive); - final PivotBlockSelector pivotBlockSelector = createPivotSelector( protocolSchedule, protocolContext, ethContext, syncState, metricsSystem); @@ -671,6 +668,10 @@ public BesuController build() { protocolContext.setSynchronizer(Optional.of(synchronizer)); + final Optional maybeSnapProtocolManager = + createSnapProtocolManager( + protocolContext, worldStateStorageCoordinator, peerValidators, ethPeers, snapMessages); + final MiningCoordinator miningCoordinator = createMiningCoordinator( protocolSchedule, @@ -986,12 +987,14 @@ protected ProtocolContext createProtocolContext( } private Optional createSnapProtocolManager( + final ProtocolContext protocolContext, + final WorldStateStorageCoordinator worldStateStorageCoordinator, final List peerValidators, final EthPeers ethPeers, - final EthMessages snapMessages, - final WorldStateArchive worldStateArchive) { + final EthMessages snapMessages) { return Optional.of( - new SnapProtocolManager(peerValidators, ethPeers, snapMessages, worldStateArchive)); + new SnapProtocolManager( + worldStateStorageCoordinator, peerValidators, ethPeers, snapMessages, protocolContext)); } WorldStateArchive createWorldStateArchive( diff --git a/ethereum/core/build.gradle b/ethereum/core/build.gradle index fa1b462344b..4abde77acee 100644 --- a/ethereum/core/build.gradle +++ b/ethereum/core/build.gradle @@ -48,6 +48,7 @@ dependencies { implementation 'com.fasterxml.jackson.core:jackson-databind' implementation 'com.google.guava:guava' + implementation 'com.github.ben-manes.caffeine:caffeine' implementation 'com.google.dagger:dagger' implementation 'org.apache.maven:maven-artifact' annotationProcessor 'com.google.dagger:dagger-compiler' diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/proof/WorldStateProofProvider.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/proof/WorldStateProofProvider.java index 0258ce1034a..9bd7f55090d 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/proof/WorldStateProofProvider.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/proof/WorldStateProofProvider.java @@ -28,6 +28,7 @@ import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue; import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,6 +42,8 @@ import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; import org.apache.tuweni.units.bigints.UInt256; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * The WorldStateProofProvider class is responsible for providing proofs for world state entries. It @@ -49,6 +52,7 @@ public class WorldStateProofProvider { private final WorldStateStorageCoordinator worldStateStorageCoordinator; + private static final Logger LOG = LoggerFactory.getLogger(WorldStateProofProvider.class); public WorldStateProofProvider(final WorldStateStorageCoordinator worldStateStorageCoordinator) { this.worldStateStorageCoordinator = worldStateStorageCoordinator; @@ -85,7 +89,8 @@ private SortedMap> getStorageProofs( final List accountStorageKeys) { final MerkleTrie storageTrie = newAccountStorageTrie(accountHash, account.getStorageRoot()); - final NavigableMap> storageProofs = new TreeMap<>(); + final NavigableMap> storageProofs = + new TreeMap<>(Comparator.comparing(Bytes32::toHexString)); accountStorageKeys.forEach( key -> storageProofs.put(key, storageTrie.getValueWithProof(Hash.hash(key)))); return storageProofs; @@ -153,19 +158,26 @@ public boolean isValidRangeProof( final SortedMap keys) { // check if it's monotonic increasing - if (!Ordering.natural().isOrdered(keys.keySet())) { + if (keys.size() > 1 && !Ordering.natural().isOrdered(keys.keySet())) { return false; } - // when proof is empty we need to have all the keys to reconstruct the trie + // when proof is empty and we requested the full range, we should + // have all the keys to reconstruct the trie if (proofs.isEmpty()) { - final MerkleTrie trie = new SimpleMerklePatriciaTrie<>(Function.identity()); - // add the received keys in the trie - for (Map.Entry key : keys.entrySet()) { - trie.put(key.getKey(), key.getValue()); + if (startKeyHash.equals(Bytes32.ZERO)) { + final MerkleTrie trie = new SimpleMerklePatriciaTrie<>(Function.identity()); + // add the received keys in the trie + for (Map.Entry key : keys.entrySet()) { + trie.put(key.getKey(), key.getValue()); + } + return rootHash.equals(trie.getRootHash()); + } else { + // TODO: possibly accept a node loader so we can verify this with already + // completed partial storage requests + LOG.info("failing proof due to incomplete range without proofs"); + return false; } - - return rootHash.equals(trie.getRootHash()); } // reconstruct a part of the trie with the proof diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/cache/NoOpBonsaiCachedWorldStorageManager.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/cache/NoOpBonsaiCachedWorldStorageManager.java index 0e4ee9cbffd..487456e9d44 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/cache/NoOpBonsaiCachedWorldStorageManager.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/cache/NoOpBonsaiCachedWorldStorageManager.java @@ -38,7 +38,7 @@ public synchronized void addCachedLayer( } @Override - public boolean containWorldStateStorage(final Hash blockHash) { + public boolean contains(final Hash blockHash) { return false; } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/BonsaiWorldStateKeyValueStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/BonsaiWorldStateKeyValueStorage.java index a58316fc62c..3f1a87b1348 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/BonsaiWorldStateKeyValueStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/storage/BonsaiWorldStateKeyValueStorage.java @@ -127,9 +127,7 @@ public Optional getAccountStorageTrieNode( } public Optional getTrieNodeUnsafe(final Bytes key) { - return composedWorldStateStorage - .get(TRIE_BRANCH_STORAGE, Bytes.concatenate(key).toArrayUnsafe()) - .map(Bytes::wrap); + return composedWorldStateStorage.get(TRIE_BRANCH_STORAGE, key.toArrayUnsafe()).map(Bytes::wrap); } public Optional getStorageValueByStorageSlotKey( diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/DiffBasedWorldStateProvider.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/DiffBasedWorldStateProvider.java index bda86f7dfac..f985aec7b5d 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/DiffBasedWorldStateProvider.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/DiffBasedWorldStateProvider.java @@ -117,7 +117,7 @@ public Optional get(final Hash rootHash, final Hash blockHash) { @Override public boolean isWorldStateAvailable(final Hash rootHash, final Hash blockHash) { - return cachedWorldStorageManager.containWorldStateStorage(blockHash) + return cachedWorldStorageManager.contains(blockHash) || persistedState.blockHash().equals(blockHash) || worldStateKeyValueStorage.isWorldStateAvailable(rootHash, blockHash); } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/cache/DiffBasedCachedWorldStorageManager.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/cache/DiffBasedCachedWorldStorageManager.java index 287213aa57a..3535cea5d8a 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/cache/DiffBasedCachedWorldStorageManager.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/cache/DiffBasedCachedWorldStorageManager.java @@ -15,7 +15,9 @@ package org.hyperledger.besu.ethereum.trie.diffbased.common.cache; import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.core.BlockHeader; +import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.worldview.BonsaiWorldState; import org.hyperledger.besu.ethereum.trie.diffbased.common.DiffBasedWorldStateProvider; import org.hyperledger.besu.ethereum.trie.diffbased.common.StorageSubscriber; import org.hyperledger.besu.ethereum.trie.diffbased.common.storage.DiffBasedLayeredWorldStateKeyValueStorage; @@ -29,8 +31,11 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import java.util.function.Function; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import org.apache.tuweni.bytes.Bytes32; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,6 +46,11 @@ public abstract class DiffBasedCachedWorldStorageManager implements StorageSubsc LoggerFactory.getLogger(DiffBasedCachedWorldStorageManager.class); private final DiffBasedWorldStateProvider archive; private final EvmConfiguration evmConfiguration; + private final Cache stateRootToBlockHeaderCache = + Caffeine.newBuilder() + .maximumSize(RETAINED_LAYERS) + .expireAfterWrite(100, TimeUnit.MINUTES) + .build(); private final DiffBasedWorldStateKeyValueStorage rootWorldStateStorage; private final Map cachedWorldStatesByHash; @@ -104,6 +114,8 @@ public synchronized void addCachedLayer( ((DiffBasedLayeredWorldStateKeyValueStorage) forWorldState.getWorldStateStorage()) .clone())); } + // add stateroot -> blockHeader cache entry + stateRootToBlockHeaderCache.put(blockHeader.getStateRoot(), blockHeader); } scrubCachedLayers(blockHeader.getNumber()); } @@ -192,7 +204,7 @@ public Optional getHeadWorldState( }); } - public boolean containWorldStateStorage(final Hash blockHash) { + public boolean contains(final Hash blockHash) { return cachedWorldStatesByHash.containsKey(blockHash); } @@ -200,6 +212,51 @@ public void reset() { this.cachedWorldStatesByHash.clear(); } + public void primeRootToBlockHashCache(final Blockchain blockchain, final int numEntries) { + // prime the stateroot-to-blockhash cache + long head = blockchain.getChainHeadHeader().getNumber(); + for (long i = head; i > Math.max(0, head - numEntries); i--) { + blockchain + .getBlockHeader(i) + .ifPresent(header -> stateRootToBlockHeaderCache.put(header.getStateRoot(), header)); + } + } + + /** + * Returns the worldstate for the supplied root hash, or the head worldstate if no root hash is + * supplied. synchronized to prevent concurrent adds to the cache of the same root hash. + * + * @param rootHash optional rootHash to supply worldstate storage for + * @return Optional worldstate storage + */ + public synchronized Optional getStorageByRootHash( + final Optional rootHash) { + if (rootHash.isPresent()) { + // if we supplied a hash, return the worldstate for that hash if it is available: + return rootHash + .map(stateRootToBlockHeaderCache::getIfPresent) + .flatMap( + header -> + Optional.ofNullable(cachedWorldStatesByHash.get(header.getHash())) + .map(DiffBasedCachedWorldView::getWorldStateStorage) + .or( + () -> { + // if not cached already, maybe fetch and cache this worldstate + var maybeWorldState = + archive.getMutable(header, false).map(BonsaiWorldState.class::cast); + maybeWorldState.ifPresent( + ws -> addCachedLayer(header, header.getStateRoot(), ws)); + return maybeWorldState.map(BonsaiWorldState::getWorldStateStorage); + })); + } else { + // if we did not supply a hash, return the head worldstate from cachedWorldStates + return rootWorldStateStorage + .getWorldStateBlockHash() + .map(cachedWorldStatesByHash::get) + .map(DiffBasedCachedWorldView::getWorldStateStorage); + } + } + @Override public void onClearStorage() { this.cachedWorldStatesByHash.clear(); diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java index 2424828ee60..33afbe5d4c9 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/DiffBasedWorldStateKeyValueStorage.java @@ -35,11 +35,13 @@ import java.nio.charset.StandardCharsets; import java.util.List; -import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Predicate; import java.util.stream.Stream; +import kotlin.Pair; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; import org.slf4j.Logger; @@ -120,19 +122,37 @@ public Optional getWorldStateBlockHash() { .map(Hash::wrap); } - public Map streamFlatAccounts( + public NavigableMap streamFlatAccounts( final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) { return getFlatDbStrategy() .streamAccountFlatDatabase(composedWorldStateStorage, startKeyHash, endKeyHash, max); } - public Map streamFlatStorages( + public NavigableMap streamFlatAccounts( + final Bytes startKeyHash, + final Bytes32 endKeyHash, + final Predicate> takeWhile) { + return getFlatDbStrategy() + .streamAccountFlatDatabase(composedWorldStateStorage, startKeyHash, endKeyHash, takeWhile); + } + + public NavigableMap streamFlatStorages( final Hash accountHash, final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) { return getFlatDbStrategy() .streamStorageFlatDatabase( composedWorldStateStorage, accountHash, startKeyHash, endKeyHash, max); } + public NavigableMap streamFlatStorages( + final Hash accountHash, + final Bytes startKeyHash, + final Bytes32 endKeyHash, + final Predicate> takeWhile) { + return getFlatDbStrategy() + .streamStorageFlatDatabase( + composedWorldStateStorage, accountHash, startKeyHash, endKeyHash, takeWhile); + } + public boolean isWorldStateAvailable(final Bytes32 rootHash, final Hash blockHash) { return composedWorldStateStorage .get(TRIE_BRANCH_STORAGE, WORLD_ROOT_HASH_KEY) diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategy.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategy.java index 37c2877b167..484391560c6 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategy.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/trie/diffbased/common/storage/flat/FlatDbStrategy.java @@ -28,9 +28,12 @@ import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorageTransaction; -import java.util.Map; +import java.util.Comparator; +import java.util.NavigableMap; import java.util.Optional; import java.util.TreeMap; +import java.util.function.Function; +import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -46,7 +49,6 @@ * data, and storage data from the corresponding KeyValueStorage. */ public abstract class FlatDbStrategy { - protected final MetricsSystem metricsSystem; protected final Counter getAccountCounter; protected final Counter getAccountFoundInFlatDatabaseCounter; @@ -190,47 +192,86 @@ public void resetOnResync(final SegmentedKeyValueStorage storage) { storage.clear(ACCOUNT_STORAGE_STORAGE); } - public Map streamAccountFlatDatabase( + public NavigableMap streamAccountFlatDatabase( final SegmentedKeyValueStorage storage, final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) { - final Stream> pairStream = - storage - .streamFromKey( - ACCOUNT_INFO_STATE, startKeyHash.toArrayUnsafe(), endKeyHash.toArrayUnsafe()) - .limit(max) - .map(pair -> new Pair<>(Bytes32.wrap(pair.getKey()), Bytes.wrap(pair.getValue()))); - final TreeMap collected = - pairStream.collect( - Collectors.toMap(Pair::getFirst, Pair::getSecond, (v1, v2) -> v1, TreeMap::new)); - pairStream.close(); - return collected; + return toNavigableMap(accountsToPairStream(storage, startKeyHash, endKeyHash).limit(max)); } - public Map streamStorageFlatDatabase( + public NavigableMap streamAccountFlatDatabase( + final SegmentedKeyValueStorage storage, + final Bytes startKeyHash, + final Bytes32 endKeyHash, + final Predicate> takeWhile) { + + return toNavigableMap( + accountsToPairStream(storage, startKeyHash, endKeyHash).takeWhile(takeWhile)); + } + + /** streams RLP encoded storage values using a specified stream limit. */ + public NavigableMap streamStorageFlatDatabase( final SegmentedKeyValueStorage storage, final Hash accountHash, final Bytes startKeyHash, final Bytes32 endKeyHash, final long max) { - final Stream> pairStream = - storage - .streamFromKey( - ACCOUNT_STORAGE_STORAGE, - Bytes.concatenate(accountHash, startKeyHash).toArrayUnsafe(), - Bytes.concatenate(accountHash, endKeyHash).toArrayUnsafe()) - .limit(max) - .map( - pair -> - new Pair<>( - Bytes32.wrap(Bytes.wrap(pair.getKey()).slice(Hash.SIZE)), - RLP.encodeValue(Bytes.wrap(pair.getValue()).trimLeadingZeros()))); + return toNavigableMap( + storageToPairStream(storage, accountHash, startKeyHash, endKeyHash, RLP::encodeValue) + .limit(max)); + } + + /** streams raw storage Bytes using a specified predicate filter and value mapper. */ + public NavigableMap streamStorageFlatDatabase( + final SegmentedKeyValueStorage storage, + final Hash accountHash, + final Bytes startKeyHash, + final Bytes32 endKeyHash, + final Predicate> takeWhile) { + + return toNavigableMap( + storageToPairStream(storage, accountHash, startKeyHash, endKeyHash, RLP::encodeValue) + .takeWhile(takeWhile)); + } + + private static Stream> storageToPairStream( + final SegmentedKeyValueStorage storage, + final Hash accountHash, + final Bytes startKeyHash, + final Bytes32 endKeyHash, + final Function valueMapper) { + + return storage + .streamFromKey( + ACCOUNT_STORAGE_STORAGE, + Bytes.concatenate(accountHash, startKeyHash).toArrayUnsafe(), + Bytes.concatenate(accountHash, endKeyHash).toArrayUnsafe()) + .map( + pair -> + new Pair<>( + Bytes32.wrap(Bytes.wrap(pair.getKey()).slice(Hash.SIZE)), + valueMapper.apply(Bytes.wrap(pair.getValue()).trimLeadingZeros()))); + } + + private static Stream> accountsToPairStream( + final SegmentedKeyValueStorage storage, final Bytes startKeyHash, final Bytes32 endKeyHash) { + return storage + .streamFromKey(ACCOUNT_INFO_STATE, startKeyHash.toArrayUnsafe(), endKeyHash.toArrayUnsafe()) + .map(pair -> new Pair<>(Bytes32.wrap(pair.getKey()), Bytes.wrap(pair.getValue()))); + } + + private static NavigableMap toNavigableMap( + final Stream> pairStream) { final TreeMap collected = pairStream.collect( - Collectors.toMap(Pair::getFirst, Pair::getSecond, (v1, v2) -> v1, TreeMap::new)); + Collectors.toMap( + Pair::getFirst, + Pair::getSecond, + (v1, v2) -> v1, + () -> new TreeMap<>(Comparator.comparing(Bytes::toHexString)))); pairStream.close(); return collected; } diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/DataStorageConfiguration.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/DataStorageConfiguration.java index 7ad5362aeff..6951494651d 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/DataStorageConfiguration.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/DataStorageConfiguration.java @@ -34,6 +34,13 @@ public interface DataStorageConfiguration { .unstable(Unstable.DEFAULT) .build(); + DataStorageConfiguration BONSAI_CODE_BY_HASH_CONFIG = + ImmutableDataStorageConfiguration.builder() + .dataStorageFormat(DataStorageFormat.BONSAI) + .bonsaiMaxLayersToLoad(DEFAULT_BONSAI_MAX_LAYERS_TO_LOAD) + .unstable(Unstable.CODE_BY_CODE_HASH) + .build(); + DataStorageConfiguration DEFAULT_BONSAI_CONFIG = ImmutableDataStorageConfiguration.builder() .dataStorageFormat(DataStorageFormat.BONSAI) @@ -72,6 +79,11 @@ interface Unstable { DataStorageConfiguration.Unstable DEFAULT = ImmutableDataStorageConfiguration.Unstable.builder().build(); + DataStorageConfiguration.Unstable CODE_BY_CODE_HASH = + ImmutableDataStorageConfiguration.Unstable.builder() + .bonsaiCodeStoredByCodeHashEnabled(true) + .build(); + @Value.Default default boolean getBonsaiLimitTrieLogsEnabled() { return DEFAULT_BONSAI_LIMIT_TRIE_LOGS_ENABLED; diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateArchive.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateArchive.java index 1816063243e..89b36917d85 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateArchive.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateArchive.java @@ -19,7 +19,6 @@ import org.hyperledger.besu.ethereum.core.BlockHeader; import org.hyperledger.besu.ethereum.core.MutableWorldState; import org.hyperledger.besu.ethereum.proof.WorldStateProof; -import org.hyperledger.besu.ethereum.trie.MerkleTrie; import org.hyperledger.besu.evm.worldstate.WorldState; import java.io.Closeable; @@ -31,8 +30,6 @@ import org.apache.tuweni.units.bigints.UInt256; public interface WorldStateArchive extends Closeable { - Hash EMPTY_ROOT_HASH = Hash.wrap(MerkleTrie.EMPTY_TRIE_NODE_HASH); - Optional get(Hash rootHash, Hash blockHash); boolean isWorldStateAvailable(Hash rootHash, Hash blockHash); diff --git a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/BonsaiSnapshotIsolationTests.java b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/BonsaiSnapshotIsolationTests.java index 507047604bc..c786a609ce6 100644 --- a/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/BonsaiSnapshotIsolationTests.java +++ b/ethereum/core/src/test/java/org/hyperledger/besu/ethereum/trie/diffbased/bonsai/BonsaiSnapshotIsolationTests.java @@ -61,12 +61,8 @@ public void testIsolatedFromHead_behindHead() { assertThat(res.isSuccessful()).isTrue(); assertThat(res2.isSuccessful()).isTrue(); - assertThat( - archive.getCachedWorldStorageManager().containWorldStateStorage(firstBlock.getHash())) - .isTrue(); - assertThat( - archive.getCachedWorldStorageManager().containWorldStateStorage(secondBlock.getHash())) - .isTrue(); + assertThat(archive.getCachedWorldStorageManager().contains(firstBlock.getHash())).isTrue(); + assertThat(archive.getCachedWorldStorageManager().contains(secondBlock.getHash())).isTrue(); assertThat(archive.getMutable().get(testAddress)).isNotNull(); assertThat(archive.getMutable().get(testAddress).getBalance()) diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/EthProtocolConfiguration.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/EthProtocolConfiguration.java index 73834831eff..93c69f92da9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/EthProtocolConfiguration.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/EthProtocolConfiguration.java @@ -24,7 +24,7 @@ public class EthProtocolConfiguration { public static final int DEFAULT_MAX_MESSAGE_SIZE = 10 * ByteUnits.MEGABYTE; - public static final int DEFAULT_MAX_GET_BLOCK_HEADERS = 192; + public static final int DEFAULT_MAX_GET_BLOCK_HEADERS = 512; public static final int DEFAULT_MAX_GET_BLOCK_BODIES = 128; public static final int DEFAULT_MAX_GET_RECEIPTS = 256; public static final int DEFAULT_MAX_GET_NODE_DATA = 384; diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java index de2cb280c8c..356f84da4b9 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapProtocolManager.java @@ -14,6 +14,7 @@ */ package org.hyperledger.besu.ethereum.eth.manager.snap; +import org.hyperledger.besu.ethereum.ProtocolContext; import org.hyperledger.besu.ethereum.eth.SnapProtocol; import org.hyperledger.besu.ethereum.eth.manager.EthMessage; import org.hyperledger.besu.ethereum.eth.manager.EthMessages; @@ -29,7 +30,7 @@ import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.messages.DisconnectMessage.DisconnectReason; import org.hyperledger.besu.ethereum.rlp.RLPException; -import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import java.math.BigInteger; import java.util.Comparator; @@ -49,14 +50,15 @@ public class SnapProtocolManager implements ProtocolManager { private final EthMessages snapMessages; public SnapProtocolManager( + final WorldStateStorageCoordinator worldStateStorageCoordinator, final List peerValidators, final EthPeers ethPeers, final EthMessages snapMessages, - final WorldStateArchive worldStateArchive) { + final ProtocolContext protocolContext) { this.ethPeers = ethPeers; this.snapMessages = snapMessages; this.supportedCapabilities = calculateCapabilities(); - new SnapServer(snapMessages, worldStateArchive); + new SnapServer(snapMessages, worldStateStorageCoordinator, protocolContext); } private List calculateCapabilities() { diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServer.java index 17fb0a246cb..1e5437c64bb 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServer.java @@ -14,66 +14,588 @@ */ package org.hyperledger.besu.ethereum.eth.manager.snap; +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.ethereum.ProtocolContext; +import org.hyperledger.besu.ethereum.chain.Blockchain; import org.hyperledger.besu.ethereum.eth.manager.EthMessages; import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetAccountRangeMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetByteCodesMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetStorageRangeMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodesMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.SnapV1; import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage; +import org.hyperledger.besu.ethereum.eth.sync.DefaultSynchronizer; import org.hyperledger.besu.ethereum.p2p.rlpx.wire.MessageData; -import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; +import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider; +import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; +import org.hyperledger.besu.ethereum.trie.CompactEncoding; +import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.BonsaiWorldStateProvider; +import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.trie.diffbased.common.cache.DiffBasedCachedWorldStorageManager; +import org.hyperledger.besu.ethereum.worldstate.FlatDbMode; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; +import org.hyperledger.besu.plugin.services.BesuEvents; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Function; +import java.util.function.Predicate; +import java.util.stream.Collectors; +import com.google.common.annotations.VisibleForTesting; +import kotlin.Pair; import kotlin.collections.ArrayDeque; +import org.apache.commons.lang3.time.StopWatch; +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; +import org.apache.tuweni.units.bigints.UInt256; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** See https://github.com/ethereum/devp2p/blob/master/caps/snap.md */ @SuppressWarnings("unused") -class SnapServer { +class SnapServer implements BesuEvents.InitialSyncCompletionListener { + private static final Logger LOGGER = LoggerFactory.getLogger(SnapServer.class); + private static final int PRIME_STATE_ROOT_CACHE_LIMIT = 128; + private static final int MAX_ENTRIES_PER_REQUEST = 100000; + private static final int MAX_RESPONSE_SIZE = 2 * 1024 * 1024; + private static final AccountRangeMessage EMPTY_ACCOUNT_RANGE = + AccountRangeMessage.create(new HashMap<>(), new ArrayDeque<>()); + private static final StorageRangeMessage EMPTY_STORAGE_RANGE = + StorageRangeMessage.create(new ArrayDeque<>(), Collections.emptyList()); + private static final TrieNodesMessage EMPTY_TRIE_NODES_MESSAGE = + TrieNodesMessage.create(new ArrayList<>()); + private static final ByteCodesMessage EMPTY_BYTE_CODES_MESSAGE = + ByteCodesMessage.create(new ArrayDeque<>()); + static final Hash HASH_LAST = Hash.wrap(Bytes32.leftPad(Bytes.fromHexString("FF"), (byte) 0xFF)); + + private final AtomicBoolean isStarted = new AtomicBoolean(false); + private final AtomicLong listenerId = new AtomicLong(); private final EthMessages snapMessages; - private final WorldStateArchive worldStateArchive; - SnapServer(final EthMessages snapMessages, final WorldStateArchive worldStateArchive) { + // provide worldstate storage by root hash + private final Function, Optional> + worldStateStorageProvider; + private final WorldStateStorageCoordinator worldStateStorageCoordinator; + + SnapServer( + final EthMessages snapMessages, + final WorldStateStorageCoordinator worldStateStorageCoordinator, + final ProtocolContext protocolContext) { + this( + snapMessages, + worldStateStorageCoordinator, + rootHash -> + ((BonsaiWorldStateProvider) protocolContext.getWorldStateArchive()) + .getCachedWorldStorageManager() + .getStorageByRootHash(rootHash) + .map(BonsaiWorldStateKeyValueStorage.class::cast)); + + Optional.of(protocolContext.getWorldStateArchive()) + .filter(__ -> worldStateStorageCoordinator.isMatchingFlatMode(FlatDbMode.FULL)) + .map(BonsaiWorldStateProvider.class::cast) + .ifPresent( + flatArchive -> { + var cachedStorageManager = flatArchive.getCachedWorldStorageManager(); + var blockchain = protocolContext.getBlockchain(); + + // prime state-root-to-blockhash cache + primeWorldStateArchive(cachedStorageManager, blockchain); + + // subscribe to initial sync completed events to start/stop snap server: + protocolContext + .getSynchronizer() + .filter(z -> z instanceof DefaultSynchronizer) + .map(DefaultSynchronizer.class::cast) + .ifPresentOrElse( + z -> this.listenerId.set(z.subscribeInitialSync(this)), + () -> LOGGER.warn("SnapServer created without reference to sync status")); + }); + } + + /** + * Create a snap server without registering a listener for worldstate initial sync events or + * priming worldstates by root hash. + */ + @VisibleForTesting + SnapServer( + final EthMessages snapMessages, + final WorldStateStorageCoordinator worldStateStorageCoordinator, + final Function, Optional> + worldStateStorageProvider) { this.snapMessages = snapMessages; - this.worldStateArchive = worldStateArchive; - this.registerResponseConstructors(); + this.worldStateStorageCoordinator = worldStateStorageCoordinator; + this.worldStateStorageProvider = worldStateStorageProvider; + registerResponseConstructors(); + } + + @Override + public void onInitialSyncCompleted() { + start(); + } + + @Override + public void onInitialSyncRestart() { + stop(); + } + + public SnapServer start() { + isStarted.set(true); + return this; + } + + public SnapServer stop() { + isStarted.set(false); + return this; + } + + private void primeWorldStateArchive( + final DiffBasedCachedWorldStorageManager storageManager, final Blockchain blockchain) { + // at startup, prime the latest worldstates by roothash: + storageManager.primeRootToBlockHashCache(blockchain, PRIME_STATE_ROOT_CACHE_LIMIT); } private void registerResponseConstructors() { snapMessages.registerResponseConstructor( - SnapV1.GET_ACCOUNT_RANGE, - messageData -> constructGetAccountRangeResponse(worldStateArchive, messageData)); + SnapV1.GET_ACCOUNT_RANGE, messageData -> constructGetAccountRangeResponse(messageData)); snapMessages.registerResponseConstructor( - SnapV1.GET_STORAGE_RANGE, - messageData -> constructGetStorageRangeResponse(worldStateArchive, messageData)); + SnapV1.GET_STORAGE_RANGE, messageData -> constructGetStorageRangeResponse(messageData)); snapMessages.registerResponseConstructor( - SnapV1.GET_BYTECODES, - messageData -> constructGetBytecodesResponse(worldStateArchive, messageData)); + SnapV1.GET_BYTECODES, messageData -> constructGetBytecodesResponse(messageData)); snapMessages.registerResponseConstructor( - SnapV1.GET_TRIE_NODES, - messageData -> constructGetTrieNodesResponse(worldStateArchive, messageData)); + SnapV1.GET_TRIE_NODES, messageData -> constructGetTrieNodesResponse(messageData)); + } + + MessageData constructGetAccountRangeResponse(final MessageData message) { + if (!isStarted.get()) { + return EMPTY_ACCOUNT_RANGE; + } + StopWatch stopWatch = StopWatch.createStarted(); + + final GetAccountRangeMessage getAccountRangeMessage = GetAccountRangeMessage.readFrom(message); + final GetAccountRangeMessage.Range range = getAccountRangeMessage.range(true); + final int maxResponseBytes = Math.min(range.responseBytes().intValue(), MAX_RESPONSE_SIZE); + + LOGGER + .atTrace() + .setMessage("Receive getAccountRangeMessage for {} from {} to {}") + .addArgument(() -> asLogHash(range.worldStateRootHash())) + .addArgument(() -> asLogHash(range.startKeyHash())) + .addArgument(() -> asLogHash(range.endKeyHash())) + .log(); + try { + return worldStateStorageProvider + .apply(Optional.of(range.worldStateRootHash())) + .map( + storage -> { + LOGGER.trace("obtained worldstate in {}", stopWatch); + NavigableMap accounts = + storage.streamFlatAccounts( + range.startKeyHash(), + range.endKeyHash(), + new StatefulPredicate( + "account", + maxResponseBytes, + (pair) -> { + var rlpOutput = new BytesValueRLPOutput(); + rlpOutput.startList(); + rlpOutput.writeBytes(pair.getFirst()); + rlpOutput.writeRLPBytes(pair.getSecond()); + rlpOutput.endList(); + return rlpOutput.encodedSize(); + })); + + if (accounts.isEmpty()) { + // fetch next account after range, if it exists + LOGGER.debug( + "found no accounts in range, taking first value starting from {}", + asLogHash(range.endKeyHash())); + accounts = storage.streamFlatAccounts(range.endKeyHash(), UInt256.MAX_VALUE, 1L); + } + + final var worldStateProof = + new WorldStateProofProvider(worldStateStorageCoordinator); + final List proof = + worldStateProof.getAccountProofRelatedNodes( + range.worldStateRootHash(), Hash.wrap(range.startKeyHash())); + + if (!accounts.isEmpty()) { + proof.addAll( + worldStateProof.getAccountProofRelatedNodes( + range.worldStateRootHash(), Hash.wrap(accounts.lastKey()))); + } + var resp = AccountRangeMessage.create(accounts, proof); + if (accounts.isEmpty()) { + LOGGER.debug( + "returned empty account range message for {} to {}, proof count {}", + asLogHash(range.startKeyHash()), + asLogHash(range.endKeyHash()), + proof.size()); + } + LOGGER.debug( + "returned in {} account range {} to {} with {} accounts and {} proofs, resp size {} of max {}", + stopWatch, + asLogHash(range.startKeyHash()), + asLogHash(range.endKeyHash()), + accounts.size(), + proof.size(), + resp.getSize(), + maxResponseBytes); + return resp; + }) + .orElseGet( + () -> { + LOGGER.debug("returned empty account range due to worldstate not present"); + return EMPTY_ACCOUNT_RANGE; + }); + } catch (Exception ex) { + LOGGER.error("Unexpected exception serving account range request", ex); + } + return EMPTY_ACCOUNT_RANGE; + } + + MessageData constructGetStorageRangeResponse(final MessageData message) { + if (!isStarted.get()) { + return EMPTY_STORAGE_RANGE; + } + StopWatch stopWatch = StopWatch.createStarted(); + + final GetStorageRangeMessage getStorageRangeMessage = GetStorageRangeMessage.readFrom(message); + final GetStorageRangeMessage.StorageRange range = getStorageRangeMessage.range(true); + final int maxResponseBytes = Math.min(range.responseBytes().intValue(), MAX_RESPONSE_SIZE); + + LOGGER + .atTrace() + .setMessage("Receive get storage range message size {} from {} to {} for {}") + .addArgument(message::getSize) + .addArgument(() -> asLogHash(range.startKeyHash())) + .addArgument( + () -> Optional.ofNullable(range.endKeyHash()).map(SnapServer::asLogHash).orElse("''")) + .addArgument( + () -> + range.hashes().stream() + .map(SnapServer::asLogHash) + .collect(Collectors.joining(",", "[", "]"))) + .log(); + try { + return worldStateStorageProvider + .apply(Optional.of(range.worldStateRootHash())) + .map( + storage -> { + LOGGER.trace("obtained worldstate in {}", stopWatch); + // reusable predicate to limit by rec count and bytes: + var statefulPredicate = + new StatefulPredicate( + "storage", + maxResponseBytes, + (pair) -> { + var slotRlpOutput = new BytesValueRLPOutput(); + slotRlpOutput.startList(); + slotRlpOutput.writeBytes(pair.getFirst()); + slotRlpOutput.writeBytes(pair.getSecond()); + slotRlpOutput.endList(); + return slotRlpOutput.encodedSize(); + }); + + // only honor start and end hash if request is for a single account's storage: + Bytes32 startKeyBytes, endKeyBytes; + boolean isPartialRange = false; + if (range.hashes().size() > 1) { + startKeyBytes = Bytes32.ZERO; + endKeyBytes = HASH_LAST; + } else { + startKeyBytes = range.startKeyHash(); + endKeyBytes = range.endKeyHash(); + isPartialRange = + !(startKeyBytes.equals(Hash.ZERO) && endKeyBytes.equals(HASH_LAST)); + } + + ArrayDeque> collectedStorages = new ArrayDeque<>(); + List proofNodes = new ArrayList<>(); + final var worldStateProof = + new WorldStateProofProvider(worldStateStorageCoordinator); + + for (var forAccountHash : range.hashes()) { + var accountStorages = + storage.streamFlatStorages( + Hash.wrap(forAccountHash), startKeyBytes, endKeyBytes, statefulPredicate); + + //// address partial range queries that return empty + if (accountStorages.isEmpty() && isPartialRange) { + // fetch next slot after range, if it exists + LOGGER.debug( + "found no slots in range, taking first value starting from {}", + asLogHash(range.endKeyHash())); + accountStorages = + storage.streamFlatStorages( + Hash.wrap(forAccountHash), range.endKeyHash(), UInt256.MAX_VALUE, 1L); + } + + // don't send empty storage ranges + if (!accountStorages.isEmpty()) { + collectedStorages.add(accountStorages); + } + + // if a partial storage range was requested, or we interrupted storage due to + // request limits, send proofs: + if (isPartialRange || !statefulPredicate.shouldGetMore()) { + // send a proof for the left side range origin + proofNodes.addAll( + worldStateProof.getStorageProofRelatedNodes( + getAccountStorageRoot(forAccountHash, storage), + forAccountHash, + Hash.wrap(startKeyBytes))); + if (!accountStorages.isEmpty()) { + // send a proof for the last key on the right + proofNodes.addAll( + worldStateProof.getStorageProofRelatedNodes( + getAccountStorageRoot(forAccountHash, storage), + forAccountHash, + Hash.wrap(accountStorages.lastKey()))); + } + } + + if (!statefulPredicate.shouldGetMore()) { + break; + } + } + + var resp = StorageRangeMessage.create(collectedStorages, proofNodes); + LOGGER.debug( + "returned in {} storage {} to {} range {} to {} with {} storages and {} proofs, resp size {} of max {}", + stopWatch, + asLogHash(range.hashes().first()), + asLogHash(range.hashes().last()), + asLogHash(range.startKeyHash()), + asLogHash(range.endKeyHash()), + collectedStorages.size(), + proofNodes.size(), + resp.getSize(), + maxResponseBytes); + return resp; + }) + .orElseGet( + () -> { + LOGGER.debug("returned empty storage range due to missing worldstate"); + return EMPTY_STORAGE_RANGE; + }); + } catch (Exception ex) { + LOGGER.error("Unexpected exception serving storage range request", ex); + return EMPTY_STORAGE_RANGE; + } + } + + MessageData constructGetBytecodesResponse(final MessageData message) { + if (!isStarted.get()) { + return EMPTY_BYTE_CODES_MESSAGE; + } + StopWatch stopWatch = StopWatch.createStarted(); + + final GetByteCodesMessage getByteCodesMessage = GetByteCodesMessage.readFrom(message); + final GetByteCodesMessage.CodeHashes codeHashes = getByteCodesMessage.codeHashes(true); + final int maxResponseBytes = Math.min(codeHashes.responseBytes().intValue(), MAX_RESPONSE_SIZE); + LOGGER + .atTrace() + .setMessage("Receive get bytecodes message for {} hashes") + .addArgument(codeHashes.hashes()::size) + .log(); + + // there is no worldstate root or block header for us to use, so default to head. This + // can cause problems for self-destructed contracts pre-shanghai. for now since this impl + // is deferring to #5889, we can just get any flat code storage and know we are not deleting + // code for now. + try { + return worldStateStorageProvider + .apply(Optional.empty()) + .map( + storage -> { + LOGGER.trace("obtained worldstate in {}", stopWatch); + List codeBytes = new ArrayDeque<>(); + for (Bytes32 codeHash : codeHashes.hashes()) { + Optional optCode = storage.getCode(Hash.wrap(codeHash), null); + if (optCode.isPresent()) { + if (sumListBytes(codeBytes) + optCode.get().size() > maxResponseBytes) { + break; + } + codeBytes.add(optCode.get()); + } + } + var resp = ByteCodesMessage.create(codeBytes); + LOGGER.debug( + "returned in {} code bytes message with {} entries, resp size {} of max {}", + stopWatch, + codeBytes.size(), + resp.getSize(), + maxResponseBytes); + return resp; + }) + .orElseGet( + () -> { + LOGGER.debug("returned empty byte codes message due to missing worldstate"); + return EMPTY_BYTE_CODES_MESSAGE; + }); + } catch (Exception ex) { + LOGGER.error("Unexpected exception serving bytecodes request", ex); + return EMPTY_BYTE_CODES_MESSAGE; + } } - private MessageData constructGetAccountRangeResponse( - final WorldStateArchive worldStateArchive, final MessageData message) { - // TODO implement - return AccountRangeMessage.create(new HashMap<>(), new ArrayDeque<>()); + MessageData constructGetTrieNodesResponse(final MessageData message) { + if (!isStarted.get()) { + return EMPTY_TRIE_NODES_MESSAGE; + } + StopWatch stopWatch = StopWatch.createStarted(); + + final GetTrieNodesMessage getTrieNodesMessage = GetTrieNodesMessage.readFrom(message); + final GetTrieNodesMessage.TrieNodesPaths triePaths = getTrieNodesMessage.paths(true); + final int maxResponseBytes = Math.min(triePaths.responseBytes().intValue(), MAX_RESPONSE_SIZE); + LOGGER + .atTrace() + .setMessage("Receive get trie nodes message of size {}") + .addArgument(() -> triePaths.paths().size()) + .log(); + + try { + return worldStateStorageProvider + .apply(Optional.of(triePaths.worldStateRootHash())) + .map( + storage -> { + LOGGER.trace("obtained worldstate in {}", stopWatch); + ArrayList trieNodes = new ArrayList<>(); + for (var triePath : triePaths.paths()) { + // first element in paths is account + if (triePath.size() == 1) { + // if there is only one path, presume it should be compact encoded account path + var optStorage = + storage.getTrieNodeUnsafe(CompactEncoding.decode(triePath.get(0))); + if (optStorage.isPresent()) { + if (sumListBytes(trieNodes) + optStorage.get().size() > maxResponseBytes) { + break; + } + trieNodes.add(optStorage.get()); + } + + } else { + // otherwise the first element should be account hash, and subsequent paths + // are compact encoded account storage paths + + final Bytes accountPrefix = triePath.get(0); + + List storagePaths = triePath.subList(1, triePath.size()); + for (var path : storagePaths) { + var optStorage = + storage.getTrieNodeUnsafe( + Bytes.concatenate(accountPrefix, CompactEncoding.decode(path))); + if (optStorage.isPresent()) { + if (sumListBytes(trieNodes) + optStorage.get().size() > maxResponseBytes) { + break; + } + trieNodes.add(optStorage.get()); + } + } + } + } + var resp = TrieNodesMessage.create(trieNodes); + LOGGER.debug( + "returned in {} trie nodes message with {} entries, resp size {} of max {}", + stopWatch, + trieNodes.size(), + resp.getCode(), + maxResponseBytes); + return resp; + }) + .orElseGet( + () -> { + LOGGER.debug("returned empty trie nodes message due to missing worldstate"); + return EMPTY_TRIE_NODES_MESSAGE; + }); + } catch (Exception ex) { + LOGGER.error("Unexpected exception serving trienodes request", ex); + return EMPTY_TRIE_NODES_MESSAGE; + } + } + + static class StatefulPredicate implements Predicate> { + + final AtomicInteger byteLimit = new AtomicInteger(0); + final AtomicInteger recordLimit = new AtomicInteger(0); + final AtomicBoolean shouldContinue = new AtomicBoolean(true); + final Function, Integer> encodingSizeAccumulator; + final int maxResponseBytes; + // TODO: remove this hack, 10% is a fudge factor to account for the proof node size + final int maxResponseBytesFudgeFactor; + final String forWhat; + + StatefulPredicate( + final String forWhat, + final int maxResponseBytes, + final Function, Integer> encodingSizeAccumulator) { + this.maxResponseBytes = maxResponseBytes; + this.maxResponseBytesFudgeFactor = maxResponseBytes * 9 / 10; + this.forWhat = forWhat; + this.encodingSizeAccumulator = encodingSizeAccumulator; + } + + public boolean shouldGetMore() { + return shouldContinue.get(); + } + + @Override + public boolean test(final Pair pair) { + LOGGER + .atTrace() + .setMessage("{} pre-accumulate limits, bytes: {} , stream count: {}") + .addArgument(() -> forWhat) + .addArgument(byteLimit::get) + .addArgument(recordLimit::get) + .log(); + + var underRecordLimit = recordLimit.addAndGet(1) <= MAX_ENTRIES_PER_REQUEST; + var underByteLimit = + byteLimit.accumulateAndGet(0, (cur, __) -> cur + encodingSizeAccumulator.apply(pair)) + < maxResponseBytesFudgeFactor; + if (underRecordLimit && underByteLimit) { + return true; + } else { + shouldContinue.set(false); + LOGGER + .atDebug() + .setMessage("{} post-accumulate limits, bytes: {} , stream count: {}") + .addArgument(() -> forWhat) + .addArgument(byteLimit::get) + .addArgument(recordLimit::get) + .log(); + return false; + } + } } - private MessageData constructGetStorageRangeResponse( - final WorldStateArchive worldStateArchive, final MessageData message) { - // TODO implement - return StorageRangeMessage.create(new ArrayDeque<>(), new ArrayDeque<>()); + Hash getAccountStorageRoot( + final Bytes32 accountHash, final BonsaiWorldStateKeyValueStorage storage) { + return storage + .getTrieNodeUnsafe(Bytes.concatenate(accountHash, Bytes.EMPTY)) + .map(Hash::hash) + .orElse(Hash.EMPTY_TRIE_HASH); } - private MessageData constructGetBytecodesResponse( - final WorldStateArchive worldStateArchive, final MessageData message) { - // TODO implement - return ByteCodesMessage.create(new ArrayDeque<>()); + private static int sumListBytes(final List listOfBytes) { + // TODO: remove hack, 10% is a fudge factor to account for the overhead of rlp encoding + return listOfBytes.stream().map(Bytes::size).reduce((a, b) -> a + b).orElse(0) * 11 / 10; } - private MessageData constructGetTrieNodesResponse( - final WorldStateArchive worldStateArchive, final MessageData message) { - return TrieNodesMessage.create(new ArrayDeque<>()); + private static String asLogHash(final Bytes32 hash) { + var str = hash.toHexString(); + return str.substring(0, 4) + ".." + str.substring(59, 63); } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetAccountRangeMessage.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetAccountRangeMessage.java index 66aae554d48..8f5fcaf9a73 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetAccountRangeMessage.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/messages/snap/GetAccountRangeMessage.java @@ -22,6 +22,7 @@ import org.hyperledger.besu.ethereum.rlp.RLPInput; import java.math.BigInteger; +import java.util.Optional; import org.apache.tuweni.bytes.Bytes; import org.apache.tuweni.bytes.Bytes32; @@ -95,6 +96,8 @@ public Range range(final boolean withRequestId) { @Value.Immutable public interface Range { + Optional requestId(); + Hash worldStateRootHash(); Hash startKeyHash(); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java index 28c1d193d19..af35898ac22 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/DefaultSynchronizer.java @@ -38,6 +38,7 @@ import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; import org.hyperledger.besu.metrics.BesuMetricCategory; import org.hyperledger.besu.plugin.data.SyncStatus; +import org.hyperledger.besu.plugin.services.BesuEvents; import org.hyperledger.besu.plugin.services.BesuEvents.SyncStatusListener; import org.hyperledger.besu.plugin.services.MetricsSystem; import org.hyperledger.besu.util.log.FramedLogMessage; @@ -362,6 +363,14 @@ public boolean unsubscribeInSync(final long listenerId) { return syncState.unsubscribeSyncStatus(listenerId); } + public long subscribeInitialSync(final BesuEvents.InitialSyncCompletionListener listener) { + return syncState.subscribeCompletionReached(listener); + } + + public boolean unsubscribeInitialSync(final long listenerId) { + return syncState.unsubscribeInitialConditionReached(listenerId); + } + private Void finalizeSync(final Void unused) { LOG.info("Stopping block propagation."); blockPropagationManager.ifPresent(BlockPropagationManager::stop); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java index 7221c8da4fa..ef397ead75f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/StackTrie.java @@ -123,38 +123,41 @@ public void commit(final FlatDatabaseUpdater flatDatabaseUpdater, final NodeUpda proofsEntries.put(Hash.hash(proof), proof); } - final InnerNodeDiscoveryManager snapStoredNodeFactory = - new InnerNodeDiscoveryManager<>( - (location, hash) -> Optional.ofNullable(proofsEntries.get(hash)), - Function.identity(), - Function.identity(), - startKeyHash, - proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey(), - true); - - final MerkleTrie trie = - new StoredMerklePatriciaTrie<>( - snapStoredNodeFactory, proofs.isEmpty() ? MerkleTrie.EMPTY_TRIE_NODE_HASH : rootHash); - - for (Map.Entry entry : keys.entrySet()) { - trie.put(entry.getKey(), entry.getValue()); - } - - keys.forEach(flatDatabaseUpdater::update); - - trie.commit( - nodeUpdater, - (new SnapCommitVisitor<>( - nodeUpdater, - startKeyHash, - proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey()) { - @Override - public void maybeStoreNode(final Bytes location, final Node node) { - if (!node.isHealNeeded()) { - super.maybeStoreNode(location, node); + if (!keys.isEmpty()) { + final InnerNodeDiscoveryManager snapStoredNodeFactory = + new InnerNodeDiscoveryManager<>( + (location, hash) -> Optional.ofNullable(proofsEntries.get(hash)), + Function.identity(), + Function.identity(), + startKeyHash, + proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey(), + true); + + final MerkleTrie trie = + new StoredMerklePatriciaTrie<>( + snapStoredNodeFactory, + proofs.isEmpty() ? MerkleTrie.EMPTY_TRIE_NODE_HASH : rootHash); + + for (Map.Entry entry : keys.entrySet()) { + trie.put(entry.getKey(), entry.getValue()); + } + + keys.forEach(flatDatabaseUpdater::update); + + trie.commit( + nodeUpdater, + (new SnapCommitVisitor<>( + nodeUpdater, + startKeyHash, + proofs.isEmpty() ? RangeManager.MAX_RANGE : keys.lastKey()) { + @Override + public void maybeStoreNode(final Bytes location, final Node node) { + if (!node.isHealNeeded()) { + super.maybeStoreNode(location, node); + } } - } - })); + })); + } } } diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java index bb98534fd0e..b01722240d4 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/AccountRangeDataRequest.java @@ -160,6 +160,12 @@ public void addResponse( if (!accounts.isEmpty() || !proofs.isEmpty()) { if (!worldStateProofProvider.isValidRangeProof( startKeyHash, endKeyHash, getRootHash(), proofs, accounts)) { + // this happens on repivot and on bad proofs + LOG.atTrace() + .setMessage("invalid range proof received for account range {} {}") + .addArgument(accounts.firstKey()) + .addArgument(accounts.lastKey()) + .log(); isProofValid = Optional.of(false); } else { stackTrie.addElement(startKeyHash, proofs, accounts); diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java index 81b4897a863..c3d8c53c45f 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/StorageRangeDataRequest.java @@ -136,6 +136,13 @@ public void addResponse( startKeyHash, endKeyHash, storageRoot, proofs, slots)) { // If the proof is invalid, it means that the storage will be a mix of several blocks. // Therefore, it will be necessary to heal the account's storage subsequently + LOG.atDebug() + .setMessage("invalid storage range proof received for account hash {} range {} {}") + .addArgument(() -> accountHash) + .addArgument(() -> slots.isEmpty() ? "none" : slots.firstKey()) + .addArgument(() -> slots.isEmpty() ? "none" : slots.lastKey()) + .log(); + downloadState.addAccountToHealingList(CompactEncoding.bytesToPath(accountHash)); // We will request the new storage root of the account because it is apparently no longer // valid with the new pivot block. diff --git a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageFlatDatabaseHealingRangeRequest.java b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageFlatDatabaseHealingRangeRequest.java index ed81fe65d50..4d8fd028b25 100644 --- a/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageFlatDatabaseHealingRangeRequest.java +++ b/ethereum/eth/src/main/java/org/hyperledger/besu/ethereum/eth/sync/snapsync/request/heal/StorageFlatDatabaseHealingRangeRequest.java @@ -124,7 +124,7 @@ public void addLocalData( final NavigableMap slots, final ArrayDeque proofs) { if (!slots.isEmpty() && !proofs.isEmpty()) { - // very proof in order to check if the local flat database is valid or not + // verify proof in order to check if the local flat database is valid or not isProofValid = worldStateProofProvider.isValidRangeProof( startKeyHash, endKeyHash, storageRoot, proofs, slots); diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServerTest.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServerTest.java new file mode 100644 index 00000000000..fb6f63e1e1c --- /dev/null +++ b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/snap/SnapServerTest.java @@ -0,0 +1,604 @@ +/* + * Copyright Hyperledger Besu Contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ +package org.hyperledger.besu.ethereum.eth.manager.snap; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.hyperledger.besu.ethereum.eth.manager.snap.SnapServer.HASH_LAST; +import static org.hyperledger.besu.ethereum.worldstate.DataStorageConfiguration.BONSAI_CODE_BY_HASH_CONFIG; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; + +import org.hyperledger.besu.datatypes.Hash; +import org.hyperledger.besu.datatypes.Wei; +import org.hyperledger.besu.ethereum.core.InMemoryKeyValueStorageProvider; +import org.hyperledger.besu.ethereum.eth.manager.EthMessages; +import org.hyperledger.besu.ethereum.eth.messages.snap.AccountRangeMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.ByteCodesMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetAccountRangeMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetByteCodesMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetStorageRangeMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.GetTrieNodesMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.StorageRangeMessage; +import org.hyperledger.besu.ethereum.eth.messages.snap.TrieNodesMessage; +import org.hyperledger.besu.ethereum.proof.WorldStateProofProvider; +import org.hyperledger.besu.ethereum.rlp.BytesValueRLPOutput; +import org.hyperledger.besu.ethereum.rlp.RLP; +import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStorageProvider; +import org.hyperledger.besu.ethereum.trie.CompactEncoding; +import org.hyperledger.besu.ethereum.trie.MerkleTrie; +import org.hyperledger.besu.ethereum.trie.diffbased.bonsai.storage.BonsaiWorldStateKeyValueStorage; +import org.hyperledger.besu.ethereum.trie.patricia.SimpleMerklePatriciaTrie; +import org.hyperledger.besu.ethereum.trie.patricia.StoredMerklePatriciaTrie; +import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue; +import org.hyperledger.besu.ethereum.worldstate.WorldStateStorageCoordinator; +import org.hyperledger.besu.metrics.ObservableMetricsSystem; +import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem; + +import java.math.BigInteger; +import java.util.Collections; +import java.util.List; +import java.util.NavigableMap; +import java.util.Optional; +import java.util.Random; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.tuweni.bytes.Bytes; +import org.apache.tuweni.bytes.Bytes32; +import org.junit.jupiter.api.Test; + +public class SnapServerTest { + static Random rand = new Random(); + + record SnapTestAccount( + Hash addressHash, + StateTrieAccountValue accountValue, + MerkleTrie storage, + Bytes code) { + Bytes accountRLP() { + return RLP.encode(accountValue::writeTo); + } + } + + static final ObservableMetricsSystem noopMetrics = new NoOpMetricsSystem(); + + final KeyValueStorageProvider storageProvider = new InMemoryKeyValueStorageProvider(); + final BonsaiWorldStateKeyValueStorage inMemoryStorage = + new BonsaiWorldStateKeyValueStorage(storageProvider, noopMetrics, BONSAI_CODE_BY_HASH_CONFIG); + final WorldStateStorageCoordinator storageCoordinator = + new WorldStateStorageCoordinator(inMemoryStorage); + final StoredMerklePatriciaTrie storageTrie = + new StoredMerklePatriciaTrie<>( + inMemoryStorage::getAccountStateTrieNode, Function.identity(), Function.identity()); + final WorldStateProofProvider proofProvider = new WorldStateProofProvider(storageCoordinator); + + final Function, Optional> spyProvider = + spy( + new Function, Optional>() { + // explicit non-final class is necessary for Mockito to spy: + @Override + public Optional apply(final Optional hash) { + return Optional.of(inMemoryStorage); + } + }); + + final SnapServer snapServer = + new SnapServer(new EthMessages(), storageCoordinator, spyProvider).start(); + + final SnapTestAccount acct1 = createTestAccount("10"); + final SnapTestAccount acct2 = createTestAccount("20"); + final SnapTestAccount acct3 = createTestContractAccount("30", inMemoryStorage); + final SnapTestAccount acct4 = createTestContractAccount("40", inMemoryStorage); + + @Test + public void assertNoStartNoOp() { + // account found at startHash + insertTestAccounts(acct4, acct3, acct1, acct2); + + // stop snap server so that we should not be processing snap requests + snapServer.stop(); + + var rangeData = requestAccountRange(acct1.addressHash, acct4.addressHash).accountData(false); + + // assert empty account response and no attempt to fetch worldstate + assertThat(rangeData.accounts().isEmpty()).isTrue(); + assertThat(rangeData.proofs().isEmpty()).isTrue(); + verify(spyProvider, never()).apply(any()); + + // assert empty storage response and no attempt to fetch worldstate + var storageRange = + requestStorageRange(List.of(acct3.addressHash), Hash.ZERO, HASH_LAST).slotsData(false); + assertThat(storageRange.slots().isEmpty()).isTrue(); + assertThat(storageRange.proofs().isEmpty()).isTrue(); + verify(spyProvider, never()).apply(any()); + + // assert empty trie nodes response and no attempt to fetch worldstate + var trieNodes = + requestTrieNodes(storageTrie.getRootHash(), List.of(List.of(Bytes.fromHexString("0x01")))) + .nodes(false); + assertThat(trieNodes.isEmpty()).isTrue(); + verify(spyProvider, never()).apply(any()); + + // assert empty code response and no attempt to fetch worldstate + var codes = + requestByteCodes(List.of(acct3.accountValue.getCodeHash())).bytecodes(false).codes(); + assertThat(codes.isEmpty()).isTrue(); + verify(spyProvider, never()).apply(any()); + } + + @Test + public void assertEmptyRangeLeftProofOfExclusionAndNextAccount() { + // for a range request that returns empty, we should return just a proof of exclusion on the + // left and the next account after the limit hash + insertTestAccounts(acct1, acct4); + + var rangeData = + getAndVerifyAccountRangeData(requestAccountRange(acct2.addressHash, acct3.addressHash), 1); + + // expect to find only one value acct4, outside the requested range + var outOfRangeVal = rangeData.accounts().entrySet().stream().findFirst(); + assertThat(outOfRangeVal).isPresent(); + assertThat(outOfRangeVal.get().getKey()).isEqualTo(acct4.addressHash()); + + // assert proofs are valid for the requested range + assertThat(assertIsValidAccountRangeProof(acct2.addressHash, rangeData)).isTrue(); + } + + @Test + public void assertAccountLimitRangeResponse() { + // assert we limit the range response according to size + final int acctCount = 2000; + final long acctRLPSize = 105; + + List randomLoad = IntStream.range(1, 4096).boxed().collect(Collectors.toList()); + Collections.shuffle(randomLoad); + randomLoad.stream() + .forEach( + i -> + insertTestAccounts( + createTestAccount( + Bytes.concatenate( + Bytes.fromHexString("0x40"), + Bytes.fromHexStringLenient(Integer.toHexString(i * 256))) + .toHexString()))); + + final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); + tmp.startList(); + tmp.writeBytes(storageTrie.getRootHash()); + tmp.writeBytes(Hash.ZERO); + tmp.writeBytes(HASH_LAST); + tmp.writeBigIntegerScalar(BigInteger.valueOf(acctRLPSize * acctCount)); + tmp.endList(); + var tinyRangeLimit = new GetAccountRangeMessage(tmp.encoded()).wrapMessageData(BigInteger.ONE); + + var rangeData = + getAndVerifyAccountRangeData( + (AccountRangeMessage) snapServer.constructGetAccountRangeResponse(tinyRangeLimit), + // TODO: after sorting out the request fudge factor, adjust this assertion to match + acctCount * 90 / 100 - 1); + + // assert proofs are valid for the requested range + assertThat(assertIsValidAccountRangeProof(Hash.ZERO, rangeData)).isTrue(); + } + + @Test + public void assertLastEmptyRange() { + // When our final range request is empty, no next account is possible, + // and we should return just a proof of exclusion of the right + insertTestAccounts(acct1, acct2); + var rangeData = + getAndVerifyAccountRangeData(requestAccountRange(acct3.addressHash, acct4.addressHash), 0); + + // assert proofs are valid for the requested range + assertThat(assertIsValidAccountRangeProof(acct3.addressHash, rangeData)).isTrue(); + } + + @Test + public void assertAccountFoundAtStartHashProof() { + // account found at startHash + insertTestAccounts(acct4, acct3, acct1, acct2); + var rangeData = + getAndVerifyAccountRangeData(requestAccountRange(acct1.addressHash, acct4.addressHash), 4); + + // assert proofs are valid for requested range + assertThat(assertIsValidAccountRangeProof(acct1.addressHash, rangeData)).isTrue(); + } + + @Test + public void assertCompleteStorageForSingleAccount() { + insertTestAccounts(acct1, acct2, acct3, acct4); + var rangeData = requestStorageRange(List.of(acct3.addressHash), Hash.ZERO, HASH_LAST); + assertThat(rangeData).isNotNull(); + var slotsData = rangeData.slotsData(false); + assertThat(slotsData).isNotNull(); + assertThat(slotsData.slots()).isNotNull(); + assertThat(slotsData.slots().size()).isEqualTo(1); + var firstAccountStorages = slotsData.slots().first(); + assertThat(firstAccountStorages.size()).isEqualTo(10); + // no proofs for complete storage range: + assertThat(slotsData.proofs().size()).isEqualTo(0); + + assertThat( + assertIsValidStorageProof(acct3, Hash.ZERO, firstAccountStorages, slotsData.proofs())) + .isTrue(); + } + + @Test + public void assertPartialStorageForSingleAccountEmptyRange() { + insertTestAccounts(acct3); + var rangeData = + requestStorageRange( + List.of(acct3.addressHash), Hash.ZERO, Hash.fromHexStringLenient("0x00ff")); + assertThat(rangeData).isNotNull(); + var slotsData = rangeData.slotsData(false); + assertThat(slotsData).isNotNull(); + assertThat(slotsData.slots()).isNotNull(); + // expect 1 slot PAST the requested empty range + assertThat(slotsData.slots().size()).isEqualTo(1); + // expect left and right proofs for empty storage range: + assertThat(slotsData.proofs().size()).isGreaterThan(0); + // assert proofs are valid for the requested range + assertThat( + assertIsValidStorageProof( + acct3, Hash.ZERO, slotsData.slots().first(), slotsData.proofs())) + .isTrue(); + } + + @Test + public void assertLastEmptyPartialStorageForSingleAccount() { + // When our final range request is empty, no next account is possible, + // and we should return just a proof of exclusion of the right + + insertTestAccounts(acct3); + var rangeData = requestStorageRange(List.of(acct3.addressHash), HASH_LAST, HASH_LAST); + assertThat(rangeData).isNotNull(); + var slotsData = rangeData.slotsData(false); + assertThat(slotsData).isNotNull(); + assertThat(slotsData.slots()).isNotNull(); + // expect no slots PAST the requested empty range + assertThat(slotsData.slots().size()).isEqualTo(0); + // expect left and right proofs for empty storage range: + assertThat(slotsData.proofs().size()).isGreaterThan(0); + // assert proofs are valid for the requested range + assertThat( + assertIsValidStorageProof( + acct3, + Hash.fromHexStringLenient("0xFF"), + Collections.emptyNavigableMap(), + slotsData.proofs())) + .isTrue(); + } + + @Test + public void assertStorageLimitRangeResponse() { + // assert we limit the range response according to bytessize + final int storageSlotSize = 70; + final int storageSlotCount = 16; + insertTestAccounts(acct1, acct2, acct3, acct4); + + final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); + tmp.startList(); + tmp.writeBigIntegerScalar(BigInteger.ONE); + tmp.writeBytes(storageTrie.getRootHash()); + tmp.writeList( + List.of(acct3.addressHash, acct4.addressHash), + (hash, rlpOutput) -> rlpOutput.writeBytes(hash)); + tmp.writeBytes(Hash.ZERO); + tmp.writeBytes(HASH_LAST); + tmp.writeBigIntegerScalar(BigInteger.valueOf(storageSlotCount * storageSlotSize)); + tmp.endList(); + var tinyRangeLimit = new GetStorageRangeMessage(tmp.encoded()); + + var rangeData = + (StorageRangeMessage) snapServer.constructGetStorageRangeResponse(tinyRangeLimit); + + // assert proofs are valid for the requested range + assertThat(rangeData).isNotNull(); + var slotsData = rangeData.slotsData(false); + assertThat(slotsData).isNotNull(); + assertThat(slotsData.slots()).isNotNull(); + assertThat(slotsData.slots().size()).isEqualTo(2); + var firstAccountStorages = slotsData.slots().first(); + // expecting to see complete 10 slot storage for acct3 + assertThat(firstAccountStorages.size()).isEqualTo(10); + var secondAccountStorages = slotsData.slots().last(); + // expecting to see only 6 since request was limited to 16 slots + // TODO: after sorting out the request fudge factor, adjust this assertion to match + assertThat(secondAccountStorages.size()).isEqualTo(6 * 90 / 100 - 1); + // proofs required for interrupted storage range: + assertThat(slotsData.proofs().size()).isNotEqualTo(0); + + assertThat( + assertIsValidStorageProof(acct4, Hash.ZERO, secondAccountStorages, slotsData.proofs())) + .isTrue(); + } + + @Test + public void assertAccountTriePathRequest() { + insertTestAccounts(acct1, acct2, acct3, acct4); + var partialPathToAcct2 = CompactEncoding.bytesToPath(acct2.addressHash).slice(0, 1); + var partialPathToAcct1 = Bytes.fromHexString("0x01"); // first nibble is 1 + var trieNodeRequest = + requestTrieNodes( + storageTrie.getRootHash(), + List.of(List.of(partialPathToAcct2), List.of(partialPathToAcct1))); + assertThat(trieNodeRequest).isNotNull(); + List trieNodes = trieNodeRequest.nodes(false); + assertThat(trieNodes).isNotNull(); + assertThat(trieNodes.size()).isEqualTo(2); + } + + @Test + public void assertAccountTrieLimitRequest() { + insertTestAccounts(acct1, acct2, acct3, acct4); + final int accountNodeSize = 147; + final int accountNodeLimit = 3; + + var partialPathToAcct1 = Bytes.fromHexString("0x01"); // first nibble is 1 + var partialPathToAcct2 = CompactEncoding.bytesToPath(acct2.addressHash).slice(0, 1); + var partialPathToAcct3 = Bytes.fromHexString("0x03"); // first nibble is 1 + var partialPathToAcct4 = Bytes.fromHexString("0x04"); // first nibble is 1 + final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); + tmp.startList(); + tmp.writeBigIntegerScalar(BigInteger.ONE); + tmp.writeBytes(storageTrie.getRootHash()); + tmp.writeList( + List.of( + List.of(partialPathToAcct4), + List.of(partialPathToAcct3), + List.of(partialPathToAcct2), + List.of(partialPathToAcct1)), + (path, rlpOutput) -> + rlpOutput.writeList(path, (b, subRlpOutput) -> subRlpOutput.writeBytes(b))); + tmp.writeBigIntegerScalar(BigInteger.valueOf(accountNodeLimit * accountNodeSize)); + tmp.endList(); + + var trieNodeRequest = + (TrieNodesMessage) + snapServer.constructGetTrieNodesResponse(new GetTrieNodesMessage(tmp.encoded())); + + assertThat(trieNodeRequest).isNotNull(); + List trieNodes = trieNodeRequest.nodes(false); + assertThat(trieNodes).isNotNull(); + // TODO: adjust this assertion after sorting out the request fudge factor + assertThat(trieNodes.size()).isEqualTo(accountNodeLimit * 90 / 100); + } + + @Test + public void assertStorageTriePathRequest() { + insertTestAccounts(acct1, acct2, acct3, acct4); + var pathToSlot11 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0101")); + var pathToSlot12 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0102")); + var pathToSlot1a = CompactEncoding.encode(Bytes.fromHexStringLenient("0x010A")); // not present + var trieNodeRequest = + requestTrieNodes( + storageTrie.getRootHash(), + List.of( + List.of(acct3.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a), + List.of(acct4.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a))); + assertThat(trieNodeRequest).isNotNull(); + List trieNodes = trieNodeRequest.nodes(false); + assertThat(trieNodes).isNotNull(); + assertThat(trieNodes.size()).isEqualTo(4); + } + + @Test + public void assertStorageTrieLimitRequest() { + insertTestAccounts(acct1, acct2, acct3, acct4); + final int trieNodeSize = 69; + final int trieNodeLimit = 3; + + var pathToSlot11 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0101")); + var pathToSlot12 = CompactEncoding.encode(Bytes.fromHexStringLenient("0x0102")); + var pathToSlot1a = CompactEncoding.encode(Bytes.fromHexStringLenient("0x010A")); // not present + + final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); + tmp.startList(); + tmp.writeBigIntegerScalar(BigInteger.ONE); + tmp.writeBytes(storageTrie.getRootHash()); + tmp.writeList( + List.of( + List.of(acct3.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a), + List.of(acct4.addressHash, pathToSlot11, pathToSlot12, pathToSlot1a)), + (path, rlpOutput) -> + rlpOutput.writeList(path, (b, subRlpOutput) -> subRlpOutput.writeBytes(b))); + tmp.writeBigIntegerScalar(BigInteger.valueOf(trieNodeLimit * trieNodeSize)); + tmp.endList(); + + var trieNodeRequest = + (TrieNodesMessage) + snapServer.constructGetTrieNodesResponse(new GetTrieNodesMessage(tmp.encoded())); + + assertThat(trieNodeRequest).isNotNull(); + List trieNodes = trieNodeRequest.nodes(false); + assertThat(trieNodes).isNotNull(); + // TODO: adjust this assertion after sorting out the request fudge factor + assertThat(trieNodes.size()).isEqualTo(trieNodeLimit * 90 / 100); + } + + @Test + public void assertCodePresent() { + insertTestAccounts(acct1, acct2, acct3, acct4); + var codeRequest = + requestByteCodes( + List.of(acct3.accountValue.getCodeHash(), acct4.accountValue.getCodeHash())); + assertThat(codeRequest).isNotNull(); + ByteCodesMessage.ByteCodes codes = codeRequest.bytecodes(false); + assertThat(codes).isNotNull(); + assertThat(codes.codes().size()).isEqualTo(2); + } + + @Test + public void assertCodeLimitRequest() { + insertTestAccounts(acct1, acct2, acct3, acct4); + final int codeSize = 32; + final int codeLimit = 2; + + final BytesValueRLPOutput tmp = new BytesValueRLPOutput(); + tmp.startList(); + tmp.writeBigIntegerScalar(BigInteger.ONE); + tmp.writeList( + List.of(acct3.accountValue.getCodeHash(), acct4.accountValue.getCodeHash()), + (hash, rlpOutput) -> rlpOutput.writeBytes(hash)); + tmp.writeBigIntegerScalar(BigInteger.valueOf(codeSize * codeLimit)); + tmp.endList(); + + var codeRequest = + (ByteCodesMessage) + snapServer.constructGetBytecodesResponse(new GetByteCodesMessage(tmp.encoded())); + + assertThat(codeRequest).isNotNull(); + ByteCodesMessage.ByteCodes codes = codeRequest.bytecodes(false); + assertThat(codes).isNotNull(); + // TODO adjust this assertion after sorting out the request fudge factor + assertThat(codes.codes().size()).isEqualTo(codeLimit * 90 / 100); + } + + static SnapTestAccount createTestAccount(final String hexAddr) { + return new SnapTestAccount( + Hash.wrap(Bytes32.rightPad(Bytes.fromHexString(hexAddr))), + new StateTrieAccountValue( + rand.nextInt(0, 1), Wei.of(rand.nextLong(0L, 1L)), Hash.EMPTY_TRIE_HASH, Hash.EMPTY), + new SimpleMerklePatriciaTrie<>(a -> a), + Bytes.EMPTY); + } + + static SnapTestAccount createTestContractAccount( + final String hexAddr, final BonsaiWorldStateKeyValueStorage storage) { + Hash acctHash = Hash.wrap(Bytes32.rightPad(Bytes.fromHexString(hexAddr))); + MerkleTrie trie = + new StoredMerklePatriciaTrie<>( + (loc, hash) -> storage.getAccountStorageTrieNode(acctHash, loc, hash), + Hash.EMPTY_TRIE_HASH, + a -> a, + a -> a); + Bytes32 mockCode = Bytes32.random(); + + // mock some storage data + var flatdb = storage.getFlatDbStrategy(); + var updater = storage.updater(); + updater.putCode(Hash.hash(mockCode), mockCode); + IntStream.range(10, 20) + .boxed() + .forEach( + i -> { + Bytes32 mockBytes32 = Bytes32.rightPad(Bytes.fromHexString(i.toString())); + var rlpOut = new BytesValueRLPOutput(); + rlpOut.writeBytes(mockBytes32); + trie.put(mockBytes32, rlpOut.encoded()); + flatdb.putFlatAccountStorageValueByStorageSlotHash( + updater.getWorldStateTransaction(), + acctHash, + Hash.wrap(mockBytes32), + mockBytes32); + }); + trie.commit( + (location, key, value) -> + updater.putAccountStorageTrieNode(acctHash, location, key, value)); + updater.commit(); + return new SnapTestAccount( + acctHash, + new StateTrieAccountValue( + rand.nextInt(0, 1), Wei.of(rand.nextLong(0L, 1L)), + Hash.wrap(trie.getRootHash()), Hash.hash(mockCode)), + trie, + mockCode); + } + + void insertTestAccounts(final SnapTestAccount... accounts) { + final var updater = inMemoryStorage.updater(); + for (SnapTestAccount account : accounts) { + updater.putAccountInfoState(account.addressHash(), account.accountRLP()); + storageTrie.put(account.addressHash(), account.accountRLP()); + } + storageTrie.commit(updater::putAccountStateTrieNode); + updater.commit(); + } + + boolean assertIsValidAccountRangeProof( + final Hash startHash, final AccountRangeMessage.AccountRangeData accountRange) { + Bytes32 lastKey = + Optional.of(accountRange.accounts()) + .filter(z -> z.size() > 0) + .map(NavigableMap::lastKey) + .orElse(startHash); + + return proofProvider.isValidRangeProof( + startHash, + lastKey, + storageTrie.getRootHash(), + accountRange.proofs(), + accountRange.accounts()); + } + + boolean assertIsValidStorageProof( + final SnapTestAccount account, + final Hash startHash, + final NavigableMap slotRangeData, + final List proofs) { + + Bytes32 lastKey = + Optional.of(slotRangeData) + .filter(z -> z.size() > 0) + .map(NavigableMap::lastKey) + .orElse(startHash); + + // this is only working for single account ranges for now + return proofProvider.isValidRangeProof( + startHash, lastKey, account.accountValue.getStorageRoot(), proofs, slotRangeData); + } + + AccountRangeMessage requestAccountRange(final Hash startHash, final Hash limitHash) { + return (AccountRangeMessage) + snapServer.constructGetAccountRangeResponse( + GetAccountRangeMessage.create( + Hash.wrap(storageTrie.getRootHash()), startHash, limitHash) + .wrapMessageData(BigInteger.ONE)); + } + + StorageRangeMessage requestStorageRange( + final List accountHashes, final Hash startHash, final Hash limitHash) { + return (StorageRangeMessage) + snapServer.constructGetStorageRangeResponse( + GetStorageRangeMessage.create( + Hash.wrap(storageTrie.getRootHash()), accountHashes, startHash, limitHash) + .wrapMessageData(BigInteger.ONE)); + } + + TrieNodesMessage requestTrieNodes(final Bytes32 rootHash, final List> trieNodesList) { + return (TrieNodesMessage) + snapServer.constructGetTrieNodesResponse( + GetTrieNodesMessage.create(Hash.wrap(rootHash), trieNodesList) + .wrapMessageData(BigInteger.ONE)); + } + + ByteCodesMessage requestByteCodes(final List codeHashes) { + return (ByteCodesMessage) + snapServer.constructGetBytecodesResponse( + GetByteCodesMessage.create(codeHashes).wrapMessageData(BigInteger.ONE)); + } + + AccountRangeMessage.AccountRangeData getAndVerifyAccountRangeData( + final AccountRangeMessage range, final int expectedSize) { + assertThat(range).isNotNull(); + var accountData = range.accountData(false); + assertThat(accountData).isNotNull(); + assertThat(accountData.accounts().size()).isEqualTo(expectedSize); + return accountData; + } +} diff --git a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/SnapProtocolManagerTestUtil.java b/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/SnapProtocolManagerTestUtil.java deleted file mode 100644 index 4fd88a0f3ed..00000000000 --- a/ethereum/eth/src/test/java/org/hyperledger/besu/ethereum/eth/manager/task/SnapProtocolManagerTestUtil.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright contributors to Hyperledger Besu - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on - * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the - * specific language governing permissions and limitations under the License. - * - * SPDX-License-Identifier: Apache-2.0 - */ -package org.hyperledger.besu.ethereum.eth.manager.task; - -import org.hyperledger.besu.ethereum.core.BlockchainSetupUtil; -import org.hyperledger.besu.ethereum.eth.manager.EthMessages; -import org.hyperledger.besu.ethereum.eth.manager.EthPeers; -import org.hyperledger.besu.ethereum.eth.manager.EthProtocolManager; -import org.hyperledger.besu.ethereum.eth.manager.RespondingEthPeer; -import org.hyperledger.besu.ethereum.eth.manager.snap.SnapProtocolManager; -import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive; -import org.hyperledger.besu.plugin.services.storage.DataStorageFormat; - -import java.util.Collections; - -public class SnapProtocolManagerTestUtil { - - public static SnapProtocolManager create(final EthPeers ethPeers) { - return create( - BlockchainSetupUtil.forTesting(DataStorageFormat.FOREST).getWorldArchive(), ethPeers); - } - - public static SnapProtocolManager create( - final WorldStateArchive worldStateArchive, final EthPeers ethPeers) { - - EthMessages messages = new EthMessages(); - - return new SnapProtocolManager(Collections.emptyList(), ethPeers, messages, worldStateArchive); - } - - public static SnapProtocolManager create( - final WorldStateArchive worldStateArchive, - final EthPeers ethPeers, - final EthMessages snapMessages) { - return new SnapProtocolManager( - Collections.emptyList(), ethPeers, snapMessages, worldStateArchive); - } - - public static RespondingEthPeer createPeer( - final EthProtocolManager ethProtocolManager, - final SnapProtocolManager snapProtocolManager, - final long estimatedHeight) { - return RespondingEthPeer.builder() - .ethProtocolManager(ethProtocolManager) - .snapProtocolManager(snapProtocolManager) - .estimatedHeight(estimatedHeight) - .build(); - } -} diff --git a/ethereum/trie/src/main/java/org/hyperledger/besu/ethereum/trie/CompactEncoding.java b/ethereum/trie/src/main/java/org/hyperledger/besu/ethereum/trie/CompactEncoding.java index 6303c63812f..4e9eaf92087 100644 --- a/ethereum/trie/src/main/java/org/hyperledger/besu/ethereum/trie/CompactEncoding.java +++ b/ethereum/trie/src/main/java/org/hyperledger/besu/ethereum/trie/CompactEncoding.java @@ -24,6 +24,13 @@ private CompactEncoding() {} public static final byte LEAF_TERMINATOR = 0x10; + /** + * Converts a byte sequence into a path by splitting each byte into two nibbles. The resulting + * path is terminated with a leaf terminator. + * + * @param bytes the byte sequence to convert into a path + * @return the resulting path + */ public static Bytes bytesToPath(final Bytes bytes) { final MutableBytes path = MutableBytes.create(bytes.size() * 2 + 1); int j = 0; @@ -36,6 +43,15 @@ public static Bytes bytesToPath(final Bytes bytes) { return path; } + /** + * Converts a path into a byte sequence by combining each pair of nibbles into a byte. The path + * must be a leaf path, i.e., it must be terminated with a leaf terminator. + * + * @param path the path to convert into a byte sequence + * @return the resulting byte sequence + * @throws IllegalArgumentException if the path is empty or not a leaf path, or if it contains + * elements larger than a nibble + */ public static Bytes pathToBytes(final Bytes path) { checkArgument(!path.isEmpty(), "Path must not be empty"); checkArgument(path.get(path.size() - 1) == LEAF_TERMINATOR, "Path must be a leaf path"); @@ -52,6 +68,14 @@ public static Bytes pathToBytes(final Bytes path) { return bytes; } + /** + * Encodes a path into a compact form. The encoding includes a metadata byte that indicates + * whether the path is a leaf path and whether its length is odd or even. + * + * @param path the path to encode + * @return the encoded path + * @throws IllegalArgumentException if the path contains elements larger than a nibble + */ public static Bytes encode(final Bytes path) { int size = path.size(); final boolean isLeaf = size > 0 && path.get(size - 1) == LEAF_TERMINATOR; @@ -88,6 +112,14 @@ public static Bytes encode(final Bytes path) { return encoded; } + /** + * Decodes a path from its compact form. The decoding process takes into account the metadata byte + * that indicates whether the path is a leaf path and whether its length is odd or even. + * + * @param encoded the encoded path to decode + * @return the decoded path + * @throws IllegalArgumentException if the encoded path is empty or its metadata byte is invalid + */ public static Bytes decode(final Bytes encoded) { final int size = encoded.size(); checkArgument(size > 0); diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java index e3d82ca9dc4..930783d7be1 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/InMemoryKeyValueStorage.java @@ -18,7 +18,7 @@ import java.io.PrintStream; import java.nio.charset.StandardCharsets; -import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -57,9 +57,9 @@ public boolean isEligibleToHighSpecFlag() { } }; - private static ConcurrentMap>> asSegmentMap( - final Map> initialMap) { - final ConcurrentMap>> segmentMap = + private static ConcurrentMap>> + asSegmentMap(final NavigableMap> initialMap) { + final ConcurrentMap>> segmentMap = new ConcurrentHashMap<>(); segmentMap.put(SEGMENT_IDENTIFIER, initialMap); return segmentMap; @@ -78,7 +78,7 @@ public InMemoryKeyValueStorage() { * * @param initialMap the initial map */ - public InMemoryKeyValueStorage(final Map> initialMap) { + public InMemoryKeyValueStorage(final NavigableMap> initialMap) { super(SEGMENT_IDENTIFIER, new SegmentedInMemoryKeyValueStorage(asSegmentMap(initialMap))); rwLock = ((SegmentedInMemoryKeyValueStorage) storage).rwLock; } diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java index 6713772a03d..6416c4d5463 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/LayeredKeyValueStorage.java @@ -15,6 +15,10 @@ */ package org.hyperledger.besu.services.kvstore; +import static java.util.Spliterator.DISTINCT; +import static java.util.Spliterator.ORDERED; +import static java.util.Spliterator.SORTED; + import org.hyperledger.besu.plugin.services.exception.StorageException; import org.hyperledger.besu.plugin.services.storage.SegmentIdentifier; import org.hyperledger.besu.plugin.services.storage.SegmentedKeyValueStorage; @@ -22,13 +26,17 @@ import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; +import java.util.Spliterators; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; import java.util.stream.Stream; +import java.util.stream.StreamSupport; import com.google.common.collect.Streams; import org.apache.commons.lang3.tuple.Pair; @@ -60,7 +68,7 @@ public LayeredKeyValueStorage(final SegmentedKeyValueStorage parent) { * @param parent the parent key value storage for this layered storage. */ public LayeredKeyValueStorage( - final ConcurrentMap>> map, + final ConcurrentMap>> map, final SegmentedKeyValueStorage parent) { super(map); this.parent = parent; @@ -82,7 +90,7 @@ public Optional get(final SegmentIdentifier segmentId, final byte[] key) try { Bytes wrapKey = Bytes.wrap(key); final Optional foundKey = - hashValueStore.computeIfAbsent(segmentId, __ -> new HashMap<>()).get(wrapKey); + hashValueStore.computeIfAbsent(segmentId, __ -> newSegmentMap()).get(wrapKey); if (foundKey == null) { return parent.get(segmentId, key); } else { @@ -116,28 +124,67 @@ public Optional getNearestTo( @Override public Stream> stream(final SegmentIdentifier segmentId) { throwIfClosed(); + var ourLayerState = hashValueStore.computeIfAbsent(segmentId, s -> newSegmentMap()); - final Lock lock = rwLock.readLock(); - lock.lock(); - try { - // copy of our in memory store to use for streaming and filtering: - var ourLayerState = - Optional.ofNullable(hashValueStore.get(segmentId)) - .map(HashMap::new) - .orElse(new HashMap<>()); - - return Streams.concat( - ourLayerState.entrySet().stream() - .filter(entry -> entry.getValue().isPresent()) - .map( - bytesEntry -> - Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue().get())) - // since we are layered, concat a parent stream filtered by our map entries: - , - parent.stream(segmentId).filter(e -> !ourLayerState.containsKey(Bytes.of(e.getLeft())))); - } finally { - lock.unlock(); + if (ourLayerState == null) { + return parent.stream(segmentId); } + + // otherwise, interleave the sorted streams: + final PeekingIterator>> ourIterator = + new PeekingIterator<>( + ourLayerState.entrySet().stream() + .filter(entry -> entry.getValue().isPresent()) + .iterator()); + + final PeekingIterator> parentIterator = + new PeekingIterator<>(parent.stream(segmentId).iterator()); + + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + new Iterator<>() { + @Override + public boolean hasNext() { + return ourIterator.hasNext() || parentIterator.hasNext(); + } + + private Pair mapEntryToPair( + final Map.Entry> entry) { + return Optional.of(entry) + .map( + e -> + Pair.of( + e.getKey().toArrayUnsafe(), + e.getValue().orElseGet(() -> new byte[0]))) + .get(); + } + + @Override + public Pair next() { + var ourPeek = ourIterator.peek(); + var parentPeek = parentIterator.peek(); + + if (ourPeek == null || parentPeek == null) { + return ourPeek == null + ? parentIterator.next() + : mapEntryToPair(ourIterator.next()); + } + + // otherwise compare: + int comparison = ourPeek.getKey().compareTo(Bytes.wrap(parentPeek.getKey())); + if (comparison < 0) { + return mapEntryToPair(ourIterator.next()); + } else if (comparison == 0) { + // skip dupe key from parent, return ours: + parentIterator.next(); + return mapEntryToPair(ourIterator.next()); + } else { + return parentIterator.next(); + } + } + }, + ORDERED | SORTED | DISTINCT), + false); } @Override @@ -186,7 +233,7 @@ public Stream streamKeys(final SegmentIdentifier segmentId) { @Override public boolean tryDelete(final SegmentIdentifier segmentId, final byte[] key) { hashValueStore - .computeIfAbsent(segmentId, __ -> new HashMap<>()) + .computeIfAbsent(segmentId, __ -> newSegmentMap()) .put(Bytes.wrap(key), Optional.empty()); return true; } @@ -206,7 +253,7 @@ public void commit() throws StorageException { .forEach( entry -> hashValueStore - .computeIfAbsent(entry.getKey(), __ -> new HashMap<>()) + .computeIfAbsent(entry.getKey(), __ -> newSegmentMap()) .putAll(entry.getValue())); // put empty rather than remove in order to not ask parent in case of deletion @@ -214,7 +261,7 @@ public void commit() throws StorageException { .forEach( segmentEntry -> hashValueStore - .computeIfAbsent(segmentEntry.getKey(), __ -> new HashMap<>()) + .computeIfAbsent(segmentEntry.getKey(), __ -> newSegmentMap()) .putAll( segmentEntry.getValue().stream() .collect( @@ -246,4 +293,30 @@ private void throwIfClosed() { throw new StorageException("Storage has been closed"); } } + + private static class PeekingIterator implements Iterator { + private final Iterator iterator; + private E next; + + public PeekingIterator(final Iterator iterator) { + this.iterator = iterator; + this.next = iterator.hasNext() ? iterator.next() : null; + } + + public E peek() { + return next; + } + + @Override + public boolean hasNext() { + return next != null; + } + + @Override + public E next() { + E oldNext = next; + next = iterator.hasNext() ? iterator.next() : null; + return oldNext; + } + } } diff --git a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java index ff8ca7249cb..0467449523a 100644 --- a/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java +++ b/services/kvstore/src/main/java/org/hyperledger/besu/services/kvstore/SegmentedInMemoryKeyValueStorage.java @@ -24,15 +24,18 @@ import org.hyperledger.besu.plugin.services.storage.SnappedKeyValueStorage; import java.io.PrintStream; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.NavigableMap; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -49,11 +52,35 @@ public class SegmentedInMemoryKeyValueStorage implements SnappedKeyValueStorage, SnappableKeyValueStorage, SegmentedKeyValueStorage { /** protected access for the backing hash map. */ - final ConcurrentMap>> hashValueStore; + final ConcurrentMap>> hashValueStore; /** protected access to the rw lock. */ protected final ReadWriteLock rwLock = new ReentrantReadWriteLock(); + /** + * Create a navigable segment map, with a compatible Bytes comparator + * + * @return segment map + */ + protected static NavigableMap> newSegmentMap() { + return newSegmentMap(Collections.emptyMap()); + } + + /** + * Create and populate a navigable segment map, with a compatible Bytes comparator. + * + * @param sourceMap sourcemap to initialize the segmentmap with. + * @return populated segment map + */ + protected static NavigableMap> newSegmentMap( + final Map> sourceMap) { + // comparing by string to prevent Bytes comparator from collapsing zeroes + NavigableMap> segMap = + new ConcurrentSkipListMap<>(Comparator.comparing(Bytes::toHexString)); + segMap.putAll(sourceMap); + return segMap; + } + /** Instantiates a new In memory key value storage. */ public SegmentedInMemoryKeyValueStorage() { this(new ConcurrentHashMap<>()); @@ -65,7 +92,8 @@ public SegmentedInMemoryKeyValueStorage() { * @param hashValueStore the hash value store */ protected SegmentedInMemoryKeyValueStorage( - final ConcurrentMap>> hashValueStore) { + final ConcurrentMap>> + hashValueStore) { this.hashValueStore = hashValueStore; } @@ -79,8 +107,8 @@ public SegmentedInMemoryKeyValueStorage(final List segments) segments.stream() .collect( Collectors - .>> - toConcurrentMap(s -> s, s -> new ConcurrentHashMap<>()))); + .>> + toConcurrentMap(s -> s, s -> newSegmentMap()))); } @Override @@ -107,7 +135,7 @@ public Optional get(final SegmentIdentifier segmentIdentifier, final byt lock.lock(); try { return hashValueStore - .computeIfAbsent(segmentIdentifier, s -> new HashMap<>()) + .computeIfAbsent(segmentIdentifier, s -> newSegmentMap()) .getOrDefault(Bytes.wrap(key), Optional.empty()); } finally { lock.unlock(); @@ -127,7 +155,7 @@ public Optional getNearestTo( (Map.Entry> a) -> a.getKey().commonPrefixLength(key)) .thenComparing(Map.Entry.comparingByKey()); return this.hashValueStore - .computeIfAbsent(segmentIdentifier, s -> new HashMap<>()) + .computeIfAbsent(segmentIdentifier, s -> newSegmentMap()) .entrySet() .stream() // only return keys equal to or less than @@ -164,9 +192,10 @@ public Stream> stream(final SegmentIdentifier segmentIdenti lock.lock(); try { return ImmutableSet.copyOf( - hashValueStore.computeIfAbsent(segmentIdentifier, s -> new HashMap<>()).entrySet()) + hashValueStore.computeIfAbsent(segmentIdentifier, s -> newSegmentMap()).entrySet()) .stream() .filter(bytesEntry -> bytesEntry.getValue().isPresent()) + .sorted(Map.Entry.comparingByKey()) .map( bytesEntry -> Pair.of(bytesEntry.getKey().toArrayUnsafe(), bytesEntry.getValue().get())); @@ -199,7 +228,7 @@ public Stream streamKeys(final SegmentIdentifier segmentIdentifier) { lock.lock(); try { return ImmutableMap.copyOf( - hashValueStore.computeIfAbsent(segmentIdentifier, s -> new HashMap<>())) + hashValueStore.computeIfAbsent(segmentIdentifier, s -> newSegmentMap())) .entrySet() .stream() .filter(bytesEntry -> bytesEntry.getValue().isPresent()) @@ -244,8 +273,7 @@ public SegmentedInMemoryKeyValueStorage takeSnapshot() { return new SegmentedInMemoryKeyValueStorage( hashValueStore.entrySet().stream() .collect( - Collectors.toConcurrentMap( - Map.Entry::getKey, e -> new ConcurrentHashMap<>(e.getValue())))); + Collectors.toConcurrentMap(Map.Entry::getKey, e -> newSegmentMap(e.getValue())))); } @Override @@ -287,7 +315,7 @@ public void commit() throws StorageException { .forEach( entry -> hashValueStore - .computeIfAbsent(entry.getKey(), __ -> new HashMap<>()) + .computeIfAbsent(entry.getKey(), __ -> newSegmentMap()) .putAll(entry.getValue())); removedKeys.entrySet().stream() @@ -295,7 +323,7 @@ public void commit() throws StorageException { entry -> { var keyset = hashValueStore - .computeIfAbsent(entry.getKey(), __ -> new HashMap<>()) + .computeIfAbsent(entry.getKey(), __ -> newSegmentMap()) .keySet(); keyset.removeAll(entry.getValue()); }); From 73567a25448609631965178435a7062f99e98ab6 Mon Sep 17 00:00:00 2001 From: garyschulte Date: Fri, 15 Mar 2024 13:12:42 -0700 Subject: [PATCH 02/16] address potentially null storage format that occurrs in some unit tests Signed-off-by: garyschulte --- .../ethereum/worldstate/WorldStateStorageCoordinator.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateStorageCoordinator.java b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateStorageCoordinator.java index c35fe423aee..db45b6aefaa 100644 --- a/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateStorageCoordinator.java +++ b/ethereum/core/src/main/java/org/hyperledger/besu/ethereum/worldstate/WorldStateStorageCoordinator.java @@ -69,7 +69,11 @@ public STRATEGY getStrategy( } public boolean isMatchingFlatMode(final FlatDbMode flatDbMode) { - if (getDataStorageFormat().equals(DataStorageFormat.BONSAI)) { + Optional storageFormat = + Optional.ofNullable(getDataStorageFormat()) + .filter(format -> format.equals(DataStorageFormat.BONSAI)); + + if (storageFormat.isPresent()) { final BonsaiWorldStateKeyValueStorage bonsaiWorldStateStorageStrategy = (BonsaiWorldStateKeyValueStorage) worldStateKeyValueStorage(); return bonsaiWorldStateStorageStrategy.getFlatDbMode().equals(flatDbMode); From d3a91aa3732de1002ae79f5b46957e4e7cf362fe Mon Sep 17 00:00:00 2001 From: garyschulte Date: Fri, 15 Mar 2024 17:10:13 -0700 Subject: [PATCH 03/16] add snapserver experimental flag, refactor snap server start Signed-off-by: garyschulte --- .../org/hyperledger/besu/cli/BesuCommand.java | 2 + .../cli/ConfigurationOverviewBuilder.java | 16 ++ .../options/unstable/SynchronizerOptions.java | 22 ++- .../controller/BesuControllerBuilder.java | 14 +- .../common/storage/flat/FlatDbStrategy.java | 4 + .../storage/flat/FlatDbStrategyProvider.java | 2 +- .../worldstate/DataStorageConfiguration.java | 12 -- .../WorldStateStorageCoordinator.java | 5 + .../ethereum/eth/manager/snap/SnapServer.java | 158 +++++++++--------- .../sync/snapsync/SnapSyncConfiguration.java | 7 + .../eth/manager/snap/SnapServerTest.java | 35 +++- 11 files changed, 178 insertions(+), 99 deletions(-) diff --git a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java index 26422b0f660..624699f70c5 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/BesuCommand.java @@ -2732,6 +2732,8 @@ private String generateConfigurationOverview() { getDataStorageConfiguration().getUnstable().getBonsaiTrieLogPruningWindowSize()); } + builder.setSnapServerEnabled(this.unstableSynchronizerOptions.isSnapsyncServerEnabled()); + builder.setTxPoolImplementation(buildTransactionPoolConfiguration().getTxPoolImplementation()); builder.setWorldStateUpdateMode(unstableEvmOptions.toDomainObject().worldUpdaterMode()); diff --git a/besu/src/main/java/org/hyperledger/besu/cli/ConfigurationOverviewBuilder.java b/besu/src/main/java/org/hyperledger/besu/cli/ConfigurationOverviewBuilder.java index bf03c675d8d..51c88798158 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/ConfigurationOverviewBuilder.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/ConfigurationOverviewBuilder.java @@ -55,6 +55,7 @@ public class ConfigurationOverviewBuilder { private boolean isBonsaiLimitTrieLogsEnabled = false; private long trieLogRetentionLimit = 0; private Integer trieLogsPruningWindowSize = null; + private boolean isSnapServerEnabled = false; private TransactionPoolConfiguration.Implementation txPoolImplementation; private EvmConfiguration.WorldUpdaterMode worldStateUpdateMode; private Map environment; @@ -219,6 +220,17 @@ public ConfigurationOverviewBuilder setTrieLogRetentionLimit(final long limit) { return this; } + /** + * Sets snap server enabled/disabled + * + * @param snapServerEnabled bool to indicate if is snap server is enabled + * @return the builder + */ + public ConfigurationOverviewBuilder setSnapServerEnabled(final boolean snapServerEnabled) { + isSnapServerEnabled = snapServerEnabled; + return this; + } + /** * Sets trie logs pruning window size * @@ -339,6 +351,10 @@ public String build() { lines.add("Using " + worldStateUpdateMode + " worldstate update mode"); + if (isSnapServerEnabled) { + lines.add("Experimental Snap Sync server enabled"); + } + if (isBonsaiLimitTrieLogsEnabled) { final StringBuilder trieLogPruningString = new StringBuilder(); trieLogPruningString diff --git a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java index 7c9de815797..82393adbcf5 100644 --- a/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java +++ b/besu/src/main/java/org/hyperledger/besu/cli/options/unstable/SynchronizerOptions.java @@ -82,6 +82,8 @@ public class SynchronizerOptions implements CLIOptions