Skip to content

Commit

Permalink
feat(schema): indexing API (#20647)
Browse files Browse the repository at this point in the history
  • Loading branch information
aaronc authored Jul 9, 2024
1 parent ff5df35 commit 4857ea1
Show file tree
Hide file tree
Showing 38 changed files with 581 additions and 10 deletions.
79 changes: 79 additions & 0 deletions baseapp/streaming.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package baseapp

import (
"context"
"fmt"
"sort"
"strings"

abci "github.com/cometbft/cometbft/api/cometbft/abci/v1"
"github.com/spf13/cast"

"cosmossdk.io/schema"
"cosmossdk.io/schema/appdata"
"cosmossdk.io/schema/decoding"
"cosmossdk.io/schema/indexer"
"cosmossdk.io/store/streaming"
storetypes "cosmossdk.io/store/types"

Expand All @@ -22,6 +28,31 @@ const (
StreamingABCIStopNodeOnErrTomlKey = "stop-node-on-err"
)

// EnableIndexer enables the built-in indexer with the provided options (usually from the app.toml indexer key),
// kv-store keys, and app modules. Using the built-in indexer framework is mutually exclusive from using other
// types of streaming listeners.
func (app *BaseApp) EnableIndexer(indexerOpts interface{}, keys map[string]*storetypes.KVStoreKey, appModules map[string]any) error {
listener, err := indexer.StartManager(indexer.ManagerOptions{
Config: indexerOpts,
Resolver: decoding.ModuleSetDecoderResolver(appModules),
SyncSource: nil,
Logger: app.logger.With("module", "indexer"),
})
if err != nil {
return err
}

exposedKeys := exposeStoreKeysSorted([]string{"*"}, keys)
app.cms.AddListeners(exposedKeys)

app.streamingManager = storetypes.StreamingManager{
ABCIListeners: []storetypes.ABCIListener{listenerWrapper{listener}},
StopNodeOnErr: true,
}

return nil
}

// RegisterStreamingServices registers streaming services with the BaseApp.
func (app *BaseApp) RegisterStreamingServices(appOpts servertypes.AppOptions, keys map[string]*storetypes.KVStoreKey) error {
// register streaming services
Expand Down Expand Up @@ -110,3 +141,51 @@ func exposeStoreKeysSorted(keysStr []string, keys map[string]*storetypes.KVStore

return exposeStoreKeys
}

type listenerWrapper struct {
listener appdata.Listener
}

func (p listenerWrapper) ListenFinalizeBlock(_ context.Context, req abci.FinalizeBlockRequest, res abci.FinalizeBlockResponse) error {
if p.listener.StartBlock != nil {
err := p.listener.StartBlock(appdata.StartBlockData{
Height: uint64(req.Height),
})
if err != nil {
return err
}
}

//// TODO txs, events

return nil
}

func (p listenerWrapper) ListenCommit(ctx context.Context, res abci.CommitResponse, changeSet []*storetypes.StoreKVPair) error {
if cb := p.listener.OnKVPair; cb != nil {
updates := make([]appdata.ModuleKVPairUpdate, len(changeSet))
for i, pair := range changeSet {
updates[i] = appdata.ModuleKVPairUpdate{
ModuleName: pair.StoreKey,
Update: schema.KVPairUpdate{
Key: pair.Key,
Value: pair.Value,
Delete: pair.Delete,
},
}
}
err := cb(appdata.KVPairData{Updates: updates})
if err != nil {
return err
}
}

if p.listener.Commit != nil {
err := p.listener.Commit(appdata.CommitData{})
if err != nil {
return err
}
}

return nil
}
6 changes: 5 additions & 1 deletion client/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,10 @@ require (
pgregory.net/rapid v1.1.0 // indirect
)

require github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
require (
cosmossdk.io/schema v0.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
)

replace github.com/cosmos/cosmos-sdk => ./../../

Expand All @@ -181,6 +184,7 @@ replace (
cosmossdk.io/core/testing => ../../core/testing
cosmossdk.io/depinject => ./../../depinject
cosmossdk.io/log => ./../../log
cosmossdk.io/schema => ./../../schema
cosmossdk.io/store => ./../../store
cosmossdk.io/x/accounts => ./../../x/accounts
cosmossdk.io/x/auth => ./../../x/auth
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
cosmossdk.io/errors v1.0.1
cosmossdk.io/log v1.3.1
cosmossdk.io/math v1.3.0
cosmossdk.io/schema v0.0.0
cosmossdk.io/store v1.1.1-0.20240418092142-896cdf1971bc
cosmossdk.io/x/auth v0.0.0-00010101000000-000000000000
cosmossdk.io/x/bank v0.0.0-20240226161501-23359a0b6d91
Expand Down Expand Up @@ -189,6 +190,7 @@ replace (
cosmossdk.io/core/testing => ./core/testing
cosmossdk.io/depinject => ./depinject
cosmossdk.io/log => ./log
cosmossdk.io/schema => ./schema
cosmossdk.io/store => ./store
cosmossdk.io/x/accounts => ./x/accounts
cosmossdk.io/x/auth => ./x/auth
Expand Down
66 changes: 66 additions & 0 deletions schema/decoding/resolver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package decoding

import (
"sort"

"cosmossdk.io/schema"
)

// DecoderResolver is an interface that allows indexers to discover and use module decoders.
type DecoderResolver interface {
// IterateAll iterates over all available module decoders.
IterateAll(func(moduleName string, cdc schema.ModuleCodec) error) error

// LookupDecoder looks up a specific module decoder.
LookupDecoder(moduleName string) (decoder schema.ModuleCodec, found bool, err error)
}

// ModuleSetDecoderResolver returns DecoderResolver that will discover modules implementing
// DecodeableModule in the provided module set.
func ModuleSetDecoderResolver(moduleSet map[string]interface{}) DecoderResolver {
return &moduleSetDecoderResolver{
moduleSet: moduleSet,
}
}

type moduleSetDecoderResolver struct {
moduleSet map[string]interface{}
}

func (a moduleSetDecoderResolver) IterateAll(f func(string, schema.ModuleCodec) error) error {
keys := make([]string, 0, len(a.moduleSet))
for k := range a.moduleSet {
keys = append(keys, k)
}
sort.Strings(keys)
for _, k := range keys {
module := a.moduleSet[k]
dm, ok := module.(schema.HasModuleCodec)
if ok {
decoder, err := dm.ModuleCodec()
if err != nil {
return err
}
err = f(k, decoder)
if err != nil {
return err
}
}
}
return nil
}

func (a moduleSetDecoderResolver) LookupDecoder(moduleName string) (schema.ModuleCodec, bool, error) {
mod, ok := a.moduleSet[moduleName]
if !ok {
return schema.ModuleCodec{}, false, nil
}

dm, ok := mod.(schema.HasModuleCodec)
if !ok {
return schema.ModuleCodec{}, false, nil
}

decoder, err := dm.ModuleCodec()
return decoder, true, err
}
124 changes: 124 additions & 0 deletions schema/decoding/resolver_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package decoding

import (
"fmt"
"testing"

"cosmossdk.io/schema"
)

type modA struct{}

func (m modA) ModuleCodec() (schema.ModuleCodec, error) {
return schema.ModuleCodec{
Schema: schema.ModuleSchema{ObjectTypes: []schema.ObjectType{{Name: "A"}}},
}, nil
}

type modB struct{}

func (m modB) ModuleCodec() (schema.ModuleCodec, error) {
return schema.ModuleCodec{
Schema: schema.ModuleSchema{ObjectTypes: []schema.ObjectType{{Name: "B"}}},
}, nil
}

type modC struct{}

var moduleSet = map[string]interface{}{
"modA": modA{},
"modB": modB{},
"modC": modC{},
}

var resolver = ModuleSetDecoderResolver(moduleSet)

func TestModuleSetDecoderResolver_IterateAll(t *testing.T) {
objectTypes := map[string]bool{}
err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
objectTypes[cdc.Schema.ObjectTypes[0].Name] = true
return nil
})
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if len(objectTypes) != 2 {
t.Fatalf("expected 2 object types, got %d", len(objectTypes))
}

if !objectTypes["A"] {
t.Fatalf("expected object type A")
}

if !objectTypes["B"] {
t.Fatalf("expected object type B")
}
}

func TestModuleSetDecoderResolver_LookupDecoder(t *testing.T) {
decoder, found, err := resolver.LookupDecoder("modA")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !found {
t.Fatalf("expected to find decoder for modA")
}

if decoder.Schema.ObjectTypes[0].Name != "A" {
t.Fatalf("expected object type A, got %s", decoder.Schema.ObjectTypes[0].Name)
}

decoder, found, err = resolver.LookupDecoder("modB")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !found {
t.Fatalf("expected to find decoder for modB")
}

if decoder.Schema.ObjectTypes[0].Name != "B" {
t.Fatalf("expected object type B, got %s", decoder.Schema.ObjectTypes[0].Name)
}

decoder, found, err = resolver.LookupDecoder("modC")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if found {
t.Fatalf("expected not to find decoder")
}

decoder, found, err = resolver.LookupDecoder("modD")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

if found {
t.Fatalf("expected not to find decoder")
}
}

type modD struct{}

func (m modD) ModuleCodec() (schema.ModuleCodec, error) {
return schema.ModuleCodec{}, fmt.Errorf("an error")
}

func TestModuleSetDecoderResolver_IterateAll_Error(t *testing.T) {
resolver := ModuleSetDecoderResolver(map[string]interface{}{
"modD": modD{},
})
err := resolver.IterateAll(func(moduleName string, cdc schema.ModuleCodec) error {
if moduleName == "modD" {
t.Fatalf("expected error")
}
return nil
})
if err == nil {
t.Fatalf("expected error")
}
}
8 changes: 8 additions & 0 deletions schema/decoding/sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package decoding

// SyncSource is an interface that allows indexers to start indexing modules with pre-existing state.
// It should generally be a wrapper around the key-value store.
type SyncSource interface {
// IterateAllKVPairs iterates over all key-value pairs for a given module.
IterateAllKVPairs(moduleName string, fn func(key, value []byte) error) error
}
15 changes: 15 additions & 0 deletions schema/indexer/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Indexer Framework

# Defining an Indexer

Indexer implementations should be registered with the `indexer.Register` function with a unique type name. Indexers take the configuration options defined by `indexer.Config` which defines a common set of configuration options as well as indexer-specific options under the `config` sub-key. Indexers do not need to manage the common filtering options specified in `Config` - the indexer manager will manage these for the indexer. Indexer implementations just need to return a correct `InitResult` response.

# Integrating the Indexer Manager

The indexer manager should be used for managing all indexers and should be integrated directly with applications wishing to support indexing. The `StartManager` function is used to start the manager. The configuration options for the manager and all indexer targets should be passed as the ManagerOptions.Config field and should match the json structure of ManagerConfig. An example configuration section in `app.toml` might look like this:

```toml
[indexer.target.postgres]
type = "postgres"
config.database_url = "postgres://user:password@localhost:5432/dbname"
```
Loading

0 comments on commit 4857ea1

Please sign in to comment.