diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index e4a1cf0f0e2b..f871a6483990 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -54,6 +54,7 @@ jobs: uses: actions/checkout@v2 with: repository: flashbots/mev-geth-demo + ref: no-megabundles path: e2e - run: cd e2e && yarn install @@ -71,17 +72,9 @@ jobs: cd e2e GETH=`pwd`/../build/bin/geth ./run.sh & # Second node, not mining - P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh & + P2P_PORT=30302 DATADIR=datadir2 HTTP_PORT=8546 AUTH_PORT=8552 MINER_ARGS='--nodiscover' GETH=`pwd`/../build/bin/geth ./run.sh & sleep 15 DATADIR1=datadir DATADIR2=datadir2 GETH=`pwd`/../build/bin/geth ./peer_nodes.sh sleep 15 yarn run demo-private-tx pkill -9 geth || true - - name: Run megabundle-only node checking for reverts - run: | - cd e2e - # Disable bundle workers - MINER_ARGS='--miner.etherbase=0xd912aecb07e9f4e1ea8e6b4779e7fb6aa1c3e4d8 --miner.trustedrelays=0xfb11e78C4DaFec86237c2862441817701fdf197F --mine --miner.threads=2 --miner.maxmergedbundles=0' GETH=`pwd`/../build/bin/geth ./run.sh & - sleep 15 - yarn run e2e-reverting-megabundle - pkill -9 geth || true diff --git a/core/txpool/txpool.go b/core/txpool/txpool.go index 66596b5c8eb0..321b965619cc 100644 --- a/core/txpool/txpool.go +++ b/core/txpool/txpool.go @@ -640,59 +640,6 @@ func (pool *TxPool) AddMevBundle(txs types.Transactions, blockNumber *big.Int, m return nil } -// AddMegaBundle adds a megabundle to the pool. Assumes the relay signature has been verified already. -func (pool *TxPool) AddMegabundle(relayAddr common.Address, txs types.Transactions, blockNumber *big.Int, minTimestamp, maxTimestamp uint64, revertingTxHashes []common.Hash) error { - pool.mu.Lock() - defer pool.mu.Unlock() - - fromTrustedRelay := false - for _, trustedAddr := range pool.config.TrustedRelays { - if relayAddr == trustedAddr { - fromTrustedRelay = true - } - } - if !fromTrustedRelay { - return errors.New("megabundle from non-trusted address") - } - - megabundle := types.MevBundle{ - Txs: txs, - BlockNumber: blockNumber, - MinTimestamp: minTimestamp, - MaxTimestamp: maxTimestamp, - RevertingTxHashes: revertingTxHashes, - } - - pool.megabundles[relayAddr] = megabundle - - for _, hook := range pool.NewMegabundleHooks { - go hook(relayAddr, &megabundle) - } - - return nil -} - -// GetMegabundle returns the latest megabundle submitted by a given relay. -func (pool *TxPool) GetMegabundle(relayAddr common.Address, blockNumber *big.Int, blockTimestamp uint64) (types.MevBundle, error) { - pool.mu.Lock() - defer pool.mu.Unlock() - - megabundle, ok := pool.megabundles[relayAddr] - if !ok { - return types.MevBundle{}, errors.New("No megabundle found") - } - if megabundle.BlockNumber.Cmp(blockNumber) != 0 { - return types.MevBundle{}, errors.New("Megabundle does not fit blockNumber constraints") - } - if megabundle.MinTimestamp != 0 && megabundle.MinTimestamp > blockTimestamp { - return types.MevBundle{}, errors.New("Megabundle does not fit minTimestamp constraints") - } - if megabundle.MaxTimestamp != 0 && megabundle.MaxTimestamp < blockTimestamp { - return types.MevBundle{}, errors.New("Megabundle does not fit maxTimestamp constraints") - } - return megabundle, nil -} - // Locals retrieves the accounts currently considered local by the pool. func (pool *TxPool) Locals() []common.Address { pool.mu.Lock() diff --git a/eth/api_backend.go b/eth/api_backend.go index 498c74aef1b2..e5b5f6c9f9fa 100644 --- a/eth/api_backend.go +++ b/eth/api_backend.go @@ -285,10 +285,6 @@ func (b *EthAPIBackend) SendBundle(ctx context.Context, txs types.Transactions, return b.eth.txPool.AddMevBundle(txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) } -func (b *EthAPIBackend) SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error { - return b.eth.txPool.AddMegabundle(relayAddr, txs, big.NewInt(blockNumber.Int64()), minTimestamp, maxTimestamp, revertingTxHashes) -} - func (b *EthAPIBackend) GetPoolTransactions() (types.Transactions, error) { pending := b.eth.txPool.Pending(false) var txs types.Transactions diff --git a/internal/ethapi/api.go b/internal/ethapi/api.go index ca235ae4fb33..0a64f7f2e925 100644 --- a/internal/ethapi/api.go +++ b/internal/ethapi/api.go @@ -2133,25 +2133,6 @@ type SendBundleArgs struct { RevertingTxHashes []common.Hash `json:"revertingTxHashes"` } -// SendMegabundleArgs represents the arguments for a SendMegabundle call. -type SendMegabundleArgs struct { - Txs []hexutil.Bytes `json:"txs"` - BlockNumber uint64 `json:"blockNumber"` - MinTimestamp *uint64 `json:"minTimestamp"` - MaxTimestamp *uint64 `json:"maxTimestamp"` - RevertingTxHashes []common.Hash `json:"revertingTxHashes"` - RelaySignature hexutil.Bytes `json:"relaySignature"` -} - -// UnsignedMegabundle is used for serialization and subsequent digital signing. -type UnsignedMegabundle struct { - Txs []hexutil.Bytes - BlockNumber uint64 - MinTimestamp uint64 - MaxTimestamp uint64 - RevertingTxHashes []common.Hash -} - // SendBundle will add the signed transaction to the transaction pool. // The sender is responsible for signing the transaction and using the correct nonce and ensuring validity func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs) error { @@ -2182,61 +2163,6 @@ func (s *PrivateTxBundleAPI) SendBundle(ctx context.Context, args SendBundleArgs return s.b.SendBundle(ctx, txs, args.BlockNumber, minTimestamp, maxTimestamp, args.RevertingTxHashes) } -// Recovers the Ethereum address of the trusted relay that signed the megabundle. -func RecoverRelayAddress(args SendMegabundleArgs) (common.Address, error) { - megabundle := UnsignedMegabundle{Txs: args.Txs, BlockNumber: args.BlockNumber, RevertingTxHashes: args.RevertingTxHashes} - if args.MinTimestamp != nil { - megabundle.MinTimestamp = *args.MinTimestamp - } else { - megabundle.MinTimestamp = 0 - } - if args.MaxTimestamp != nil { - megabundle.MaxTimestamp = *args.MaxTimestamp - } else { - megabundle.MaxTimestamp = 0 - } - rlpEncoding, _ := rlp.EncodeToBytes(megabundle) - signature := args.RelaySignature - signature[64] -= 27 // account for Ethereum V - recoveredPubkey, err := crypto.SigToPub(accounts.TextHash(rlpEncoding), args.RelaySignature) - if err != nil { - return common.Address{}, err - } - return crypto.PubkeyToAddress(*recoveredPubkey), nil -} - -// SendMegabundle will add the signed megabundle to one of the workers for evaluation. -func (s *PrivateTxBundleAPI) SendMegabundle(ctx context.Context, args SendMegabundleArgs) error { - log.Info("Received a Megabundle request", "signature", args.RelaySignature) - var txs types.Transactions - if len(args.Txs) == 0 { - return errors.New("megabundle missing txs") - } - if args.BlockNumber == 0 { - return errors.New("megabundle missing blockNumber") - } - for _, encodedTx := range args.Txs { - tx := new(types.Transaction) - if err := tx.UnmarshalBinary(encodedTx); err != nil { - return err - } - txs = append(txs, tx) - } - var minTimestamp, maxTimestamp uint64 - if args.MinTimestamp != nil { - minTimestamp = *args.MinTimestamp - } - if args.MaxTimestamp != nil { - maxTimestamp = *args.MaxTimestamp - } - relayAddr, err := RecoverRelayAddress(args) - log.Info("Megabundle", "relayAddr", relayAddr, "err", err) - if err != nil { - return err - } - return s.b.SendMegabundle(ctx, txs, rpc.BlockNumber(args.BlockNumber), minTimestamp, maxTimestamp, args.RevertingTxHashes, relayAddr) -} - // BundleAPI offers an API for accepting bundled transactions type BundleAPI struct { b Backend diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 4dc9dcdc76b8..2684eeb8ebe9 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -76,7 +76,6 @@ type Backend interface { // Transaction pool API SendTx(ctx context.Context, signedTx *types.Transaction, private bool) error SendBundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash) error - SendMegabundle(ctx context.Context, txs types.Transactions, blockNumber rpc.BlockNumber, minTimestamp uint64, maxTimestamp uint64, revertingTxHashes []common.Hash, relayAddr common.Address) error GetTransaction(ctx context.Context, txHash common.Hash) (*types.Transaction, common.Hash, uint64, uint64, error) GetPoolTransactions() (types.Transactions, error) GetPoolTransaction(txHash common.Hash) *types.Transaction diff --git a/internal/web3ext/web3ext.go b/internal/web3ext/web3ext.go index 48701328b5a6..78733b34b576 100644 --- a/internal/web3ext/web3ext.go +++ b/internal/web3ext/web3ext.go @@ -617,11 +617,6 @@ web3._extend({ params: 3, inputFormatter: [web3._extend.formatters.inputCallFormatter, web3._extend.formatters.inputDefaultBlockNumberFormatter, null], }), - new web3._extend.Method({ - name: 'sendMegabundle', - call: 'eth_sendMegabundle', - params: 1 - }), ], properties: [ new web3._extend.Property({ diff --git a/miner/multi_worker.go b/miner/multi_worker.go index 4330e647695f..cd0068c1e901 100644 --- a/miner/multi_worker.go +++ b/miner/multi_worker.go @@ -97,7 +97,7 @@ func (w *multiWorker) GetSealingBlockAsync(parent common.Hash, timestamp uint64, for _, worker := range append(w.workers, w.regularWorker) { resCh, errCh, err := worker.getSealingBlock(parent, timestamp, coinbase, gasLimit, random, noTxs, noExtra) if err != nil { - log.Error("could not start async block construction", "isFlashbotsWorker", worker.flashbots.isFlashbots, "isMegabundleWorker", worker.flashbots.isMegabundleWorker, "#bundles", worker.flashbots.maxMergedBundles) + log.Error("could not start async block construction", "isFlashbotsWorker", worker.flashbots.isFlashbots, "#bundles", worker.flashbots.maxMergedBundles) continue } resChans = append(resChans, resChPair{resCh, errCh}) @@ -153,38 +153,12 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons for i := 1; i <= config.MaxMergedBundles; i++ { workers = append(workers, newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ - isFlashbots: true, - isMegabundleWorker: false, - queue: queue, - maxMergedBundles: i, + isFlashbots: true, + queue: queue, + maxMergedBundles: i, })) } - relayWorkerMap := make(map[common.Address]*worker) - - for i := 0; i < len(config.TrustedRelays); i++ { - relayWorker := newWorker(config, chainConfig, engine, eth, mux, isLocalBlock, init, &flashbotsData{ - isFlashbots: true, - isMegabundleWorker: true, - queue: queue, - relayAddr: config.TrustedRelays[i], - }) - workers = append(workers, relayWorker) - relayWorkerMap[config.TrustedRelays[i]] = relayWorker - } - - eth.TxPool().NewMegabundleHooks = append(eth.TxPool().NewMegabundleHooks, func(relayAddr common.Address, megabundle *types.MevBundle) { - worker, found := relayWorkerMap[relayAddr] - if !found { - return - } - - select { - case worker.newMegabundleCh <- megabundle: - default: - } - }) - log.Info("creating multi worker", "config.MaxMergedBundles", config.MaxMergedBundles, "config.TrustedRelays", config.TrustedRelays, "worker", len(workers)) return &multiWorker{ regularWorker: regularWorker, @@ -193,9 +167,8 @@ func newMultiWorker(config *Config, chainConfig *params.ChainConfig, engine cons } type flashbotsData struct { - isFlashbots bool - isMegabundleWorker bool - queue chan *task - maxMergedBundles int - relayAddr common.Address + isFlashbots bool + queue chan *task + maxMergedBundles int + relayAddr common.Address } diff --git a/miner/worker.go b/miner/worker.go index bd829fb7c240..dbeb21be0201 100644 --- a/miner/worker.go +++ b/miner/worker.go @@ -164,10 +164,9 @@ type task struct { block *types.Block createdAt time.Time - profit *big.Int - isFlashbots bool - worker int - isMegabundle bool + profit *big.Int + isFlashbots bool + worker int } const ( @@ -233,7 +232,6 @@ type worker struct { exitCh chan struct{} resubmitIntervalCh chan time.Duration resubmitAdjustCh chan *intervalAdjust - newMegabundleCh chan *types.MevBundle wg sync.WaitGroup @@ -583,11 +581,6 @@ func (w *worker) newWorkLoop(recommit time.Duration) { timestamp = time.Now().Unix() commit(false, commitInterruptNewHead) - case <-w.newMegabundleCh: - if w.isRunning() { - commit(true, commitInterruptNone) - } - case <-timer.C: // If sealing is running resubmit a new work cycle periodically to pull in // higher priced transactions. Disable this overhead for pending blocks. @@ -796,7 +789,7 @@ func (w *worker) taskLoop() { // Interrupt previous sealing operation interrupt() stopCh, prev = make(chan struct{}), sealHash - log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker, "isMegabundle", task.isMegabundle) + log.Info("Proposed miner block", "blockNumber", task.block.Number(), "profit", ethIntToFloat(prevProfit), "isFlashbots", task.isFlashbots, "sealhash", sealHash, "parentHash", prevParentHash, "worker", task.worker) if w.skipSealHook != nil && w.skipSealHook(task) { continue } @@ -1309,7 +1302,7 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC return err } } - if w.flashbots.isFlashbots && !w.flashbots.isMegabundleWorker { + if w.flashbots.isFlashbots { bundles, err := w.eth.TxPool().MevBundles(env.header.Number, env.header.Time) if err != nil { log.Error("Failed to fetch pending transactions", "err", err) @@ -1330,40 +1323,6 @@ func (w *worker) fillTransactions(interrupt *int32, env *environment, validatorC } env.profit.Add(env.profit, bundle.ethSentToCoinbase) } - if w.flashbots.isMegabundleWorker { - megabundle, err := w.eth.TxPool().GetMegabundle(w.flashbots.relayAddr, env.header.Number, env.header.Time) - log.Info("Starting to process a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "error", err) - if err != nil { - return err // no valid megabundle for this relay, nothing to do - } - - // Flashbots bundle merging duplicates work by simulating TXes and then committing them once more. - // Megabundles API focuses on speed and runs everything in one cycle. - coinbaseBalanceBefore := env.state.GetBalance(env.coinbase) - if err := w.commitBundle(env, megabundle.Txs, interrupt); err != nil { - log.Info("Could not commit a Megabundle", "relay", w.flashbots.relayAddr, "megabundle", megabundle, "err", err) - return err - } - var txStatuses = map[common.Hash]bool{} - for _, receipt := range env.receipts { - txStatuses[receipt.TxHash] = receipt.Status == types.ReceiptStatusSuccessful - } - for _, tx := range megabundle.Txs { - status, ok := txStatuses[tx.Hash()] - if !ok { - log.Error("No TX receipt after megabundle simulation", "TxHash", tx.Hash()) - return errors.New("no tx receipt after megabundle simulation") - } - if !status && !containsHash(megabundle.RevertingTxHashes, tx.Hash()) { - log.Info("Ignoring megabundle because of failing TX", "relay", w.flashbots.relayAddr, "TxHash", tx.Hash()) - return errors.New("megabundle contains failing tx") - } - } - coinbaseBalanceAfter := env.state.GetBalance(env.coinbase) - coinbaseDelta := big.NewInt(0).Sub(coinbaseBalanceAfter, coinbaseBalanceBefore) - env.profit = coinbaseDelta - log.Info("Megabundle processed", "relay", w.flashbots.relayAddr, "totalProfit", ethIntToFloat(env.profit)) - } if len(localTxs) > 0 { txs := types.NewTransactionsByPriceAndNonce(env.signer, localTxs, env.header.BaseFee) @@ -1524,7 +1483,7 @@ func (w *worker) commit(env *environment, interval func(), update bool, start ti // If we're post merge, just ignore if !w.isTTDReached(block.Header()) { select { - case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles, isMegabundle: w.flashbots.isMegabundleWorker}: + case w.taskCh <- &task{receipts: env.receipts, state: env.state, block: block, createdAt: time.Now(), profit: env.profit, isFlashbots: w.flashbots.isFlashbots, worker: w.flashbots.maxMergedBundles}: w.unconfirmed.Shift(block.NumberU64() - 1) fees := totalFees(block, env.receipts)