Skip to content

Commit

Permalink
Introduce score for pending transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Jul 22, 2024
1 parent bcbfec0 commit 0fecdf1
Show file tree
Hide file tree
Showing 10 changed files with 155 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ private TransactionSelectionResult transactionSelectionResultForInvalidResult(
* @return True if the invalid reason is transient, false otherwise.
*/
private boolean isTransientValidationError(final TransactionInvalidReason invalidReason) {
return invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE)
return invalidReason.equals(TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE)
|| invalidReason.equals(TransactionInvalidReason.GAS_PRICE_BELOW_CURRENT_BASE_FEE)
|| invalidReason.equals(TransactionInvalidReason.NONCE_TOO_HIGH);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1423,15 +1423,17 @@ protected MiningParameters createMiningParameters(

private static class PluginTransactionSelectionResult extends TransactionSelectionResult {
private enum PluginStatus implements Status {
PLUGIN_INVALID(false, true),
PLUGIN_INVALID_TRANSIENT(false, false);
PLUGIN_INVALID(false, true, false),
PLUGIN_INVALID_TRANSIENT(false, false, true);

private final boolean stop;
private final boolean discard;
private final boolean penalize;

PluginStatus(final boolean stop, final boolean discard) {
PluginStatus(final boolean stop, final boolean discard, final boolean penalize) {
this.stop = stop;
this.discard = discard;
this.penalize = penalize;
}

@Override
Expand All @@ -1443,6 +1445,11 @@ public boolean stop() {
public boolean discard() {
return discard;
}

@Override
public boolean penalize() {
return penalize;
}
}

public static final TransactionSelectionResult GENERIC_PLUGIN_INVALID_TRANSIENT =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ public abstract class PendingTransaction
private final Transaction transaction;
private final long addedAt;
private final long sequence; // Allows prioritization based on order transactions are added
private volatile byte score = Byte.MAX_VALUE;

private int memorySize = NOT_INITIALIZED;

Expand Down Expand Up @@ -123,6 +124,20 @@ public int memorySize() {
return memorySize;
}

public byte getScore() {
return score;
}

public void decrementScore() {
// use temp var to avoid non-atomic update of volatile var
final byte newScore = (byte) (score - 1);

// check to avoid underflow
if (newScore < score) {
score = newScore;
}
}

public abstract PendingTransaction detachedCopy();

private int computeMemorySize() {
Expand Down Expand Up @@ -255,6 +270,8 @@ public String toString() {
+ isReceivedFromLocalSource()
+ ", hasPriority="
+ hasPriority()
+ ", score="
+ score
+ '}';
}

Expand All @@ -267,6 +284,8 @@ public String toTraceLog() {
+ isReceivedFromLocalSource()
+ ", hasPriority="
+ hasPriority()
+ ", score="
+ score
+ ", "
+ transaction.toTraceLog()
+ "}";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolMetrics;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.BiFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;

/**
* Holds the current set of executable pending transactions, that are candidate for inclusion on
Expand Down Expand Up @@ -188,6 +192,46 @@ public List<SenderPendingTransactions> getBySender() {
.toList();
}

public Map<Byte, List<SenderPendingTransactions>> getByScore() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
.filter(sendersToAdd::remove)
.flatMap(sender -> splitByScore(sender, txsBySender.get(sender)).entrySet().stream())
.collect(
Collectors.toMap(
Map.Entry::getKey,
Map.Entry::getValue,
(a, b) -> {
a.addAll(b);
return a;
},
TreeMap::new));
}

private Map<Byte, List<SenderPendingTransactions>> splitByScore(
final Address sender, final NavigableMap<Long, PendingTransaction> txsBySender) {
final var splitByScore = new HashMap<Byte, List<SenderPendingTransactions>>();
byte currScore = txsBySender.firstEntry().getValue().getScore();
var currSplit = new ArrayList<PendingTransaction>();
for (final var entry : txsBySender.entrySet()) {
if (entry.getValue().getScore() < currScore) {
// score decreased, we need to start a new split
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
.add(new SenderPendingTransactions(sender, currSplit));
currSplit = new ArrayList<>();
currScore = entry.getValue().getScore();
} else {
currSplit.add(entry.getValue());
}
}
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
.add(new SenderPendingTransactions(sender, currSplit));
return splitByScore;
}

@Override
protected long cacheFreeSpace() {
return Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public BaseFeePrioritizedTransactions(

@Override
protected int compareByFee(final PendingTransaction pt1, final PendingTransaction pt2) {
return Comparator.comparing(PendingTransaction::hasPriority)
return Comparator.comparing(PendingTransaction::getScore)
.thenComparing(PendingTransaction::hasPriority)
.thenComparing(
(PendingTransaction pendingTransaction) ->
pendingTransaction.getTransaction().getEffectivePriorityFeePerGas(nextBlockBaseFee))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ public GasPricePrioritizedTransactions(

@Override
protected int compareByFee(final PendingTransaction pt1, final PendingTransaction pt2) {
return comparing(PendingTransaction::hasPriority)
return comparing(PendingTransaction::getScore)
.thenComparing(PendingTransaction::hasPriority)
.thenComparing(PendingTransaction::getGasPrice)
.thenComparing(PendingTransaction::getSequence)
.compare(pt1, pt2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -314,43 +316,61 @@ public synchronized List<Transaction> getPriorityTransactions() {
@Override
public void selectTransactions(final PendingTransactions.TransactionSelector selector) {
final List<PendingTransaction> invalidTransactions = new ArrayList<>();
final Set<Address> skipSenders = new HashSet<>();

final List<SenderPendingTransactions> candidateTxsBySender;
final Map<Byte, List<SenderPendingTransactions>> candidateTxsByScore;
synchronized (this) {
// since selecting transactions for block creation is a potential long operation
// we want to avoid to keep the lock for all the process, but we just lock to get
// the candidate transactions
candidateTxsBySender = prioritizedTransactions.getBySender();
candidateTxsByScore = prioritizedTransactions.getByScore();
}

selection:
for (final var senderTxs : candidateTxsBySender) {
LOG.trace("highPrioSenderTxs {}", senderTxs);

for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(selectionResult)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, selectionResult);
}

if (selectionResult.stop()) {
LOG.trace("Stopping selection");
break selection;
}

if (!selectionResult.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
break;
for (final var entry : candidateTxsByScore.entrySet()) {
LOG.trace("Evaluating txs with score {}", entry.getKey());

for (final var senderTxs : entry.getValue()) {
LOG.trace("Evaluating sender txs {}", senderTxs);

if (!skipSenders.contains(senderTxs.sender())) {

for (final var candidatePendingTx : senderTxs.pendingTransactions()) {
final var selectionResult = selector.evaluateTransaction(candidatePendingTx);

LOG.atTrace()
.setMessage("Selection result {} for transaction {}")
.addArgument(selectionResult)
.addArgument(candidatePendingTx::toTraceLog)
.log();

if (selectionResult.discard()) {
invalidTransactions.add(candidatePendingTx);
logDiscardedTransaction(candidatePendingTx, selectionResult);
}

if (selectionResult.penalize()) {
candidatePendingTx.decrementScore();
LOG.atTrace()
.setMessage("Transaction {} penalized, new score is {}")
.addArgument(candidatePendingTx::toTraceLog)
.addArgument(candidatePendingTx::getScore)
.log();
}

if (selectionResult.stop()) {
LOG.trace("Stopping selection");
break selection;
}

if (!selectionResult.selected()) {
// avoid processing other txs from this sender if this one is skipped
// since the following will not be selected due to the nonce gap
LOG.trace("Skipping remaining txs for sender {}", candidatePendingTx.getSender());
skipSenders.add(candidatePendingTx.getSender());
break;
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ public class ReadyTransactions extends AbstractSequentialTransactionsLayer {

private final NavigableSet<PendingTransaction> orderByMaxFee =
new TreeSet<>(
Comparator.comparing(PendingTransaction::hasPriority)
Comparator.comparing(PendingTransaction::getScore)
.thenComparing(PendingTransaction::hasPriority)
.thenComparing((PendingTransaction pt) -> pt.getTransaction().getMaxGasPrice())
.thenComparing(PendingTransaction::getSequence));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public class SparseTransactions extends AbstractTransactionsLayer {
*/
private final NavigableSet<PendingTransaction> sparseEvictionOrder =
new TreeSet<>(
Comparator.comparing(PendingTransaction::hasPriority)
Comparator.comparing(PendingTransaction::getScore)
.thenComparing(PendingTransaction::hasPriority)
.thenComparing(PendingTransaction::getSequence));

private final Map<Address, Integer> gapBySender = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ protected interface Status {
*/
boolean discard();

boolean penalize();

/**
* Name of this status
*
Expand All @@ -52,30 +54,33 @@ protected interface Status {

private enum BaseStatus implements Status {
SELECTED,
BLOCK_FULL(true, false),
BLOBS_FULL(false, false),
BLOCK_OCCUPANCY_ABOVE_THRESHOLD(true, false),
BLOCK_SELECTION_TIMEOUT(true, false),
TX_EVALUATION_TOO_LONG(true, true),
INVALID_TRANSIENT(false, false),
INVALID(false, true);
BLOCK_FULL(true, false, false),
BLOBS_FULL(false, false, false),
BLOCK_OCCUPANCY_ABOVE_THRESHOLD(true, false, false),
BLOCK_SELECTION_TIMEOUT(true, false, false),
TX_EVALUATION_TOO_LONG(true, false, true),
INVALID_TRANSIENT(false, false, true),
INVALID(false, true, false);

private final boolean stop;
private final boolean discard;
private final boolean penalize;

BaseStatus() {
this.stop = false;
this.discard = false;
this.penalize = false;
}

BaseStatus(final boolean stop, final boolean discard) {
BaseStatus(final boolean stop, final boolean discard, final boolean penalize) {
this.stop = stop;
this.discard = discard;
this.penalize = penalize;
}

@Override
public String toString() {
return name() + " (stop=" + stop + ", discard=" + discard + ")";
return name() + " (stop=" + stop + ", discard=" + discard + ", penalize=" + penalize + ")";
}

@Override
Expand All @@ -87,6 +92,11 @@ public boolean stop() {
public boolean discard() {
return discard;
}

@Override
public boolean penalize() {
return penalize;
}
}

/** The transaction has been selected to be included in the new block */
Expand Down Expand Up @@ -215,6 +225,10 @@ public boolean discard() {
return status.discard();
}

public boolean penalize() {
return status.penalize();
}

/**
* Is the candidate transaction selected for block inclusion?
*
Expand Down

0 comments on commit 0fecdf1

Please sign in to comment.