Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(eventindexer): eventindexer post ontake fork #18474

Merged
merged 9 commits into from
Nov 14, 2024
8 changes: 8 additions & 0 deletions packages/eventindexer/cmd/flags/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,13 @@ var (
Category: indexerCategory,
EnvVars: []string{"INDEX_ERC20S"},
}
OntakeForkHeight = &cli.Uint64Flag{
Name: "ontakeForkHeight",
Usage: "Block number ontake fork height happened",
Value: 21134698,
Category: indexerCategory,
EnvVars: []string{"ONTAKE_FORK_HEIGHT"},
}
)

var IndexerFlags = MergeFlags(CommonFlags, []cli.Flag{
Expand All @@ -87,4 +94,5 @@ var IndexerFlags = MergeFlags(CommonFlags, []cli.Flag{
SyncMode,
IndexNFTs,
IndexERC20s,
OntakeForkHeight,
})
2 changes: 2 additions & 0 deletions packages/eventindexer/indexer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
IndexNFTs bool
IndexERC20s bool
Layer string
OntakeForkHeight uint64
OpenDBFunc func() (db.DB, error)
}

Expand All @@ -55,6 +56,7 @@ func NewConfigFromCliContext(c *cli.Context) (*Config, error) {
IndexNFTs: c.Bool(flags.IndexNFTs.Name),
IndexERC20s: c.Bool(flags.IndexERC20s.Name),
Layer: c.String(flags.Layer.Name),
OntakeForkHeight: c.Uint64(flags.OntakeForkHeight.Name),
OpenDBFunc: func() (db.DB, error) {
return db.OpenDBConnection(db.DBConnectionOpts{
Name: c.String(flags.DatabaseUsername.Name),
Expand Down
136 changes: 134 additions & 2 deletions packages/eventindexer/indexer/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,112 @@ func filterFunc(
return nil
}

func filterFuncOntake(
ctx context.Context,
chainID *big.Int,
i *Indexer,
filterOpts *bind.FilterOpts,
) error {
wg, ctx := errgroup.WithContext(ctx)

if i.taikol1 != nil {
wg.Go(func() error {
transitionProvedEvents, err := i.taikol1.FilterTransitionProvedV2(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterTransitionProved")
}

err = i.saveTransitionProvedEventsV2(ctx, chainID, transitionProvedEvents)
if err != nil {
return errors.Wrap(err, "i.saveTransitionProvedEvents")
}

return nil
})

wg.Go(func() error {
transitionContestedEvents, err := i.taikol1.FilterTransitionContestedV2(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterTransitionContested")
}

err = i.saveTransitionContestedEventsV2(ctx, chainID, transitionContestedEvents)
if err != nil {
return errors.Wrap(err, "i.saveTransitionContestedEvents")
}

return nil
})

wg.Go(func() error {
blockProposedEvents, err := i.taikol1.FilterBlockProposedV2(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterBlockProposed")
}

err = i.saveBlockProposedEventsV2(ctx, chainID, blockProposedEvents)
if err != nil {
return errors.Wrap(err, "i.saveBlockProposedEvents")
}

return nil
})

wg.Go(func() error {
blockVerifiedEvents, err := i.taikol1.FilterBlockVerifiedV2(filterOpts, nil, nil)
if err != nil {
return errors.Wrap(err, "i.taikol1.FilterBlockVerified")
}

err = i.saveBlockVerifiedEventsV2(ctx, chainID, blockVerifiedEvents)
if err != nil {
return errors.Wrap(err, "i.saveBlockVerifiedEvents")
}

return nil
})
}

if i.bridge != nil {
wg.Go(func() error {
messagesSent, err := i.bridge.FilterMessageSent(filterOpts, nil)
if err != nil {
return errors.Wrap(err, "i.bridge.FilterMessageSent")
}

err = i.saveMessageSentEvents(ctx, chainID, messagesSent)
if err != nil {
return errors.Wrap(err, "i.saveMessageSentEvents")
}

return nil
})
}

wg.Go(func() error {
if err := i.indexRawBlockData(ctx, chainID, filterOpts.Start, *filterOpts.End); err != nil {
return errors.Wrap(err, "i.indexRawBlockData")
}

return nil
})

err := wg.Wait()

if err != nil {
if errors.Is(err, context.Canceled) {
slog.Error("filter context cancelled")
return err
}

return err
}

return nil
}

func (i *Indexer) filter(
ctx context.Context,
filter FilterFunc,
) error {
endBlockID, err := i.ethClient.BlockNumber(ctx)
if err != nil {
Expand All @@ -138,14 +241,35 @@ func (i *Indexer) filter(
"batchSize", i.blockBatchSize,
)

if i.latestIndexedBlockNumber >= i.ontakeForkHeight {
i.isPostOntakeForkHeightReached = true
}

for j := i.latestIndexedBlockNumber + 1; j <= endBlockID; j += i.blockBatchSize {
end := i.latestIndexedBlockNumber + i.blockBatchSize
end := j + i.blockBatchSize - 1

// if the end of the batch is greater than the latest block number, set end
// to the latest block number
if end > endBlockID {
end = endBlockID
}

if !i.isPostOntakeForkHeightReached && i.taikol1 != nil && i.ontakeForkHeight > i.latestIndexedBlockNumber && i.ontakeForkHeight < end {
slog.Info("ontake fork height reached", "height", i.ontakeForkHeight)

i.isPostOntakeForkHeightReached = true

end = i.ontakeForkHeight - 1

slog.Info("setting end block ID to ontakeForkHeight - 1",
"latestIndexedBlockNumber",
i.latestIndexedBlockNumber,
"ontakeForkHeight", i.ontakeForkHeight,
"endBlockID", end,
"isPostOntakeForkHeightReached", i.isPostOntakeForkHeightReached,
)
}

slog.Info("block batch", "start", j, "end", end)

filterOpts := &bind.FilterOpts{
Expand All @@ -154,6 +278,14 @@ func (i *Indexer) filter(
Context: ctx,
}

var filter FilterFunc

if i.isPostOntakeForkHeightReached {
filter = filterFuncOntake
} else {
filter = filterFunc
}

if err := filter(ctx, new(big.Int).SetUint64(i.srcChainID), i, filterOpts); err != nil {
return errors.Wrap(err, "filter")
}
Expand Down
6 changes: 5 additions & 1 deletion packages/eventindexer/indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ type Indexer struct {

contractToMetadata map[common.Address]*eventindexer.ERC20Metadata
contractToMetadataMutex *sync.Mutex

ontakeForkHeight uint64
isPostOntakeForkHeightReached bool
}

func (i *Indexer) Start() error {
Expand Down Expand Up @@ -97,7 +100,7 @@ func (i *Indexer) eventLoop(ctx context.Context) {
slog.Info("event loop context done")
return
case <-t.C:
if err := i.filter(ctx, filterFunc); err != nil {
if err := i.filter(ctx); err != nil {
slog.Error("error filtering", "error", err)
}
}
Expand Down Expand Up @@ -204,6 +207,7 @@ func InitFromConfig(ctx context.Context, i *Indexer, cfg *Config) error {
i.layer = cfg.Layer
i.contractToMetadata = make(map[common.Address]*eventindexer.ERC20Metadata, 0)
i.contractToMetadataMutex = &sync.Mutex{}
i.ontakeForkHeight = cfg.OntakeForkHeight

return nil
}
Expand Down
86 changes: 86 additions & 0 deletions packages/eventindexer/indexer/save_block_proposed_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,89 @@ func (i *Indexer) saveBlockProposedEvent(

return nil
}

func (i *Indexer) saveBlockProposedEventsV2(
ctx context.Context,
chainID *big.Int,
events *taikol1.TaikoL1BlockProposedV2Iterator,
) error {
if !events.Next() || events.Event == nil {
slog.Info("no blockProposedV2 events")
return nil
}

wg, ctx := errgroup.WithContext(ctx)

for {
event := events.Event

wg.Go(func() error {
tx, _, err := i.ethClient.TransactionByHash(ctx, event.Raw.TxHash)
if err != nil {
return errors.Wrap(err, "i.ethClient.TransactionByHash")
}

sender, err := i.ethClient.TransactionSender(ctx, tx, event.Raw.BlockHash, event.Raw.TxIndex)
if err != nil {
return errors.Wrap(err, "i.ethClient.TransactionSender")
}

if err := i.saveBlockProposedEventV2(ctx, chainID, event, sender); err != nil {
eventindexer.BlockProposedEventsProcessedError.Inc()

return errors.Wrap(err, "i.saveBlockProposedEvent")
}

return nil
})

if !events.Next() {
break
}
}

if err := wg.Wait(); err != nil {
return err
}

return nil
}

func (i *Indexer) saveBlockProposedEventV2(
ctx context.Context,
chainID *big.Int,
event *taikol1.TaikoL1BlockProposedV2,
sender common.Address,
) error {
slog.Info("blockProposed", "proposer", sender.Hex())

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
}

blockID := event.BlockId.Int64()

block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber))
if err != nil {
return errors.Wrap(err, "i.ethClient.BlockByNumber")
}

_, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockProposed,
Data: string(marshaled),
ChainID: chainID,
Event: eventindexer.EventNameBlockProposed,
Address: sender.Hex(),
BlockID: &blockID,
TransactedAt: time.Unix(int64(block.Time()), 0).UTC(),
EmittedBlockID: event.Raw.BlockNumber,
})
if err != nil {
return errors.Wrap(err, "i.eventRepo.Save")
}

eventindexer.BlockProposedEventsProcessed.Inc()

return nil
}
75 changes: 75 additions & 0 deletions packages/eventindexer/indexer/save_block_verified_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,78 @@ func (i *Indexer) saveBlockVerifiedEvent(

return nil
}

func (i *Indexer) saveBlockVerifiedEventsV2(
ctx context.Context,
chainID *big.Int,
events *taikol1.TaikoL1BlockVerifiedV2Iterator,
) error {
if !events.Next() || events.Event == nil {
slog.Info("no BlockVerified events")
return nil
}

wg, ctx := errgroup.WithContext(ctx)

for {
event := events.Event

wg.Go(func() error {
if err := i.saveBlockVerifiedEventV2(ctx, chainID, event); err != nil {
eventindexer.BlockVerifiedEventsProcessedError.Inc()

return errors.Wrap(err, "i.saveBlockVerifiedEvent")
}

return nil
})

if !events.Next() {
break
}
}

if err := wg.Wait(); err != nil {
return err
}

return nil
}

func (i *Indexer) saveBlockVerifiedEventV2(
ctx context.Context,
chainID *big.Int,
event *taikol1.TaikoL1BlockVerifiedV2,
) error {
slog.Info("new blockVerified event", "blockID", event.BlockId.Int64())

marshaled, err := json.Marshal(event)
if err != nil {
return errors.Wrap(err, "json.Marshal(event)")
}

blockID := event.BlockId.Int64()

block, err := i.ethClient.BlockByNumber(ctx, new(big.Int).SetUint64(event.Raw.BlockNumber))
if err != nil {
return errors.Wrap(err, "i.ethClient.BlockByNumber")
}

_, err = i.eventRepo.Save(ctx, eventindexer.SaveEventOpts{
Name: eventindexer.EventNameBlockVerified,
Data: string(marshaled),
ChainID: chainID,
Event: eventindexer.EventNameBlockVerified,
Address: "",
BlockID: &blockID,
TransactedAt: time.Unix(int64(block.Time()), 0),
EmittedBlockID: event.Raw.BlockNumber,
})
if err != nil {
return errors.Wrap(err, "i.eventRepo.Save")
}

eventindexer.BlockVerifiedEventsProcessed.Inc()

return nil
}
Loading
Loading