Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fine tune already seen txs tracker when a tx is removed from the pool #7755

Merged
merged 3 commits into from
Oct 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Upcoming Breaking Changes

### Additions and Improvements
- Fine tune already seen txs tracker when a tx is removed from the pool [#7755](https://github.com/hyperledger/besu/pull/7755)

### Bug fixes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void removeTransactionAddedListener(final long listenerIdentifier) {
public long addTransactionDroppedListener(
final TransactionDroppedListener transactionDroppedListener) {
return transactionPool.subscribeDroppedTransactions(
transactionDroppedListener::onTransactionDropped);
(transaction, reason) -> transactionDroppedListener.onTransactionDropped(transaction));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;

import java.util.List;

Expand All @@ -34,7 +35,7 @@ public PendingTransactionDroppedSubscriptionService(
}

@Override
public void onTransactionDropped(final Transaction transaction) {
public void onTransactionDropped(final Transaction transaction, final RemovalReason reason) {
notifySubscribers(transaction.getHash());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.transactions.RemovalReason;

import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -47,6 +48,18 @@ public class PendingTransactionDroppedSubscriptionServiceTest {

private static final Hash TX_ONE =
Hash.fromHexString("0x15876958423545c3c7b0fcf9be8ffb543305ee1b43db87ed380dcf0cd16589f7");
private static final RemovalReason DUMMY_REMOVAL_REASON =
new RemovalReason() {
@Override
public String label() {
return "";
}

@Override
public boolean stopTracking() {
return false;
}
};

@Mock private SubscriptionManager subscriptionManager;
@Mock private Blockchain blockchain;
Expand All @@ -65,7 +78,7 @@ public void onTransactionAddedMustSendMessage() {
setUpSubscriptions(subscriptionIds);
final Transaction pending = transaction(TX_ONE);

service.onTransactionDropped(pending);
service.onTransactionDropped(pending, DUMMY_REMOVAL_REASON);

verifyNoInteractions(block);
verifyNoInteractions(blockchain);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PeerTransactionTracker implements EthPeer.DisconnectCallback {
public class PeerTransactionTracker
implements EthPeer.DisconnectCallback, PendingTransactionDroppedListener {
private static final Logger LOG = LoggerFactory.getLogger(PeerTransactionTracker.class);

private static final int MAX_TRACKED_SEEN_TRANSACTIONS = 100_000;
Expand Down Expand Up @@ -122,13 +123,14 @@ boolean hasPeerSeenTransaction(final EthPeer peer, final Hash txHash) {
}

private <T> Set<T> createTransactionsSet() {
return Collections.newSetFromMap(
new LinkedHashMap<>(1 << 4, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
return size() > MAX_TRACKED_SEEN_TRANSACTIONS;
}
});
return Collections.synchronizedSet(
Collections.newSetFromMap(
new LinkedHashMap<>(16, 0.75f, true) {
@Override
protected boolean removeEldestEntry(final Map.Entry<T, Boolean> eldest) {
return size() > MAX_TRACKED_SEEN_TRANSACTIONS;
}
}));
}

@Override
Expand Down Expand Up @@ -175,4 +177,11 @@ public void onDisconnect(final EthPeer peer) {
private String logPeerSet(final Set<EthPeer> peers) {
return peers.stream().map(EthPeer::getLoggableId).collect(Collectors.joining(","));
}

@Override
public void onTransactionDropped(final Transaction transaction, final RemovalReason reason) {
if (reason.stopTracking()) {
seenTransactions.values().stream().forEach(st -> st.remove(transaction.getHash()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,5 @@
@FunctionalInterface
public interface PendingTransactionDroppedListener {

void onTransactionDropped(Transaction transaction);
void onTransactionDropped(Transaction transaction, final RemovalReason reason);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright contributors to Hyperledger Besu.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*
* SPDX-License-Identifier: Apache-2.0
*/
package org.hyperledger.besu.ethereum.eth.transactions;

/** The reason why a pending tx has been removed */
public interface RemovalReason {
/**
* Return a label that identify this reason to be used in the metric system.
*
* @return a label
*/
String label();

/**
* Return true if we should stop tracking the tx as already seen
*
* @return true if no more tracking is needed
*/
boolean stopTracking();
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public TransactionPool(
initializeBlobMetrics();
initLogForReplay();
subscribePendingTransactions(this::mapBlobsOnTransactionAdded);
subscribeDroppedTransactions(this::unmapBlobsOnTransactionDropped);
subscribeDroppedTransactions(
(transaction, reason) -> unmapBlobsOnTransactionDropped(transaction));
}

private void initLogForReplay() {
Expand Down Expand Up @@ -720,16 +721,18 @@ class PendingTransactionsListenersProxy {

void subscribe() {
onAddedListenerId = pendingTransactions.subscribePendingTransactions(this::onAdded);
onDroppedListenerId = pendingTransactions.subscribeDroppedTransactions(this::onDropped);
onDroppedListenerId =
pendingTransactions.subscribeDroppedTransactions(
(transaction, reason) -> onDropped(transaction, reason));
}

void unsubscribe() {
pendingTransactions.unsubscribePendingTransactions(onAddedListenerId);
pendingTransactions.unsubscribeDroppedTransactions(onDroppedListenerId);
}

private void onDropped(final Transaction transaction) {
onDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction));
private void onDropped(final Transaction transaction, final RemovalReason reason) {
onDroppedListeners.forEach(listener -> listener.onTransactionDropped(transaction, reason));
}

private void onAdded(final Transaction transaction) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ protected PendingTransaction getEvictable() {
protected void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction removedTx,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
orderByFee.remove(removedTx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.FOLLOW_INVALIDATED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;

import java.util.Map;
import java.util.NavigableMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.REJECTED_UNDERPRICED_REPLACEMENT;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.TRY_NEXT_LAYER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.BELOW_MIN_SCORE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.RemovedFrom.POOL;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.EVICTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.BELOW_MIN_SCORE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.CONFIRMED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.CROSS_LAYER_REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.REPLACED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.RemovedFrom.POOL;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand Down Expand Up @@ -295,7 +295,9 @@ public PendingTransaction promoteFor(
if (remainingPromotionsPerType[txType.ordinal()] > 0) {
senderTxs.pollFirstEntry();
processRemove(
senderTxs, candidateTx.getTransaction(), RemovalReason.LayerMoveReason.PROMOTED);
senderTxs,
candidateTx.getTransaction(),
LayeredRemovalReason.LayerMoveReason.PROMOTED);
metrics.incrementRemoved(candidateTx, "promoted", name());

if (senderTxs.isEmpty()) {
Expand Down Expand Up @@ -386,7 +388,7 @@ protected void replaced(final PendingTransaction replacedTx) {
decreaseCounters(replacedTx);
metrics.incrementRemoved(replacedTx, REPLACED.label(), name());
internalReplaced(replacedTx);
notifyTransactionDropped(replacedTx);
notifyTransactionDropped(replacedTx, REPLACED);
}

protected abstract void internalReplaced(final PendingTransaction replacedTx);
Expand Down Expand Up @@ -415,15 +417,15 @@ private TransactionAddedResult maybeReplaceTransaction(final PendingTransaction
protected PendingTransaction processRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final Transaction transaction,
final RemovalReason removalReason) {
final LayeredRemovalReason removalReason) {
final PendingTransaction removedTx = pendingTransactions.remove(transaction.getHash());

if (removedTx != null) {
decreaseCounters(removedTx);
metrics.incrementRemoved(removedTx, removalReason.label(), name());
internalRemove(senderTxs, removedTx, removalReason);
if (removalReason.removedFrom().equals(POOL)) {
notifyTransactionDropped(removedTx);
notifyTransactionDropped(removedTx, removalReason);
}
}
return removedTx;
Expand All @@ -432,7 +434,7 @@ protected PendingTransaction processRemove(
protected PendingTransaction processEvict(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction evictedTx,
final RemovalReason reason) {
final LayeredRemovalReason reason) {
final PendingTransaction removedTx = pendingTransactions.remove(evictedTx.getHash());
if (removedTx != null) {
decreaseCounters(evictedTx);
Expand Down Expand Up @@ -545,7 +547,7 @@ protected abstract void internalConfirmed(
protected abstract void internalRemove(
final NavigableMap<Long, PendingTransaction> senderTxs,
final PendingTransaction pendingTransaction,
final RemovalReason removalReason);
final LayeredRemovalReason removalReason);

protected abstract PendingTransaction getEvictable();

Expand Down Expand Up @@ -606,9 +608,10 @@ protected void notifyTransactionAdded(final PendingTransaction pendingTransactio
listener -> listener.onTransactionAdded(pendingTransaction.getTransaction()));
}

protected void notifyTransactionDropped(final PendingTransaction pendingTransaction) {
protected void notifyTransactionDropped(
final PendingTransaction pendingTransaction, final LayeredRemovalReason reason) {
onDroppedListeners.forEach(
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction()));
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction(), reason));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.MOVE;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.LayerMoveReason.DEMOTED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.LayerMoveReason.DEMOTED;

import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
*/
package org.hyperledger.besu.ethereum.eth.transactions.layered;

import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.DROPPED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.DROPPED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand All @@ -25,7 +25,7 @@
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransactionDroppedListener;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;
import org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason;
import org.hyperledger.besu.ethereum.mainnet.feemarket.FeeMarket;
import org.hyperledger.besu.util.Subscribers;

Expand Down Expand Up @@ -78,7 +78,7 @@ public List<PendingTransaction> getAll() {
@Override
public TransactionAddedResult add(
final PendingTransaction pendingTransaction, final int gap, final AddReason reason) {
notifyTransactionDropped(pendingTransaction);
notifyTransactionDropped(pendingTransaction, DROPPED);
metrics.incrementRemoved(pendingTransaction, DROPPED.label(), name());
++droppedCount;
return TransactionAddedResult.DROPPED;
Expand Down Expand Up @@ -152,9 +152,10 @@ public void unsubscribeFromDropped(final long id) {
onDroppedListeners.unsubscribe(id);
}

protected void notifyTransactionDropped(final PendingTransaction pendingTransaction) {
protected void notifyTransactionDropped(
final PendingTransaction pendingTransaction, final LayeredRemovalReason reason) {
onDroppedListeners.forEach(
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction()));
listener -> listener.onTransactionDropped(pendingTransaction.getTransaction(), reason));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.INTERNAL_ERROR;
import static org.hyperledger.besu.ethereum.eth.transactions.TransactionAddedResult.NONCE_TOO_FAR_IN_FUTURE_FOR_SENDER;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.AddReason.NEW;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.RemovalReason.PoolRemovalReason.RECONCILED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.INVALIDATED;
import static org.hyperledger.besu.ethereum.eth.transactions.layered.LayeredRemovalReason.PoolRemovalReason.RECONCILED;

import org.hyperledger.besu.datatypes.Address;
import org.hyperledger.besu.datatypes.Hash;
Expand Down
Loading
Loading