Skip to content

Commit

Permalink
Bugfix snapshot transaction segfaults after storage truncation (hyper…
Browse files Browse the repository at this point in the history
…ledger#4786)

* subscribe snapshot worldstates to parent worldstate storage events like clear and clearFlatDatabase and close as appropriate to avoid segfaults
* fix for direct snapshot creation when using snapshot archive
* ensure we only prune bonsai worldstates if we have more than the configured number of retained states

Signed-off-by: garyschulte <garyschulte@gmail.com>
  • Loading branch information
garyschulte committed Jan 14, 2023
1 parent 364a808 commit 2a89edd
Show file tree
Hide file tree
Showing 21 changed files with 505 additions and 193 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,8 @@ private MutableWorldState duplicateWorldStateAtParent() {
if (ws.isPersistable()) {
return ws;
} else {
var wsCopy = ws.copy();
try {
ws.close();
} catch (Exception ex) {
LOG.error(
"unexpected error closing non-peristable worldstate + "
+ parentHeader.toLogString(),
ex);
}
return wsCopy;
// non-persistable worldstates should return a copy which is persistable:
return ws.copy();
}
})
.orElseThrow(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import static org.hyperledger.besu.util.Slf4jLambdaHelper.debugLambda;

import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.TrieLogManager.CachedWorldState;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage.BonsaiUpdater;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
Expand All @@ -28,27 +28,29 @@
import java.util.Optional;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import org.apache.tuweni.bytes.Bytes32;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class AbstractTrieLogManager<T extends CachedWorldState> implements TrieLogManager {
public abstract class AbstractTrieLogManager<T extends MutableWorldState>
implements TrieLogManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTrieLogManager.class);
public static final long RETAINED_LAYERS = 512; // at least 256 + typical rollbacks

protected final Blockchain blockchain;
protected final BonsaiWorldStateKeyValueStorage worldStateStorage;
protected final BonsaiWorldStateKeyValueStorage rootWorldStateStorage;

protected final Map<Bytes32, T> cachedWorldStatesByHash;
protected final Map<Bytes32, CachedWorldState<T>> cachedWorldStatesByHash;
protected final long maxLayersToLoad;

public AbstractTrieLogManager(
AbstractTrieLogManager(
final Blockchain blockchain,
final BonsaiWorldStateKeyValueStorage worldStateStorage,
final long maxLayersToLoad,
final Map<Bytes32, T> cachedWorldStatesByHash) {
final Map<Bytes32, CachedWorldState<T>> cachedWorldStatesByHash) {
this.blockchain = blockchain;
this.worldStateStorage = worldStateStorage;
this.rootWorldStateStorage = worldStateStorage;
this.cachedWorldStatesByHash = cachedWorldStatesByHash;
this.maxLayersToLoad = maxLayersToLoad;
}
Expand All @@ -57,19 +59,24 @@ public AbstractTrieLogManager(
public synchronized void saveTrieLog(
final BonsaiWorldStateArchive worldStateArchive,
final BonsaiWorldStateUpdater localUpdater,
final Hash worldStateRootHash,
final BlockHeader blockHeader) {
final Hash forWorldStateRootHash,
final BlockHeader forBlockHeader,
final BonsaiPersistedWorldState forWorldState) {
// do not overwrite a trielog layer that already exists in the database.
// if it's only in memory we need to save it
// for example, in case of reorg we don't replace a trielog layer
if (worldStateStorage.getTrieLog(blockHeader.getHash()).isEmpty()) {
final BonsaiWorldStateKeyValueStorage.BonsaiUpdater stateUpdater =
worldStateStorage.updater();
if (rootWorldStateStorage.getTrieLog(forBlockHeader.getHash()).isEmpty()) {
final BonsaiUpdater stateUpdater = forWorldState.getWorldStateStorage().updater();
boolean success = false;
try {
final TrieLogLayer trieLog =
prepareTrieLog(blockHeader, worldStateRootHash, localUpdater, worldStateArchive);
persistTrieLog(blockHeader, worldStateRootHash, trieLog, stateUpdater);
prepareTrieLog(
forBlockHeader,
forWorldStateRootHash,
localUpdater,
worldStateArchive,
forWorldState);
persistTrieLog(forBlockHeader, forWorldStateRootHash, trieLog, stateUpdater);
success = true;
} finally {
if (success) {
Expand All @@ -81,45 +88,57 @@ public synchronized void saveTrieLog(
}
}

private TrieLogLayer prepareTrieLog(
protected abstract void addCachedLayer(
final BlockHeader blockHeader,
final Hash currentWorldStateRootHash,
final Hash worldStateRootHash,
final TrieLogLayer trieLog,
final BonsaiWorldStateArchive worldStateArchive,
final BonsaiPersistedWorldState forWorldState);

@VisibleForTesting
TrieLogLayer prepareTrieLog(
final BlockHeader blockHeader,
final Hash worldStateRootHash,
final BonsaiWorldStateUpdater localUpdater,
final BonsaiWorldStateArchive worldStateArchive) {
final BonsaiWorldStateArchive worldStateArchive,
final BonsaiPersistedWorldState forWorldState) {
debugLambda(LOG, "Adding layered world state for {}", blockHeader::toLogString);
final TrieLogLayer trieLog = localUpdater.generateTrieLog(blockHeader.getBlockHash());
trieLog.freeze();
addCachedLayer(blockHeader, currentWorldStateRootHash, trieLog, worldStateArchive);
addCachedLayer(blockHeader, worldStateRootHash, trieLog, worldStateArchive, forWorldState);
scrubCachedLayers(blockHeader.getNumber());
return trieLog;
}

synchronized void scrubCachedLayers(final long newMaxHeight) {
final long waterline = newMaxHeight - RETAINED_LAYERS;
cachedWorldStatesByHash.values().stream()
.filter(layer -> layer.getHeight() < waterline)
.collect(Collectors.toList())
.stream()
.forEach(
layer -> {
cachedWorldStatesByHash.remove(layer.getTrieLog().getBlockHash());
Optional.ofNullable(layer.getMutableWorldState())
.ifPresent(
ws -> {
try {
ws.close();
} catch (Exception e) {
LOG.warn("Error closing bonsai worldstate layer", e);
}
});
});
if (cachedWorldStatesByHash.size() > RETAINED_LAYERS) {
final long waterline = newMaxHeight - RETAINED_LAYERS;
cachedWorldStatesByHash.values().stream()
.filter(layer -> layer.getHeight() < waterline)
.collect(Collectors.toList())
.stream()
.forEach(
layer -> {
cachedWorldStatesByHash.remove(layer.getTrieLog().getBlockHash());
layer.dispose();
Optional.ofNullable(layer.getMutableWorldState())
.ifPresent(
ws -> {
try {
ws.close();
} catch (Exception e) {
LOG.warn("Error closing bonsai worldstate layer", e);
}
});
});
}
}

private void persistTrieLog(
final BlockHeader blockHeader,
final Hash worldStateRootHash,
final TrieLogLayer trieLog,
final BonsaiWorldStateKeyValueStorage.BonsaiUpdater stateUpdater) {
final BonsaiUpdater stateUpdater) {
debugLambda(
LOG,
"Persisting trie log for block hash {} and world state root {}",
Expand All @@ -136,7 +155,7 @@ private void persistTrieLog(
public Optional<MutableWorldState> getBonsaiCachedWorldState(final Hash blockHash) {
if (cachedWorldStatesByHash.containsKey(blockHash)) {
return Optional.ofNullable(cachedWorldStatesByHash.get(blockHash))
.map(T::getMutableWorldState);
.map(CachedWorldState::getMutableWorldState);
}
return Optional.empty();
}
Expand All @@ -151,7 +170,7 @@ public Optional<TrieLogLayer> getTrieLogLayer(final Hash blockHash) {
if (cachedWorldStatesByHash.containsKey(blockHash)) {
return Optional.of(cachedWorldStatesByHash.get(blockHash).getTrieLog());
} else {
return worldStateStorage.getTrieLog(blockHash).map(TrieLogLayer::fromBytes);
return rootWorldStateStorage.getTrieLog(blockHash).map(TrieLogLayer::fromBytes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
import org.hyperledger.besu.ethereum.bonsai.BonsaiWorldStateKeyValueStorage.BonsaiStorageSubscriber;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.trie.StoredMerklePatriciaTrie;

Expand All @@ -27,14 +28,17 @@
import org.apache.tuweni.bytes.Bytes;
import org.apache.tuweni.units.bigints.UInt256;

public class BonsaiInMemoryWorldState extends BonsaiPersistedWorldState {
public class BonsaiInMemoryWorldState extends BonsaiPersistedWorldState
implements BonsaiStorageSubscriber {

private boolean isPersisted = false;
private final Long worldstateSubcriberId;

public BonsaiInMemoryWorldState(
final BonsaiWorldStateArchive archive,
final BonsaiWorldStateKeyValueStorage worldStateStorage) {
super(archive, worldStateStorage);
worldstateSubcriberId = worldStateStorage.subscribe(this);
}

@Override
Expand Down Expand Up @@ -146,7 +150,7 @@ public void persist(final BlockHeader blockHeader) {
final Hash newWorldStateRootHash = rootHash(localUpdater);
archive
.getTrieLogManager()
.saveTrieLog(archive, localUpdater, newWorldStateRootHash, blockHeader);
.saveTrieLog(archive, localUpdater, newWorldStateRootHash, blockHeader, this);
worldStateRootHash = newWorldStateRootHash;
worldStateBlockHash = blockHeader.getBlockHash();
isPersisted = true;
Expand All @@ -155,6 +159,7 @@ public void persist(final BlockHeader blockHeader) {
@Override
public void close() throws Exception {
// if storage is snapshot-based we need to close:
worldStateStorage.unSubscribe(worldstateSubcriberId);
worldStateStorage.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MutableWorldState;
import org.hyperledger.besu.ethereum.core.SnapshotMutableWorldState;
import org.hyperledger.besu.ethereum.worldstate.StateTrieAccountValue;
import org.hyperledger.besu.evm.account.Account;
import org.hyperledger.besu.evm.worldstate.WorldState;
Expand Down Expand Up @@ -261,14 +262,18 @@ public Stream<StreamableAccount> streamAccounts(final Bytes32 startKeyHash, fina
@MustBeClosed
public MutableWorldState copy() {
// return an in-memory worldstate that is based on a persisted snapshot for this blockhash.
return archive
.getMutableSnapshot(this.blockHash())
.map(BonsaiSnapshotWorldState.class::cast)
.map(snapshot -> new BonsaiInMemoryWorldState(archive, snapshot.getWorldStateStorage()))
.orElseThrow(
() ->
new StorageException(
"Unable to copy Layered Worldstate for " + blockHash().toHexString()));
try (SnapshotMutableWorldState snapshot =
archive
.getMutableSnapshot(this.blockHash())
.map(SnapshotMutableWorldState.class::cast)
.orElseThrow(
() ->
new StorageException(
"Unable to copy Layered Worldstate for " + blockHash().toHexString()))) {
return new BonsaiInMemoryWorldState(archive, snapshot.getWorldStateStorage());
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,11 +72,11 @@ public BonsaiPersistedWorldState(
(addr, value) ->
archive
.getCachedMerkleTrieLoader()
.preLoadAccount(worldStateStorage, worldStateRootHash, addr),
.preLoadAccount(getWorldStateStorage(), worldStateRootHash, addr),
(addr, value) ->
archive
.getCachedMerkleTrieLoader()
.preLoadStorageSlot(worldStateStorage, addr, value));
.preLoadStorageSlot(getWorldStateStorage(), addr, value));
}

public BonsaiWorldStateArchive getArchive() {
Expand Down Expand Up @@ -282,6 +282,7 @@ public void persist(final BlockHeader blockHeader) {

final BonsaiWorldStateUpdater localUpdater = updater.copy();
final BonsaiWorldStateKeyValueStorage.BonsaiUpdater stateUpdater = worldStateStorage.updater();
Runnable saveTrieLog = () -> {};

try {
final Hash newWorldStateRootHash = calculateRootHash(stateUpdater, localUpdater);
Expand All @@ -296,9 +297,12 @@ public void persist(final BlockHeader blockHeader) {
+ " calculated "
+ newWorldStateRootHash.toHexString());
}
archive
.getTrieLogManager()
.saveTrieLog(archive, localUpdater, newWorldStateRootHash, blockHeader);
saveTrieLog =
() ->
archive
.getTrieLogManager()
.saveTrieLog(archive, localUpdater, newWorldStateRootHash, blockHeader, this);

stateUpdater
.getTrieBranchStorageTransaction()
.put(WORLD_BLOCK_HASH_KEY, blockHeader.getHash().toArrayUnsafe());
Expand All @@ -317,6 +321,7 @@ public void persist(final BlockHeader blockHeader) {
if (success) {
stateUpdater.commit();
updater.reset();
saveTrieLog.run();
} else {
stateUpdater.rollback();
updater.reset();
Expand Down
Loading

0 comments on commit 2a89edd

Please sign in to comment.