Skip to content

Commit

Permalink
Merge pull request #16153 from smartcontractkit/ccip-db-load-fix
Browse files Browse the repository at this point in the history
[CCIP Fix] Updated the Finalizer to reduce the frequency of heavy query
  • Loading branch information
Bwest981 authored Jan 31, 2025
2 parents 37a530b + fab1841 commit 53a2e2e
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 13 deletions.
5 changes: 5 additions & 0 deletions .changeset/giant-dancers-tie.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Updated EVM TXM's Finalizer component to reduce the frequency of heavy DB query #internal
88 changes: 77 additions & 11 deletions core/chains/evm/txmgr/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,10 @@ var (
)

// processHeadTimeout represents a sanity limit on how long ProcessHead should take to complete
const processHeadTimeout = 10 * time.Minute
const (
processHeadTimeout = 10 * time.Minute
attemptsCacheRefreshThreshold = 5
)

type finalizerTxStore interface {
DeleteReceiptByTxHash(ctx context.Context, txHash common.Hash) error
Expand Down Expand Up @@ -103,6 +106,9 @@ type evmFinalizer struct {

lastProcessedFinalizedBlockNum int64
resumeCallback resumeCallback

attemptsCache []TxAttempt
attemptsCacheHitCount int
}

func NewEvmFinalizer(
Expand All @@ -116,15 +122,16 @@ func NewEvmFinalizer(
) *evmFinalizer {
lggr = logger.Named(lggr, "Finalizer")
return &evmFinalizer{
lggr: logger.Sugared(lggr),
chainID: chainID,
rpcBatchSize: int(rpcBatchSize),
forwardersEnabled: forwardersEnabled,
txStore: txStore,
client: client,
headTracker: headTracker,
mb: mailbox.NewSingle[*evmtypes.Head](),
resumeCallback: nil,
lggr: logger.Sugared(lggr),
chainID: chainID,
rpcBatchSize: int(rpcBatchSize),
forwardersEnabled: forwardersEnabled,
txStore: txStore,
client: client,
headTracker: headTracker,
mb: mailbox.NewSingle[*evmtypes.Head](),
resumeCallback: nil,
attemptsCacheHitCount: attemptsCacheRefreshThreshold, // start hit count at threshold to refresh cache on first run
}
}

Expand Down Expand Up @@ -377,7 +384,7 @@ func (f *evmFinalizer) batchCheckReceiptHashesOnchain(ctx context.Context, block
}

func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFinalizedHead *evmtypes.Head) error {
attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainID)
attempts, err := f.fetchAttemptsRequiringReceiptFetch(ctx)
if err != nil {
return fmt.Errorf("failed to fetch broadcasted attempts for confirmed transactions: %w", err)
}
Expand Down Expand Up @@ -413,6 +420,8 @@ func (f *evmFinalizer) FetchAndStoreReceipts(ctx context.Context, head, latestFi
errorList = append(errorList, err)
continue
}
// Filter out attempts with found receipts from cache, if needed
f.filterAttemptsCache(receipts)
}
if len(errorList) > 0 {
return errors.Join(errorList...)
Expand Down Expand Up @@ -666,3 +675,60 @@ func (f *evmFinalizer) buildTxHashList(finalizedReceipts []*evmtypes.Receipt) []
}
return txHashes
}

// fetchAttemptsRequiringReceiptFetch is a wrapper around the TxStore call to fetch attempts requiring receipt fetch.
// Attempts are cached and used for subsequent fetches to reduce the load of the query.
// The attempts cache is refreshed every 6 requests.
func (f *evmFinalizer) fetchAttemptsRequiringReceiptFetch(ctx context.Context) ([]TxAttempt, error) {
// Return attempts from attempts cache if it is populated and the hit count has not reached the threshold for refresh
if f.attemptsCacheHitCount < attemptsCacheRefreshThreshold {
f.attemptsCacheHitCount++
return f.attemptsCache, nil
}
attempts, err := f.txStore.FindAttemptsRequiringReceiptFetch(ctx, f.chainID)
if err != nil {
return nil, err
}
// Refresh the cache with the latest results
f.attemptsCache = attempts
// Reset the cache hit count
f.attemptsCacheHitCount = 0
return f.attemptsCache, nil
}

// filterAttemptsCache removes attempts from the cache if a receipt was found for their transaction's ID
func (f *evmFinalizer) filterAttemptsCache(receipts []*evmtypes.Receipt) {
// Skip method if no receipts found
if len(receipts) == 0 {
return
}
// Skip method if refresh threshold has been met
// No need to filter the attempts cache since fresh data will be fetched on the next iteration
if f.attemptsCacheHitCount >= attemptsCacheRefreshThreshold {
return
}
attemptsWithoutReceipts := make([]TxAttempt, 0, len(f.attemptsCache))
txIDsWithReceipts := make([]int64, 0, len(f.attemptsCache))
// Gather the unique tx IDs that receipts were found for
for _, receipt := range receipts {
for _, attempt := range f.attemptsCache {
if attempt.Hash.Cmp(receipt.TxHash) == 0 {
txIDsWithReceipts = append(txIDsWithReceipts, attempt.TxID)
}
}
}
// Filter out attempts for tx with found receipts from the existing attempts cache
for _, attempt := range f.attemptsCache {
foundATxID := false
for _, txID := range txIDsWithReceipts {
if attempt.TxID == txID {
foundATxID = true
break
}
}
if !foundATxID {
attemptsWithoutReceipts = append(attemptsWithoutReceipts, attempt)
}
}
f.attemptsCache = attemptsWithoutReceipts
}
24 changes: 24 additions & 0 deletions core/chains/evm/txmgr/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/testutils"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr/mocks"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
Expand Down Expand Up @@ -871,6 +872,29 @@ func TestFinalizer_FetchAndStoreReceipts(t *testing.T) {
require.Equal(t, txmgrcommon.TxFatalError, etx.State)
require.Equal(t, txmgr.ErrCouldNotGetReceipt, etx.Error.String)
})

t.Run("attempts requiring receipt fetch is not fetched from TxStore every head", func(t *testing.T) {
txStore := mocks.NewEvmTxStore(t)
finalizer := txmgr.NewEvmFinalizer(logger.Test(t), testutils.FixtureChainID, rpcBatchSize, false, txStore, txmClient, ht)

// Mock finalizer txstore calls that are not needed
txStore.On("SaveFetchedReceipts", mock.Anything, mock.Anything).Return(nil).Maybe()
txStore.On("FindTxesPendingCallback", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe()
txStore.On("UpdateTxCallbackCompleted", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
txStore.On("FindConfirmedTxesReceipts", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe()
txStore.On("FindTxesByIDs", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil).Maybe()

// RPC returns nil receipt for attempt
ethClient.On("BatchCallContext", mock.Anything, mock.Anything).Return(nil).Maybe()

// Should fetch attempts list from txstore
attempt := cltest.NewLegacyEthTxAttempt(t, 0)
txStore.On("FindAttemptsRequiringReceiptFetch", mock.Anything, mock.Anything).Return([]txmgr.TxAttempt{attempt}, nil).Once()
require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead))
// Should use the attempts cache for receipt fetch
require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead))
require.NoError(t, finalizer.FetchAndStoreReceipts(ctx, head, latestFinalizedHead))
})
}

func TestFinalizer_FetchAndStoreReceipts_batching(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion core/internal/features/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ observationSource = """
require.Len(t, outputs, 1)
output := outputs[0]
receipt := output.(map[string]interface{})
assert.Equal(t, "0x13", receipt["blockNumber"])
assert.Equal(t, "0x19", receipt["blockNumber"])
assert.Equal(t, "0x7a120", receipt["gasUsed"])
assert.Equal(t, "0x0", receipt["status"])
})
Expand Down
2 changes: 1 addition & 1 deletion core/services/vrf/v2/integration_v2_reverted_txns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func getTxnReceiptDB(db *sqlx.DB, txesID int64) ([]v2.TxnReceiptDB, error) {
WITH txes AS (
SELECT *
FROM evm.txes
WHERE (state = 'confirmed' OR state = 'unconfirmed')
WHERE (state = 'confirmed' OR state = 'finalized')
AND id = $1
), attempts AS (
SELECT *
Expand Down

0 comments on commit 53a2e2e

Please sign in to comment.