diff --git a/op-node/flags/flags.go b/op-node/flags/flags.go index 1521b86a870a..c81aa8460fd3 100644 --- a/op-node/flags/flags.go +++ b/op-node/flags/flags.go @@ -124,18 +124,6 @@ var ( EnvVars: prefixEnvVars("L1_HTTP_POLL_INTERVAL"), Value: time.Second * 12, } - L1PrefetchingWindow = &cli.Uint64Flag{ - Name: "l1.prefetching-window", - Usage: "Number of L1 blocks to prefetch in the background. Disabled if 0.", - EnvVars: prefixEnvVars("L1_PREFETCHING_WINDOW"), - Value: 0, - } - L1PrefetchingTimeout = &cli.DurationFlag{ - Name: "l1.prefetching-timeout", - Usage: "Timeout for L1 prefetching. Disabled if 0.", - EnvVars: prefixEnvVars("L1_PREFETCHING_TIMEOUT"), - Value: time.Second * 30, - } L2EngineJWTSecret = &cli.StringFlag{ Name: "l2.jwt-secret", Usage: "Path to JWT secret key. Keys are 32 bytes, hex encoded in a file. A new key will be generated if left empty.", @@ -321,8 +309,6 @@ var optionalFlags = []cli.Flag{ L1RPCMaxBatchSize, L1RPCMaxConcurrency, L1HTTPPollInterval, - L1PrefetchingWindow, - L1PrefetchingTimeout, L2EngineJWTSecret, VerifierL1Confs, SequencerEnabledFlag, diff --git a/op-node/node/client.go b/op-node/node/client.go index 621f307c63b4..1fb1c44f59ba 100644 --- a/op-node/node/client.go +++ b/op-node/node/client.go @@ -159,14 +159,6 @@ type L1EndpointConfig struct { // It is recommended to use websockets or IPC for efficient following of the changing block. // Setting this to 0 disables polling. HttpPollInterval time.Duration - - // PrefetchingWindow specifies the number of blocks to prefetch from the L1 RPC. - // Setting this to 0 disables prefetching. - PrefetchingWindow uint64 - - // PrefetchingTimeout specifies the timeout for prefetching from the L1 RPC. - // Setting this to 0 disables prefetching. - PrefetchingTimeout time.Duration } var _ L1EndpointSetup = (*L1EndpointConfig)(nil) @@ -200,8 +192,6 @@ func (cfg *L1EndpointConfig) Setup(ctx context.Context, log log.Logger, rollupCf rpcCfg := sources.L1ClientDefaultConfig(rollupCfg, cfg.L1TrustRPC, cfg.L1RPCKind) rpcCfg.MaxRequestsPerBatch = cfg.BatchSize rpcCfg.MaxConcurrentRequests = cfg.MaxConcurrency - rpcCfg.PrefetchingWindow = cfg.PrefetchingWindow - rpcCfg.PrefetchingTimeout = cfg.PrefetchingTimeout return l1Node, rpcCfg, nil } diff --git a/op-node/service.go b/op-node/service.go index a0178aa0bd28..e412d92e2338 100644 --- a/op-node/service.go +++ b/op-node/service.go @@ -123,15 +123,13 @@ func NewConfig(ctx *cli.Context, log log.Logger) (*node.Config, error) { func NewL1EndpointConfig(ctx *cli.Context) *node.L1EndpointConfig { return &node.L1EndpointConfig{ - L1NodeAddr: ctx.String(flags.L1NodeAddr.Name), - L1TrustRPC: ctx.Bool(flags.L1TrustRPC.Name), - L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.String(flags.L1RPCProviderKind.Name))), - PrefetchingWindow: ctx.Uint64(flags.L1PrefetchingWindow.Name), - PrefetchingTimeout: ctx.Duration(flags.L1PrefetchingTimeout.Name), - RateLimit: ctx.Float64(flags.L1RPCRateLimit.Name), - BatchSize: ctx.Int(flags.L1RPCMaxBatchSize.Name), - HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name), - MaxConcurrency: ctx.Int(flags.L1RPCMaxConcurrency.Name), + L1NodeAddr: ctx.String(flags.L1NodeAddr.Name), + L1TrustRPC: ctx.Bool(flags.L1TrustRPC.Name), + L1RPCKind: sources.RPCProviderKind(strings.ToLower(ctx.String(flags.L1RPCProviderKind.Name))), + RateLimit: ctx.Float64(flags.L1RPCRateLimit.Name), + BatchSize: ctx.Int(flags.L1RPCMaxBatchSize.Name), + HttpPollInterval: ctx.Duration(flags.L1HTTPPollInterval.Name), + MaxConcurrency: ctx.Int(flags.L1RPCMaxConcurrency.Name), } } diff --git a/op-service/sources/l1_client.go b/op-service/sources/l1_client.go index bf2c7a1ed298..3cda3a73304e 100644 --- a/op-service/sources/l1_client.go +++ b/op-service/sources/l1_client.go @@ -20,8 +20,6 @@ type L1ClientConfig struct { EthClientConfig L1BlockRefsCacheSize int - PrefetchingWindow uint64 - PrefetchingTimeout time.Duration } func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProviderKind) *L1ClientConfig { @@ -47,8 +45,6 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide }, // Not bounded by span, to cover find-sync-start range fully for speedy recovery after errors. L1BlockRefsCacheSize: fullSpan, - PrefetchingWindow: 0, // no prefetching by default - PrefetchingTimeout: 0, // no prefetching by default } } @@ -56,7 +52,7 @@ func L1ClientDefaultConfig(config *rollup.Config, trustRPC bool, kind RPCProvide // with optimized batch requests, cached results, and flag to not trust the RPC // (i.e. to verify all returned contents against corresponding block hashes). type L1Client struct { - EthClientInterface + *EthClient // cache L1BlockRef by hash // common.Hash -> eth.L1BlockRef @@ -70,23 +66,10 @@ func NewL1Client(client client.RPC, log log.Logger, metrics caching.Metrics, con return nil, err } - var clientToUse EthClientInterface - - if config.PrefetchingTimeout > 0 && config.PrefetchingWindow > 0 { - prefetchingEthClient, err := NewPrefetchingEthClient(ethClient, config.PrefetchingWindow, config.PrefetchingTimeout) - if err != nil { - return nil, err - } - clientToUse = prefetchingEthClient - } else { - clientToUse = ethClient - } - return &L1Client{ - EthClientInterface: clientToUse, - l1BlockRefsCache: caching.NewLRUCache[common.Hash, eth.L1BlockRef](metrics, "blockrefs", config.L1BlockRefsCacheSize), + EthClient: ethClient, + l1BlockRefsCache: caching.NewLRUCache[common.Hash, eth.L1BlockRef](metrics, "blockrefs", config.L1BlockRefsCacheSize), }, nil - } // L1BlockRefByLabel returns the [eth.L1BlockRef] for the given block label. diff --git a/op-service/sources/prefetching_eth_client.go b/op-service/sources/prefetching_eth_client.go deleted file mode 100644 index 31da80289026..000000000000 --- a/op-service/sources/prefetching_eth_client.go +++ /dev/null @@ -1,276 +0,0 @@ -package sources - -import ( - "context" - "math/big" - "sync" - "time" - - "github.com/ethereum-optimism/optimism/op-service/eth" - "github.com/ethereum/go-ethereum" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" -) - -type EthClientInterface interface { - SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) - ChainID(ctx context.Context) (*big.Int, error) - InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) - InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error) - InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error) - InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) - InfoAndTxsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Transactions, error) - InfoAndTxsByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, types.Transactions, error) - PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayload, error) - PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) - PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*eth.ExecutionPayload, error) - FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) - GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) - GetStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockTag string) (common.Hash, error) - ReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash) (common.Hash, error) - Close() -} - -type PrefetchingEthClient struct { - inner EthClientInterface - PrefetchingRange uint64 - PrefetchingTimeout time.Duration - runningCtx context.Context - runningCancel context.CancelFunc - highestHeadRequesting uint64 - highestHeadLock sync.Mutex - wg *sync.WaitGroup // used for testing -} - -// NewPrefetchingEthClient creates a new [PrefetchingEthClient] with the given underlying [EthClient] -// and a prefetching range. -func NewPrefetchingEthClient(inner EthClientInterface, prefetchingRange uint64, timeout time.Duration) (*PrefetchingEthClient, error) { - // Create a new context for the prefetching goroutines - runningCtx, runningCancel := context.WithCancel(context.Background()) - return &PrefetchingEthClient{ - inner: inner, - PrefetchingRange: prefetchingRange, - PrefetchingTimeout: timeout, - runningCtx: runningCtx, - runningCancel: runningCancel, - highestHeadRequesting: 0, - }, nil -} - -func (p *PrefetchingEthClient) updateRequestingHead(start, end uint64) (newStart uint64, shouldFetch bool) { - // Acquire lock before reading/updating highestHeadRequesting - p.highestHeadLock.Lock() - defer p.highestHeadLock.Unlock() - if start <= p.highestHeadRequesting { - start = p.highestHeadRequesting + 1 - } - if p.highestHeadRequesting < end { - p.highestHeadRequesting = end - } - return start, start <= end -} - -func (p *PrefetchingEthClient) FetchWindow(start, end uint64) { - if p.wg != nil { - defer p.wg.Done() - } - - start, shouldFetch := p.updateRequestingHead(start, end) - if !shouldFetch { - return - } - - ctx, cancel := context.WithTimeout(p.runningCtx, p.PrefetchingTimeout) - defer cancel() - for i := start; i <= end; i++ { - p.FetchBlockAndReceipts(ctx, i) - } -} - -func (p *PrefetchingEthClient) FetchBlockAndReceipts(ctx context.Context, number uint64) { - blockInfo, _, err := p.inner.InfoAndTxsByNumber(ctx, number) - if err != nil { - return - } - _, _, _ = p.inner.FetchReceipts(ctx, blockInfo.Hash()) -} - -func (p *PrefetchingEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { - return p.inner.SubscribeNewHead(ctx, ch) -} - -func (p *PrefetchingEthClient) ChainID(ctx context.Context) (*big.Int, error) { - return p.inner.ChainID(ctx) -} - -func (p *PrefetchingEthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) { - // Fetch the block information for the given hash - blockInfo, err := p.inner.InfoByHash(ctx, hash) - if err != nil { - return blockInfo, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts starting from the block number of the fetched block - go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange) - - return blockInfo, nil -} - -func (p *PrefetchingEthClient) InfoByNumber(ctx context.Context, number uint64) (eth.BlockInfo, error) { - if p.wg != nil { - p.wg.Add(1) - } - // Trigger prefetching in the background - go p.FetchWindow(number+1, number+p.PrefetchingRange) - - // Fetch the requested block - return p.inner.InfoByNumber(ctx, number) -} - -func (p *PrefetchingEthClient) InfoByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, error) { - // Fetch the block information for the given label - blockInfo, err := p.inner.InfoByLabel(ctx, label) - if err != nil { - return blockInfo, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts starting from the block number of the fetched block - go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange) - - return blockInfo, nil -} - -func (p *PrefetchingEthClient) InfoAndTxsByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, types.Transactions, error) { - // Fetch the block info and transactions for the requested hash - blockInfo, txs, err := p.inner.InfoAndTxsByHash(ctx, hash) - if err != nil { - return blockInfo, txs, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts - go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange) - - return blockInfo, txs, nil -} - -func (p *PrefetchingEthClient) InfoAndTxsByNumber(ctx context.Context, number uint64) (eth.BlockInfo, types.Transactions, error) { - // Fetch the block info and transactions for the requested number - blockInfo, txs, err := p.inner.InfoAndTxsByNumber(ctx, number) - if err != nil { - return blockInfo, txs, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts - go p.FetchWindow(number+1, number+p.PrefetchingRange) - - return blockInfo, txs, nil -} - -func (p *PrefetchingEthClient) InfoAndTxsByLabel(ctx context.Context, label eth.BlockLabel) (eth.BlockInfo, types.Transactions, error) { - // Fetch the block info and transactions for the requested label - blockInfo, txs, err := p.inner.InfoAndTxsByLabel(ctx, label) - if err != nil { - return blockInfo, txs, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts - go p.FetchWindow(blockInfo.NumberU64()+1, blockInfo.NumberU64()+p.PrefetchingRange) - - return blockInfo, txs, nil -} - -func (p *PrefetchingEthClient) PayloadByHash(ctx context.Context, hash common.Hash) (*eth.ExecutionPayload, error) { - // Fetch the payload for the requested hash - payload, err := p.inner.PayloadByHash(ctx, hash) - if err != nil { - return payload, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts - go p.FetchWindow(uint64(payload.BlockNumber)+1, uint64(payload.BlockNumber)+p.PrefetchingRange) - - return payload, nil -} - -func (p *PrefetchingEthClient) PayloadByNumber(ctx context.Context, number uint64) (*eth.ExecutionPayload, error) { - // Fetch the payload for the requested number - payload, err := p.inner.PayloadByNumber(ctx, number) - if err != nil { - return payload, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts - go p.FetchWindow(number+1, number+p.PrefetchingRange) - - return payload, nil -} - -func (p *PrefetchingEthClient) PayloadByLabel(ctx context.Context, label eth.BlockLabel) (*eth.ExecutionPayload, error) { - // Fetch the payload for the requested label - payload, err := p.inner.PayloadByLabel(ctx, label) - if err != nil { - return payload, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts - go p.FetchWindow(uint64(payload.BlockNumber)+1, uint64(payload.BlockNumber)+p.PrefetchingRange) - - return payload, nil -} - -func (p *PrefetchingEthClient) FetchReceipts(ctx context.Context, blockHash common.Hash) (eth.BlockInfo, types.Receipts, error) { - // Fetch the block info and receipts for the requested hash - blockInfo, receipts, err := p.inner.FetchReceipts(ctx, blockHash) - if err != nil { - return blockInfo, receipts, err - } - - if p.wg != nil { - p.wg.Add(1) - } - // Prefetch the next n blocks and their receipts - go p.FetchWindow(blockInfo.NumberU64(), blockInfo.NumberU64()+p.PrefetchingRange) - - return blockInfo, receipts, nil -} - -func (p *PrefetchingEthClient) GetProof(ctx context.Context, address common.Address, storage []common.Hash, blockTag string) (*eth.AccountResult, error) { - return p.inner.GetProof(ctx, address, storage, blockTag) -} - -func (p *PrefetchingEthClient) GetStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockTag string) (common.Hash, error) { - return p.inner.GetStorageAt(ctx, address, storageSlot, blockTag) -} - -func (p *PrefetchingEthClient) ReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash) (common.Hash, error) { - return p.inner.ReadStorageAt(ctx, address, storageSlot, blockHash) -} - -func (p *PrefetchingEthClient) Close() { - p.runningCancel() - p.inner.Close() -} diff --git a/op-service/sources/prefetching_eth_client_test.go b/op-service/sources/prefetching_eth_client_test.go deleted file mode 100644 index 680baf8982d8..000000000000 --- a/op-service/sources/prefetching_eth_client_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package sources - -import ( - "context" - "fmt" - "math/rand" - "sync" - "testing" - "time" - - "github.com/ethereum-optimism/optimism/op-service/testutils" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/core/types" - "github.com/stretchr/testify/require" -) - -// TestPrefetchingEthClient runs all test cases for each prefetching range. -func TestPrefetchingEthClient(t *testing.T) { - prefetchingRanges := []uint64{0, 1, 5} - for _, prefetchingRange := range prefetchingRanges { - testName := fmt.Sprintf("range-%d", prefetchingRange) - t.Run(testName, func(t *testing.T) { - ctx := context.Background() - mockEthClient := new(testutils.MockEthClient) - client, err := NewPrefetchingEthClient(mockEthClient, prefetchingRange, 30*time.Second) - require.NoError(t, err) - defer client.Close() - client.wg = new(sync.WaitGroup) // Initialize the WaitGroup for testing - - // set up a random block to get from the client - randomness := rand.New(rand.NewSource(123)) - block, _ := randomRpcBlockAndReceipts(randomness, 2) - rhdr := block.rpcHeader - expectedTxs := block.Transactions - expectedInfo, err := rhdr.Info(true, false) - require.NoError(t, err) - mockEthClient.ExpectInfoAndTxsByNumber(uint64(rhdr.Number), expectedInfo, expectedTxs, nil) - - // also set up a window of random blocks and receipts to prefetch - windowEnd := (uint64(rhdr.Number) + client.PrefetchingRange) - for i := uint64(rhdr.Number) + 1; i <= windowEnd; i++ { - // set up different info per iteration - fillerBlock, fillerReceipts := randomRpcBlockAndReceipts(randomness, 2) - fillerBlock.rpcHeader.Number = hexutil.Uint64(i) - fillerInfo, err := fillerBlock.rpcHeader.Info(true, false) - require.NoError(t, err) - mockEthClient.ExpectInfoAndTxsByNumber(i, fillerInfo, fillerBlock.Transactions, nil) - mockEthClient.ExpectFetchReceipts(fillerBlock.Hash, fillerInfo, fillerReceipts, nil) - } - - info, txs, err := client.InfoAndTxsByNumber(ctx, uint64(rhdr.Number)) - require.NoError(t, err) - require.Equal(t, info, expectedInfo) - require.Equal(t, txs, types.Transactions(expectedTxs)) - client.wg.Wait() // Wait for all goroutines to complete before asserting expectations - mockEthClient.AssertExpectations(t) - }) - } -} - -func TestUpdateRequestingHead_NormalRange(t *testing.T) { - client := &PrefetchingEthClient{ - highestHeadRequesting: 10, - PrefetchingTimeout: 30 * time.Second, - } - - start, end := uint64(11), uint64(15) - newStart, shouldFetch := client.updateRequestingHead(start, end) - - require.Equal(t, newStart, start) - require.True(t, shouldFetch) - require.Equal(t, client.highestHeadRequesting, end) -} - -func TestUpdateRequestingHead_OverlappingRange(t *testing.T) { - highestHeadBeforeUpdate := uint64(10) - client := &PrefetchingEthClient{ - highestHeadRequesting: highestHeadBeforeUpdate, - PrefetchingTimeout: 30 * time.Second, - } - - start, end := uint64(8), uint64(12) - newStart, shouldFetch := client.updateRequestingHead(start, end) - - require.Equal(t, newStart, highestHeadBeforeUpdate+1) - require.True(t, shouldFetch) - require.Equal(t, client.highestHeadRequesting, end) -} diff --git a/op-service/testutils/mock_eth_client.go b/op-service/testutils/mock_eth_client.go index 8e0d3ee1f126..aa96da3ddeca 100644 --- a/op-service/testutils/mock_eth_client.go +++ b/op-service/testutils/mock_eth_client.go @@ -2,11 +2,9 @@ package testutils import ( "context" - "math/big" "github.com/stretchr/testify/mock" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -17,15 +15,6 @@ type MockEthClient struct { mock.Mock } -func (m *MockEthClient) ChainID(ctx context.Context) (*big.Int, error) { - out := m.Mock.MethodCalled("ChainID") - return out[0].(*big.Int), *out[1].(*error) -} - -func (m *MockEthClient) ExpectChainID(chainID *big.Int, err error) { - m.Mock.On("ChainID").Once().Return(chainID, &err) -} - func (m *MockEthClient) InfoByHash(ctx context.Context, hash common.Hash) (eth.BlockInfo, error) { out := m.Mock.MethodCalled("InfoByHash", hash) return *out[0].(*eth.BlockInfo), *out[1].(*error) @@ -139,9 +128,3 @@ func (m *MockEthClient) ReadStorageAt(ctx context.Context, address common.Addres func (m *MockEthClient) ExpectReadStorageAt(ctx context.Context, address common.Address, storageSlot common.Hash, blockHash common.Hash, result common.Hash, err error) { m.Mock.On("ReadStorageAt", address, storageSlot, blockHash).Once().Return(result, &err) } - -func (m *MockEthClient) Close() {} - -func (m *MockEthClient) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { - return nil, nil -}