diff --git a/scenarios/blob-combined/blob_combined.go b/scenarios/blob-combined/blob_combined.go index 7da15d4..3e6f753 100644 --- a/scenarios/blob-combined/blob_combined.go +++ b/scenarios/blob-combined/blob_combined.go @@ -6,6 +6,7 @@ import ( "math/big" "math/rand" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/core/types" @@ -94,13 +95,12 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool - s.logger.Infof("starting scenario: combined") + s.logger.Infof("starting scenario: blob-combined") for { txIdx := txIdxCounter @@ -110,17 +110,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -134,6 +130,10 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("blob tx %6d.0 failed: %v", txIdx+1, err) if s.pendingChan != nil { @@ -142,13 +142,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("blob tx %6d.0 sent: %v (%v sidecars)", txIdx+1, tx.Hash().String(), len(tx.BlobTxSidecar().Blobs)) - }(txIdx) + }(txIdx, lastChan, currentChan) + + lastChan = currentChan - count := txCount + pendingCount + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -158,7 +158,8 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() + <-lastChan + close(lastChan) s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() @@ -279,6 +280,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64, replacementIdx uint64, txNonce uint6 awaitConfirmation = false if replacementIdx == 0 { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } } @@ -311,7 +313,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64, replacementIdx uint64, txNonce uint6 gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) gweiBlobFee := new(big.Int).Div(blobGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof("blob tx %6d.%v confirmed in block #%v! total fee: %v gwei (base: %v, blob: %v)", txIdx+1, replacementIdx, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) + s.logger.WithField("client", client.GetName()).Debugf("blob tx %6d.%v confirmed in block #%v! total fee: %v gwei (base: %v, blob: %v)", txIdx+1, replacementIdx, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/scenarios/blob-conflicting/blob_conflicting.go b/scenarios/blob-conflicting/blob_conflicting.go index 92c1f39..ffcd514 100644 --- a/scenarios/blob-conflicting/blob_conflicting.go +++ b/scenarios/blob-conflicting/blob_conflicting.go @@ -6,6 +6,7 @@ import ( "math/big" "math/rand" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/core/types" @@ -90,13 +91,12 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool - s.logger.Infof("starting scenario: conflicting") + s.logger.Infof("starting scenario: blob-conflicting") for { txIdx := txIdxCounter @@ -106,17 +106,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -130,19 +126,23 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("could not send blob transaction: %v", err) <-s.pendingChan return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("sent blob tx #%6d: %v (%v sidecars)", txIdx+1, tx.Hash().String(), len(tx.BlobTxSidecar().Blobs)) - }(txIdx) + }(txIdx, lastChan, currentChan) + + lastChan = currentChan - count := txCount + pendingCount + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -152,7 +152,8 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() + <-lastChan + close(lastChan) s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() @@ -271,6 +272,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64) (*types.Transaction, *txbuilder.Clie OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) { defer func() { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } s.pendingWGroup.Done() @@ -385,5 +387,5 @@ func (s *Scenario) processTxReceipt(txIdx uint64, tx *types.Transaction, receipt gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) gweiBlobFee := new(big.Int).Div(blobGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof(" transaction %d/%v confirmed in block #%v. total fee: %v gwei (base: %v, blob: %v)", txIdx+1, txLabel, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) + s.logger.WithField("client", client.GetName()).Debugf(" transaction %d/%v confirmed in block #%v. total fee: %v gwei (base: %v, blob: %v)", txIdx+1, txLabel, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) } diff --git a/scenarios/blob-replacements/blob_replacements.go b/scenarios/blob-replacements/blob_replacements.go index df90f4f..caff320 100644 --- a/scenarios/blob-replacements/blob_replacements.go +++ b/scenarios/blob-replacements/blob_replacements.go @@ -6,6 +6,7 @@ import ( "math/big" "math/rand" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/core/types" @@ -94,13 +95,12 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool - s.logger.Infof("starting scenario: replacements") + s.logger.Infof("starting scenario: blob-replacements") for { txIdx := txIdxCounter @@ -110,17 +110,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -136,13 +132,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("blob tx %6d.0 sent: %v (%v sidecars), wallet: %v, nonce: %v", txIdx+1, tx.Hash().String(), len(tx.BlobTxSidecar().Blobs), s.tester.GetWalletIndex(wallet.GetAddress()), tx.Nonce()) - }(txIdx) + }(txIdx, lastChan, currentChan) - count := txCount + pendingCount + lastChan = currentChan + + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -152,7 +148,8 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() + <-lastChan + close(lastChan) s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() @@ -273,6 +270,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64, replacementIdx uint64, txNonce uint6 awaitConfirmation = false if replacementIdx == 0 { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } } @@ -305,7 +303,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64, replacementIdx uint64, txNonce uint6 gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) gweiBlobFee := new(big.Int).Div(blobGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof("blob tx %6d.%v confirmed in block #%v! total fee: %v gwei (base: %v, blob: %v)", txIdx+1, replacementIdx, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) + s.logger.WithField("client", client.GetName()).Debugf("blob tx %6d.%v confirmed in block #%v! total fee: %v gwei (base: %v, blob: %v)", txIdx+1, replacementIdx, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/scenarios/blobs/blobs.go b/scenarios/blobs/blobs.go index 05046c7..d00de54 100644 --- a/scenarios/blobs/blobs.go +++ b/scenarios/blobs/blobs.go @@ -6,6 +6,7 @@ import ( "math/big" "math/rand" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/core/types" @@ -90,11 +91,10 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool s.logger.Infof("starting scenario: normal") @@ -106,17 +106,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -130,19 +126,23 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("could not send blob transaction: %v", err) <-s.pendingChan return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("sent blob tx #%6d: %v (%v sidecars)", txIdx+1, tx.Hash().String(), len(tx.BlobTxSidecar().Blobs)) - }(txIdx) + }(txIdx, lastChan, currentChan) + + lastChan = currentChan - count := txCount + pendingCount + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -152,7 +152,8 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() + <-lastChan + close(lastChan) s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() @@ -230,7 +231,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64) (*types.Transaction, *txbuilder.Clie Value: uint256.NewInt(0), }, blobRefs) if err != nil { - return nil, nil, wallet, err + return nil, client, wallet, err } tx, err := wallet.BuildBlobTx(blobTx) @@ -251,6 +252,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64) (*types.Transaction, *txbuilder.Clie OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) { defer func() { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } s.pendingWGroup.Done() @@ -280,7 +282,7 @@ func (s *Scenario) sendBlobTx(txIdx uint64) (*types.Transaction, *txbuilder.Clie gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) gweiBlobFee := new(big.Int).Div(blobGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v, blob: %v)", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) + s.logger.WithField("client", client.GetName()).Debugf(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v, blob: %v)", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, gweiBlobFee) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/scenarios/deploy-destruct/deploy_destruct.go b/scenarios/deploy-destruct/deploy_destruct.go index 1a4bc89..7e199d4 100644 --- a/scenarios/deploy-destruct/deploy_destruct.go +++ b/scenarios/deploy-destruct/deploy_destruct.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -97,11 +98,10 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool s.logger.Infof("starting scenario: deploy-destruct") contractReceipt, _, err := s.sendDeploymentTx() @@ -120,17 +120,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -144,19 +140,27 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("could not send transaction: %v", err) <-s.pendingChan return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) + if lastChan != nil { + <-lastChan + close(lastChan) + } logger.Infof("sent tx #%6d: %v", txIdx+1, tx.Hash().String()) - }(txIdx) + }(txIdx, lastChan, currentChan) + + lastChan = currentChan - count := txCount + pendingCount + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -166,9 +170,6 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() - - s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() s.logger.Infof("finished sending transactions, awaiting block inclusion...") @@ -314,6 +315,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) { defer func() { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } s.pendingWGroup.Done() @@ -338,7 +340,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, gweiTotalFee := new(big.Int).Div(feeAmount, big.NewInt(1000000000)) gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) + s.logger.WithField("client", client.GetName()).Debugf(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/scenarios/deploytx/deploytx.go b/scenarios/deploytx/deploytx.go index 28425a2..8528c31 100644 --- a/scenarios/deploytx/deploytx.go +++ b/scenarios/deploytx/deploytx.go @@ -8,6 +8,7 @@ import ( "os" "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -120,11 +121,10 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool s.logger.Infof("starting scenario: deploytx") @@ -136,17 +136,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -160,19 +156,23 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("could not send transaction: %v", err) <-s.pendingChan return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("sent tx #%6d: %v", txIdx+1, tx.Hash().String()) - }(txIdx) + }(txIdx, lastChan, currentChan) + + lastChan = currentChan - count := txCount + pendingCount + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -182,9 +182,6 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() - - s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() s.logger.Infof("finished sending transactions, awaiting block inclusion...") @@ -251,6 +248,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) { defer func() { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } s.pendingWGroup.Done() @@ -275,7 +273,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, gweiTotalFee := new(big.Int).Div(feeAmount, big.NewInt(1000000000)) gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) + s.logger.WithField("client", client.GetName()).Debugf(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/scenarios/eoatx/eoatx.go b/scenarios/eoatx/eoatx.go index 996086b..62a628e 100644 --- a/scenarios/eoatx/eoatx.go +++ b/scenarios/eoatx/eoatx.go @@ -7,6 +7,7 @@ import ( "math/big" "strings" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/common" @@ -98,14 +99,14 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() s.logger.Infof("starting scenario: eoatx") + var lastChan chan bool + for { txIdx := txIdxCounter txIdxCounter++ @@ -114,17 +115,14 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -138,19 +136,23 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("could not send transaction: %v", err) <-s.pendingChan return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("sent tx #%6d: %v", txIdx+1, tx.Hash().String()) - }(txIdx) + }(txIdx, lastChan, currentChan) - count := txCount + pendingCount + lastChan = currentChan + + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -160,7 +162,9 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() + + <-lastChan + close(lastChan) s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() @@ -255,6 +259,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) { defer func() { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } s.pendingWGroup.Done() @@ -279,7 +284,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, gweiTotalFee := new(big.Int).Div(feeAmount, big.NewInt(1000000000)) gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) + s.logger.WithField("client", client.GetName()).Debugf(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/scenarios/erctx/erctx.go b/scenarios/erctx/erctx.go index 6b5e925..3dcf28f 100644 --- a/scenarios/erctx/erctx.go +++ b/scenarios/erctx/erctx.go @@ -6,6 +6,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -97,11 +98,10 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool s.logger.Infof("starting scenario: erctx") contractReceipt, _, err := s.sendDeploymentTx() @@ -120,17 +120,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -144,19 +140,23 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("could not send transaction: %v", err) <-s.pendingChan return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("sent tx #%6d: %v", txIdx+1, tx.Hash().String()) - }(txIdx) + }(txIdx, lastChan, currentChan) + + lastChan = currentChan - count := txCount + pendingCount + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -166,7 +166,9 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() + + <-lastChan + close(lastChan) s.logger.Infof("finished sending transactions, awaiting block inclusion...") s.pendingWGroup.Wait() @@ -322,6 +324,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) { defer func() { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } s.pendingWGroup.Done() @@ -346,7 +349,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, gweiTotalFee := new(big.Int).Div(feeAmount, big.NewInt(1000000000)) gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) + s.logger.WithField("client", client.GetName()).Debugf(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/scenarios/gasburnertx/gasburnertx.go b/scenarios/gasburnertx/gasburnertx.go index a068cb1..2768c8c 100644 --- a/scenarios/gasburnertx/gasburnertx.go +++ b/scenarios/gasburnertx/gasburnertx.go @@ -5,6 +5,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "time" "github.com/ethpandaops/spamoor/utils" @@ -92,11 +93,10 @@ func (s *Scenario) Init(testerCfg *tester.TesterConfig) error { func (s *Scenario) Run(tester *tester.Tester) error { s.tester = tester txIdxCounter := uint64(0) - counterMutex := sync.Mutex{} - waitGroup := sync.WaitGroup{} - pendingCount := uint64(0) - txCount := uint64(0) + pendingCount := atomic.Int64{} + txCount := atomic.Uint64{} startTime := time.Now() + var lastChan chan bool s.logger.Infof("starting scenario: gasburnertx") @@ -118,17 +118,13 @@ func (s *Scenario) Run(tester *tester.Tester) error { // await pending transactions s.pendingChan <- true } - waitGroup.Add(1) - counterMutex.Lock() - pendingCount++ - counterMutex.Unlock() + pendingCount.Add(1) + currentChan := make(chan bool, 1) - go func(txIdx uint64) { + go func(txIdx uint64, lastChan, currentChan chan bool) { defer func() { - counterMutex.Lock() - pendingCount-- - counterMutex.Unlock() - waitGroup.Done() + pendingCount.Add(-1) + currentChan <- true }() logger := s.logger @@ -142,19 +138,23 @@ func (s *Scenario) Run(tester *tester.Tester) error { if wallet != nil { logger = logger.WithField("wallet", s.tester.GetWalletIndex(wallet.GetAddress())) } + if lastChan != nil { + <-lastChan + close(lastChan) + } if err != nil { logger.Warnf("could not send transaction: %v", err) <-s.pendingChan return } - counterMutex.Lock() - txCount++ - counterMutex.Unlock() + txCount.Add(1) logger.Infof("sent tx #%6d: %v", txIdx+1, tx.Hash().String()) - }(txIdx) + }(txIdx, lastChan, currentChan) + + lastChan = currentChan - count := txCount + pendingCount + count := txCount.Load() + uint64(pendingCount.Load()) if s.options.TotalCount > 0 && count >= s.options.TotalCount { break } @@ -164,9 +164,10 @@ func (s *Scenario) Run(tester *tester.Tester) error { } } } - waitGroup.Wait() - s.logger.Infof("finished sending transactions, awaiting block inclusion...") + <-lastChan + close(lastChan) + s.pendingWGroup.Wait() s.logger.Infof("finished sending transactions, awaiting block inclusion...") @@ -300,6 +301,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, OnConfirm: func(tx *types.Transaction, receipt *types.Receipt, err error) { defer func() { if s.pendingChan != nil { + time.Sleep(100 * time.Millisecond) <-s.pendingChan } s.pendingWGroup.Done() @@ -324,7 +326,7 @@ func (s *Scenario) sendTx(txIdx uint64) (*types.Transaction, *txbuilder.Client, gweiTotalFee := new(big.Int).Div(feeAmount, big.NewInt(1000000000)) gweiBaseFee := new(big.Int).Div(effectiveGasPrice, big.NewInt(1000000000)) - s.logger.WithField("client", client.GetName()).Infof(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) + s.logger.WithField("client", client.GetName()).Debugf(" transaction %d confirmed in block #%v. total fee: %v gwei (base: %v) logs: %v", txIdx+1, receipt.BlockNumber.String(), gweiTotalFee, gweiBaseFee, len(receipt.Logs)) }, LogFn: func(client *txbuilder.Client, retry int, rebroadcast int, err error) { logger := s.logger.WithField("client", client.GetName()) diff --git a/txbuilder/txpool.go b/txbuilder/txpool.go index 7a8a889..3df7079 100644 --- a/txbuilder/txpool.go +++ b/txbuilder/txpool.go @@ -128,6 +128,7 @@ func (pool *TxPool) processBlock(blockNumber uint64) error { return nil } + t1 := time.Now() blockBody := pool.getBlockBody(blockNumber) if blockBody == nil { return fmt.Errorf("could not load block body") @@ -139,9 +140,12 @@ func (pool *TxPool) processBlock(blockNumber uint64) error { return fmt.Errorf("could not load block receipts") } - logrus.Debugf("processing block %v (%v transactions)", blockNumber, txCount) + loadingTime := time.Since(t1) + t1 = time.Now() signer := types.LatestSignerForChainID(chainId) + confirmCount := 0 + walletMap := map[common.Address]bool{} for idx, tx := range blockBody.Transactions() { receipt := receipts[idx] @@ -158,6 +162,8 @@ func (pool *TxPool) processBlock(blockNumber uint64) error { fromWallet := pool.getWallet(txFrom) if fromWallet != nil { + confirmCount++ + walletMap[txFrom] = true pool.processTransactionInclusion(blockNumber, fromWallet, tx, receipt) } @@ -170,6 +176,8 @@ func (pool *TxPool) processBlock(blockNumber uint64) error { } } + logrus.Infof("processed block %v: %v total tx, %v tx confirmed from %v wallets (%v, %v)", blockNumber, txCount, confirmCount, len(walletMap), loadingTime, time.Since(t1)) + return nil } @@ -282,7 +290,6 @@ func (pool *TxPool) SendTransaction(ctx context.Context, wallet *Wallet, tx *typ if options.OnConfirm != nil { defer func() { - time.Sleep(100 * time.Millisecond) // wait 100ms to get "cleaner" log outputs (all confirmations first, then new submissions) options.OnConfirm(tx, receipt, err) }() }