diff --git a/cmd/geth/main.go b/cmd/geth/main.go index 03c9f29157a6..50e97d7d978c 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -185,6 +185,11 @@ var ( utils.GRPCPortFlag, } + metroFlags = []cli.Flag{ + utils.MetroGRPCHostFlag, + utils.MetroGRPCPortFlag, + } + metricsFlags = []cli.Flag{ utils.MetricsEnabledFlag, utils.MetricsEnabledExpensiveFlag, @@ -248,6 +253,7 @@ func init() { app.Flags = flags.Merge( nodeFlags, rpcFlags, + metroFlags, consoleFlags, debug.Flags, metricsFlags, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index b61a9b78634b..c7954b06af7b 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -811,6 +811,20 @@ var ( Category: flags.APICategory, } + // Metro grpc address and port overrides + MetroGRPCHostFlag = &cli.StringFlag{ + Name: "metro.addr", + Usage: "Metro gRPC server listening interface", + Value: ethconfig.Defaults.MetroGRPCHost, + Category: flags.APICategory, + } + MetroGRPCPortFlag = &cli.IntFlag{ + Name: "metro.port", + Usage: "Metro gRPC server listening port", + Value: ethconfig.Defaults.MetroGRPCPort, + Category: flags.APICategory, + } + // Network Settings MaxPeersFlag = &cli.IntFlag{ Name: "maxpeers", @@ -1891,6 +1905,14 @@ func SetEthConfig(ctx *cli.Context, stack *node.Node, cfg *ethconfig.Config) { } } + // metro + if ctx.IsSet(MetroGRPCHostFlag.Name) { + cfg.MetroGRPCHost = ctx.String(MetroGRPCHostFlag.Name) + } + if ctx.IsSet(MetroGRPCPortFlag.Name) { + cfg.MetroGRPCPort = ctx.Int(MetroGRPCPortFlag.Name) + } + // Override any default configs for hard coded networks. switch { case ctx.Bool(MainnetFlag.Name): diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 8597f6190f65..c179593e18cc 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -1043,6 +1043,14 @@ func (pool *TxPool) AddRemotesSync(txs []*types.Transaction) []error { return pool.addTxs(txs, false, true) } +// Remove a single transaction from the mempool. +func (pool *TxPool) RemoveTx(hash common.Hash) { + pool.mu.Lock() + defer pool.mu.Unlock() + + pool.removeTx(hash, false) +} + // This is like AddRemotes with a single transaction, but waits for pool reorganization. Tests use this method. func (pool *TxPool) addRemoteSync(tx *types.Transaction) error { errs := pool.AddRemotesSync([]*types.Transaction{tx}) diff --git a/eth/api_backend.go b/eth/api_backend.go index ac160b0736a1..8ed720912495 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -19,6 +19,7 @@ package eth import ( "context" "errors" + "fmt" "math/big" "time" @@ -351,6 +352,10 @@ func (b *EthAPIBackend) UnprotectedAllowed() bool { return b.allowUnprotectedTxs } +func (b *EthAPIBackend) MetroGRPCEndpoint() string { + return fmt.Sprintf("%s:%d", b.eth.config.MetroGRPCHost, b.eth.config.MetroGRPCPort) +} + func (b *EthAPIBackend) RPCGasCap() uint64 { return b.eth.config.RPCGasCap } diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go index db686c5d0875..2f64014f5a74 100644 --- a/eth/ethconfig/config.go +++ b/eth/ethconfig/config.go @@ -90,6 +90,10 @@ var Defaults = Config{ RPCEVMTimeout: 5 * time.Second, GPO: FullNodeGPO, RPCTxFeeCap: 1, // 1 ether + + // Metro GRPC + MetroGRPCHost: "localhost", + MetroGRPCPort: 9090, } func init() { @@ -207,6 +211,10 @@ type Config struct { // OverrideShanghai (TODO: remove after the fork) OverrideShanghai *uint64 `toml:",omitempty"` + + // Metro GRPC Host and Port + MetroGRPCHost string `toml:",omitempty"` + MetroGRPCPort int `toml:",omitempty"` } // CreateConsensusEngine creates a consensus engine for the given chain configuration. diff --git a/grpc/execution/server.go b/grpc/execution/server.go index 11b7f39e82d0..6898a6af164b 100644 --- a/grpc/execution/server.go +++ b/grpc/execution/server.go @@ -92,6 +92,11 @@ func (s *ExecutionServiceServer) DoBlock(ctx context.Context, req *executionv1.D return nil, fmt.Errorf("failed to insert block into blockchain (n=%d)", n) } + // remove txs from original mempool + for _, tx := range block.Transactions() { + s.eth.TxPool().RemoveTx(tx.Hash()) + } + newForkChoice := &engine.ForkchoiceStateV1{ HeadBlockHash: block.Hash(), SafeBlockHash: block.Hash(), diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index 8b5d1893a2aa..51412df2549b 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -1694,12 +1694,19 @@ func SubmitTransaction(ctx context.Context, b Backend, tx *types.Transaction) (c return common.Hash{}, errors.New("only replay-protected (EIP-155) transactions allowed over RPC") } + // save transaction in geth mempool as well, so things like forge can look it up + if err := b.SendTx(ctx, tx); err != nil { + return common.Hash{}, err + } + // send to metro instead of eth mempool txBytes, err := tx.MarshalBinary() if err != nil { return common.Hash{}, err } - if err := submitMetroTransaction(txBytes); err != nil { + + metroAPI := NewMetroAPI(b.MetroGRPCEndpoint()) + if err := metroAPI.SubmitTransaction(txBytes); err != nil { return common.Hash{}, err } diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 0249c8664d3c..35da9e723471 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -97,6 +97,9 @@ type Backend interface { SubscribePendingLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + + // Metro GRPC endpoint + MetroGRPCEndpoint() string } func GetAPIs(apiBackend Backend) []rpc.API { diff --git a/internal/ethapi/metro.go b/internal/ethapi/metro.go index 9e8de1efc685..ec9c4342ef20 100644 --- a/internal/ethapi/metro.go +++ b/internal/ethapi/metro.go @@ -2,10 +2,20 @@ package ethapi import ( metrotx "github.com/astriaorg/metro-transactions/tx" + "github.com/ethereum/go-ethereum/log" ) const secondaryChainID = "ethereum" -func submitMetroTransaction(tx []byte) error { - return metrotx.BuildAndSendSecondaryTransaction(metrotx.DefaultGRPCEndpoint, secondaryChainID, tx) +type MetroAPI struct { + endpoint string +} + +func NewMetroAPI(endpoint string) *MetroAPI { + log.Info("NewMetroAPI", "endpoint", endpoint) + return &MetroAPI{endpoint: endpoint} +} + +func (api *MetroAPI) SubmitTransaction(tx []byte) error { + return metrotx.BuildAndSendSecondaryTransaction(api.endpoint, secondaryChainID, tx) } diff --git a/internal/ethapi/transaction_args_test.go b/internal/ethapi/transaction_args_test.go index 24c15b777560..77c89199512e 100644 --- a/internal/ethapi/transaction_args_test.go +++ b/internal/ethapi/transaction_args_test.go @@ -264,6 +264,7 @@ func (b *backendMock) FeeHistory(ctx context.Context, blockCount uint64, lastBlo func (b *backendMock) ChainDb() ethdb.Database { return nil } func (b *backendMock) AccountManager() *accounts.Manager { return nil } func (b *backendMock) ExtRPCEnabled() bool { return false } +func (b *backendMock) MetroGRPCEndpoint() string { return "" } func (b *backendMock) RPCGasCap() uint64 { return 0 } func (b *backendMock) RPCEVMTimeout() time.Duration { return time.Second } func (b *backendMock) RPCTxFeeCap() float64 { return 0 } diff --git a/les/api_backend.go b/les/api_backend.go index 2d1fccd9ad43..6b1795eae0ad 100644 --- a/les/api_backend.go +++ b/les/api_backend.go @@ -19,6 +19,7 @@ package les import ( "context" "errors" + "fmt" "math/big" "time" @@ -292,6 +293,10 @@ func (b *LesApiBackend) UnprotectedAllowed() bool { return b.allowUnprotectedTxs } +func (b *LesApiBackend) MetroGRPCEndpoint() string { + return fmt.Sprintf("%s:%d", b.eth.config.MetroGRPCHost, b.eth.config.MetroGRPCPort) +} + func (b *LesApiBackend) RPCGasCap() uint64 { return b.eth.config.RPCGasCap }