Skip to content

Commit

Permalink
Introduce score for pending transactions
Browse files Browse the repository at this point in the history
Signed-off-by: Fabio Di Fabio <fabio.difabio@consensys.net>
  • Loading branch information
fab-10 committed Jul 22, 2024
1 parent 0fecdf1 commit 9be3e6b
Show file tree
Hide file tree
Showing 15 changed files with 289 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package org.hyperledger.besu.ethereum.blockcreation.txselection;

import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;

Expand Down Expand Up @@ -419,11 +420,14 @@ private TransactionSelectionResult handleTransactionNotSelected(

final var pendingTransaction = evaluationContext.getPendingTransaction();

// check if this tx took too much to evaluate, and in case remove it from the pool
// check if this tx took too much to evaluate, and in case it was invalid remove it from the
// pool, otherwise penalize it.
final TransactionSelectionResult actualResult =
isTimeout.get()
? transactionTookTooLong(evaluationContext)
? TX_EVALUATION_TOO_LONG
? selectionResult.discard()
? INVALID_TX_EVALUATION_TOO_LONG
: TX_EVALUATION_TOO_LONG
: BLOCK_SELECTION_TIMEOUT
: selectionResult;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
import static org.assertj.core.api.Assertions.entry;
import static org.awaitility.Awaitility.await;
import static org.hyperledger.besu.ethereum.core.MiningParameters.DEFAULT_NON_POA_BLOCK_TXS_SELECTION_MAX_TIME;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE;
import static org.hyperledger.besu.ethereum.transaction.TransactionInvalidReason.NONCE_TOO_LOW;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.BLOCK_SELECTION_TIMEOUT;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.INVALID_TX_EVALUATION_TOO_LONG;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.PRIORITY_FEE_PER_GAS_BELOW_CURRENT_MIN;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.SELECTED;
import static org.hyperledger.besu.plugin.data.TransactionSelectionResult.TX_EVALUATION_TOO_LONG;
Expand Down Expand Up @@ -296,7 +297,7 @@ public void invalidTransactionsAreSkippedButBlockStillFills() {
final Transaction tx = createTransaction(i, Wei.of(7), 100_000);
transactionsToInject.add(tx);
if (i == 1) {
ensureTransactionIsInvalid(tx, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(tx, TransactionInvalidReason.NONCE_TOO_LOW);
} else {
ensureTransactionIsValid(tx);
}
Expand All @@ -311,8 +312,7 @@ public void invalidTransactionsAreSkippedButBlockStillFills() {
.containsOnly(
entry(
invalidTx,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
assertThat(results.getSelectedTransactions().size()).isEqualTo(4);
assertThat(results.getSelectedTransactions().contains(invalidTx)).isFalse();
assertThat(results.getReceipts().size()).isEqualTo(4);
Expand Down Expand Up @@ -568,8 +568,7 @@ public void shouldDiscardTransactionsThatFailValidation() {

ensureTransactionIsValid(validTransaction, 21_000, 0);
final Transaction invalidTransaction = createTransaction(3, Wei.of(10), 21_000);
ensureTransactionIsInvalid(
invalidTransaction, TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE);
ensureTransactionIsInvalid(invalidTransaction, TransactionInvalidReason.NONCE_TOO_LOW);

transactionPool.addRemoteTransactions(List.of(validTransaction, invalidTransaction));

Expand All @@ -582,8 +581,7 @@ public void shouldDiscardTransactionsThatFailValidation() {
.containsOnly(
entry(
invalidTransaction,
TransactionSelectionResult.invalid(
TransactionInvalidReason.UPFRONT_COST_EXCEEDS_BALANCE.name())));
TransactionSelectionResult.invalid(TransactionInvalidReason.NONCE_TOO_LOW.name())));
}

@Test
Expand Down Expand Up @@ -948,7 +946,7 @@ public void subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver(

@ParameterizedTest
@MethodSource("subsetOfPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOver")
public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool(
public void pendingTransactionsThatTakesTooLongToEvaluateIsPenalized(
final boolean isPoa,
final boolean preProcessingTooLate,
final boolean processingTooLate,
Expand All @@ -961,7 +959,7 @@ public void pendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThePool(
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
true);
false);
}

private void internalBlockSelectionTimeoutSimulation(
Expand Down Expand Up @@ -1085,7 +1083,7 @@ public void subsetOfInvalidPendingTransactionsIncludedWhenTxSelectionMaxTimeIsOv
500,
BLOCK_SELECTION_TIMEOUT,
false,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}

@ParameterizedTest
Expand All @@ -1102,9 +1100,9 @@ public void invalidPendingTransactionsThatTakesTooLongToEvaluateIsDroppedFromThe
processingTooLate,
postProcessingTooLate,
900,
TX_EVALUATION_TOO_LONG,
INVALID_TX_EVALUATION_TOO_LONG,
true,
UPFRONT_COST_EXCEEDS_BALANCE);
NONCE_TOO_LOW);
}

private void internalBlockSelectionTimeoutSimulationInvalidTxs(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,19 +50,20 @@ public abstract class PendingTransaction
private final Transaction transaction;
private final long addedAt;
private final long sequence; // Allows prioritization based on order transactions are added
private volatile byte score = Byte.MAX_VALUE;
private volatile byte score;

private int memorySize = NOT_INITIALIZED;

private PendingTransaction(
final Transaction transaction, final long addedAt, final long sequence) {
final Transaction transaction, final long addedAt, final long sequence, final byte score) {
this.transaction = transaction;
this.addedAt = addedAt;
this.sequence = sequence;
this.score = score;
}

private PendingTransaction(final Transaction transaction, final long addedAt) {
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement());
this(transaction, addedAt, TRANSACTIONS_ADDED.getAndIncrement(), Byte.MAX_VALUE);
}

public static PendingTransaction newPendingTransaction(
Expand Down Expand Up @@ -301,13 +302,13 @@ public Local(final Transaction transaction) {
this(transaction, System.currentTimeMillis());
}

private Local(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Local(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}

@Override
public PendingTransaction detachedCopy() {
return new Local(getSequence(), getTransaction().detachedCopy());
return new Local(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -329,13 +330,13 @@ public Priority(final Transaction transaction, final long addedAt) {
super(transaction, addedAt);
}

public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}

@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -355,13 +356,13 @@ public Remote(final Transaction transaction) {
this(transaction, System.currentTimeMillis());
}

private Remote(final long sequence, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence);
private Remote(final long sequence, final byte score, final Transaction transaction) {
super(transaction, System.currentTimeMillis(), sequence, score);
}

@Override
public PendingTransaction detachedCopy() {
return new Remote(getSequence(), getTransaction().detachedCopy());
return new Remote(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand All @@ -383,13 +384,13 @@ public Priority(final Transaction transaction, final long addedAt) {
super(transaction, addedAt);
}

public Priority(final long sequence, final Transaction transaction) {
super(sequence, transaction);
public Priority(final long sequence, final byte score, final Transaction transaction) {
super(sequence, score, transaction);
}

@Override
public PendingTransaction detachedCopy() {
return new Priority(getSequence(), getTransaction().detachedCopy());
return new Priority(getSequence(), getScore(), getTransaction().detachedCopy());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,13 @@ protected void internalRemove(
orderByFee.remove(removedTx);
}

@Override
protected void internalPenalize(final PendingTransaction penalizedTx) {
orderByFee.remove(penalizedTx);
penalizedTx.decrementScore();
orderByFee.add(penalizedTx);
}

@Override
public List<PendingTransaction> promote(
final Predicate<PendingTransaction> promotionFilter,
Expand Down Expand Up @@ -192,7 +199,21 @@ public List<SenderPendingTransactions> getBySender() {
.toList();
}

public Map<Byte, List<SenderPendingTransactions>> getByScore() {
/**
* Returns pending txs by sender and ordered by score desc. In case a sender has pending txs with
* different scores, then in nonce sequence, every time there is a score decrease, his pending txs
* will be put in a new entry with that score. For example if a sender has 3 pending txs (where
* the first number is the nonce and the score is between parenthesis): 0(127), 1(126), 2(127),
* then for he there will be 2 entries:
*
* <ul>
* <li>0(127)
* <li>1(126), 2(127)
* </ul>
*
* @return pending txs by sender and ordered by score desc
*/
public NavigableMap<Byte, List<SenderPendingTransactions>> getByScore() {
final var sendersToAdd = new HashSet<>(txsBySender.keySet());
return orderByFee.descendingSet().stream()
.map(PendingTransaction::getSender)
Expand All @@ -206,7 +227,8 @@ public Map<Byte, List<SenderPendingTransactions>> getByScore() {
a.addAll(b);
return a;
},
TreeMap::new));
TreeMap::new))
.descendingMap();
}

private Map<Byte, List<SenderPendingTransactions>> splitByScore(
Expand All @@ -216,15 +238,14 @@ private Map<Byte, List<SenderPendingTransactions>> splitByScore(
var currSplit = new ArrayList<PendingTransaction>();
for (final var entry : txsBySender.entrySet()) {
if (entry.getValue().getScore() < currScore) {
// score decreased, we need to start a new split
// score decreased, we need to save current split and start a new one
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
.add(new SenderPendingTransactions(sender, currSplit));
currSplit = new ArrayList<>();
currScore = entry.getValue().getScore();
} else {
currSplit.add(entry.getValue());
}
currSplit.add(entry.getValue());
}
splitByScore
.computeIfAbsent(currScore, k -> new ArrayList<>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,17 @@ final void promoteTransactions() {
}
}

@Override
public void penalize(final PendingTransaction penalizedTransaction) {
if (pendingTransactions.containsKey(penalizedTransaction.getHash())) {
internalPenalize(penalizedTransaction);
} else {
nextLayer.penalize(penalizedTransaction);
}
}

protected abstract void internalPenalize(final PendingTransaction pendingTransaction);

/**
* How many txs of a specified type can be promoted? This make sense when a max number of txs of a
* type can be included in a single block (ex. blob txs), to avoid filling the layer with more txs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import org.hyperledger.besu.datatypes.Wei;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.MiningParameters;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.eth.manager.EthScheduler;
import org.hyperledger.besu.ethereum.eth.transactions.BlobCache;
import org.hyperledger.besu.ethereum.eth.transactions.PendingTransaction;
Expand Down Expand Up @@ -196,8 +195,8 @@ protected String internalLogStats() {
return "Basefee Prioritized: Empty";
}

final Transaction highest = orderByFee.last().getTransaction();
final Transaction lowest = orderByFee.first().getTransaction();
final PendingTransaction highest = orderByFee.last();
final PendingTransaction lowest = orderByFee.first();

return "Basefee Prioritized: "
+ "count: "
Expand All @@ -206,16 +205,26 @@ protected String internalLogStats() {
+ spaceUsed
+ ", unique senders: "
+ txsBySender.size()
+ ", highest priority tx: [max fee: "
+ highest.getMaxGasPrice().toHumanReadableString()
+ ", highest priority tx: [score: "
+ highest.getScore()
+ ", max fee: "
+ highest.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", curr prio fee: "
+ highest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString()
+ highest
.getTransaction()
.getEffectivePriorityFeePerGas(nextBlockBaseFee)
.toHumanReadableString()
+ ", hash: "
+ highest.getHash()
+ "], lowest priority tx: [max fee: "
+ lowest.getMaxGasPrice().toHumanReadableString()
+ "], lowest priority tx: [score: "
+ lowest.getScore()
+ ", max fee: "
+ lowest.getTransaction().getMaxGasPrice().toHumanReadableString()
+ ", curr prio fee: "
+ lowest.getEffectivePriorityFeePerGas(nextBlockBaseFee).toHumanReadableString()
+ lowest
.getTransaction()
.getEffectivePriorityFeePerGas(nextBlockBaseFee)
.toHumanReadableString()
+ ", hash: "
+ lowest.getHash()
+ "], next block base fee: "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public TransactionAddedResult add(final PendingTransaction pendingTransaction, f
@Override
public void remove(final PendingTransaction pendingTransaction, final RemovalReason reason) {}

@Override
public void penalize(final PendingTransaction penalizedTx) {}

@Override
public void blockAdded(
final FeeMarket feeMarket,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,21 +79,33 @@ protected boolean promotionFilter(final PendingTransaction pendingTransaction) {
}

@Override
public String internalLogStats() {
protected String internalLogStats() {
if (orderByFee.isEmpty()) {
return "GasPrice Prioritized: Empty";
}

final PendingTransaction highest = orderByFee.last();
final PendingTransaction lowest = orderByFee.first();

return "GasPrice Prioritized: "
+ "count: "
+ pendingTransactions.size()
+ " space used: "
+ ", space used: "
+ spaceUsed
+ " unique senders: "
+ ", unique senders: "
+ txsBySender.size()
+ ", highest fee tx: "
+ orderByFee.last().getTransaction().getGasPrice().get().toHumanReadableString()
+ ", lowest fee tx: "
+ orderByFee.first().getTransaction().getGasPrice().get().toHumanReadableString();
+ ", highest priority tx: [score: "
+ highest.getScore()
+ ", gas price: "
+ highest.getTransaction().getGasPrice().get().toHumanReadableString()
+ ", hash: "
+ highest.getHash()
+ "], lowest priority tx: [score: "
+ lowest.getScore()
+ ", gas price: "
+ lowest.getTransaction().getGasPrice().get().toHumanReadableString()
+ ", hash: "
+ lowest.getHash()
+ "]";
}
}
Loading

0 comments on commit 9be3e6b

Please sign in to comment.