Skip to content

Commit

Permalink
fix(serverv2/cometbft): properly decode and route gRPC transactions (#…
Browse files Browse the repository at this point in the history
…20808)

Co-authored-by: marbar3778 <marbar3778@yahoo.com>
Co-authored-by: Marko <marko@baricevic.me>
  • Loading branch information
3 people authored Jul 16, 2024
1 parent 511fb07 commit 86ea861
Show file tree
Hide file tree
Showing 7 changed files with 84 additions and 44 deletions.
9 changes: 9 additions & 0 deletions runtime/v2/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"

gogoproto "github.com/cosmos/gogoproto/proto"
"golang.org/x/exp/slices"

runtimev2 "cosmossdk.io/api/cosmos/app/runtime/v2"
Expand Down Expand Up @@ -44,6 +45,10 @@ type App[T transaction.Tx] struct {
interfaceRegistrar registry.InterfaceRegistrar
amino legacy.Amino
moduleManager *MM[T]

// GRPCQueryDecoders maps gRPC method name to a function that decodes the request
// bytes into a gogoproto.Message, which then can be passed to appmanager.
GRPCQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)
}

// Logger returns the app logger.
Expand Down Expand Up @@ -109,3 +114,7 @@ func (a *App[T]) ExecuteGenesisTx(_ []byte) error {
func (a *App[T]) GetAppManager() *appmanager.AppManager[T] {
return a.AppManager
}

func (a *App[T]) GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error) {
return a.GRPCQueryDecoders
}
43 changes: 33 additions & 10 deletions runtime/v2/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
"reflect"
"sort"

gogoproto "github.com/cosmos/gogoproto/proto"
Expand Down Expand Up @@ -555,17 +556,28 @@ func (m *MM[T]) assertNoForgottenModules(

func registerServices[T transaction.Tx](s appmodule.HasServices, app *App[T], registry *protoregistry.Files) error {
c := &configurator{
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
grpcQueryDecoders: map[string]func([]byte) (gogoproto.Message, error){},
stfQueryRouter: app.queryRouterBuilder,
stfMsgRouter: app.msgRouterBuilder,
registry: registry,
err: nil,
}
return s.RegisterServices(c)

err := s.RegisterServices(c)
if err != nil {
return fmt.Errorf("unable to register services: %w", err)
}
app.GRPCQueryDecoders = c.grpcQueryDecoders
return nil
}

var _ grpc.ServiceRegistrar = (*configurator)(nil)

type configurator struct {
// grpcQueryDecoders is required because module expose queries through gRPC
// this provides a way to route to modules using gRPC.
grpcQueryDecoders map[string]func([]byte) (gogoproto.Message, error)

stfQueryRouter *stf.MsgRouterBuilder
stfMsgRouter *stf.MsgRouterBuilder
registry *protoregistry.Files
Expand Down Expand Up @@ -596,17 +608,28 @@ func (c *configurator) RegisterService(sd *grpc.ServiceDesc, ss interface{}) {
func (c *configurator) registerQueryHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
// TODO(tip): what if a query is not deterministic?
err := registerMethod(c.stfQueryRouter, sd, md, ss)
requestFullName, err := registerMethod(c.stfQueryRouter, sd, md, ss)
if err != nil {
return fmt.Errorf("unable to register query handler %s: %w", md.MethodName, err)
}

// register gRPC query method.
typ := gogoproto.MessageType(requestFullName)
if typ == nil {
return fmt.Errorf("unable to find message in gogotype registry: %w", err)
}
decoderFunc := func(bytes []byte) (gogoproto.Message, error) {
msg := reflect.New(typ.Elem()).Interface().(gogoproto.Message)
return msg, gogoproto.Unmarshal(bytes, msg)
}
c.grpcQueryDecoders[md.MethodName] = decoderFunc
}
return nil
}

func (c *configurator) registerMsgHandlers(sd *grpc.ServiceDesc, ss interface{}) error {
for _, md := range sd.Methods {
err := registerMethod(c.stfMsgRouter, sd, md, ss)
_, err := registerMethod(c.stfMsgRouter, sd, md, ss)
if err != nil {
return fmt.Errorf("unable to register msg handler %s: %w", md.MethodName, err)
}
Expand All @@ -633,13 +656,13 @@ func registerMethod(
sd *grpc.ServiceDesc,
md grpc.MethodDesc,
ss interface{},
) error {
) (string, error) {
requestName, err := requestFullNameFromMethodDesc(sd, md)
if err != nil {
return err
return "", err
}

return stfRouter.RegisterHandler(string(requestName), func(
return string(requestName), stfRouter.RegisterHandler(string(requestName), func(
ctx context.Context,
msg appmodulev2.Message,
) (resp appmodulev2.Message, err error) {
Expand Down
52 changes: 31 additions & 21 deletions server/v2/cometbft/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

abci "github.com/cometbft/cometbft/abci/types"
abciproto "github.com/cometbft/cometbft/api/cometbft/abci/v1"
gogoproto "github.com/cosmos/gogoproto/proto"

coreappmgr "cosmossdk.io/core/app"
"cosmossdk.io/core/comet"
Expand All @@ -31,6 +32,9 @@ import (
var _ abci.Application = (*Consensus[transaction.Tx])(nil)

type Consensus[T transaction.Tx] struct {
// legacy support for gRPC
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error)

app *appmanager.AppManager[T]
cfg Config
store types.Store
Expand All @@ -56,18 +60,28 @@ type Consensus[T transaction.Tx] struct {
func NewConsensus[T transaction.Tx](
app *appmanager.AppManager[T],
mp mempool.Mempool[T],
grpcQueryDecoders map[string]func(requestBytes []byte) (gogoproto.Message, error),
store types.Store,
cfg Config,
txCodec transaction.Codec[T],
logger log.Logger,
) *Consensus[T] {
return &Consensus[T]{
mempool: mp,
store: store,
app: app,
cfg: cfg,
txCodec: txCodec,
logger: logger,
grpcQueryDecoders: grpcQueryDecoders,
app: app,
cfg: cfg,
store: store,
logger: logger,
txCodec: txCodec,
streaming: streaming.Manager{},
snapshotManager: nil,
mempool: mp,
lastCommittedHeight: atomic.Int64{},
prepareProposalHandler: nil,
processProposalHandler: nil,
verifyVoteExt: nil,
extendVote: nil,
chainID: "",
}
}

Expand Down Expand Up @@ -150,18 +164,16 @@ func (c *Consensus[T]) Info(ctx context.Context, _ *abciproto.InfoRequest) (*abc

// Query implements types.Application.
// It is called by cometbft to query application state.
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (*abciproto.QueryResponse, error) {
// follow the query path from here
decodedMsg, err := c.txCodec.Decode(req.Data)
protoMsg, ok := any(decodedMsg).(transaction.Msg)
if !ok {
return nil, fmt.Errorf("decoded type T %T must implement core/transaction.Msg", decodedMsg)
}

// if no error is returned then we can handle the query with the appmanager
// otherwise it is a KV store query
if err == nil {
res, err := c.app.Query(ctx, uint64(req.Height), protoMsg)
func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (resp *abciproto.QueryResponse, err error) {
// check if it's a gRPC method
grpcQueryDecoder, isGRPC := c.grpcQueryDecoders[req.Path]
if isGRPC {
protoRequest, err := grpcQueryDecoder(req.Data)
if err != nil {
return nil, fmt.Errorf("unable to decode gRPC request with path %s from ABCI.Query: %w", req.Path, err)
}
res, err := c.app.Query(ctx, uint64(req.Height), protoRequest)

if err != nil {
resp := queryResult(err)
resp.Height = req.Height
Expand All @@ -179,8 +191,6 @@ func (c *Consensus[T]) Query(ctx context.Context, req *abciproto.QueryRequest) (
return QueryResult(errorsmod.Wrap(cometerrors.ErrUnknownRequest, "no query path provided"), c.cfg.Trace), nil
}

var resp *abciproto.QueryResponse

switch path[0] {
case cmtservice.QueryPathApp:
resp, err = c.handlerQueryApp(ctx, path, req)
Expand Down Expand Up @@ -391,7 +401,7 @@ func (c *Consensus[T]) FinalizeBlock(
// ProposerAddress: req.ProposerAddress,
// LastCommit: req.DecidedLastCommit,
// },
//}
// }
//
// ctx = context.WithValue(ctx, corecontext.CometInfoKey, &comet.Info{
// Evidence: sdktypes.ToSDKEvidence(req.Misbehavior),
Expand Down
2 changes: 1 addition & 1 deletion server/v2/cometbft/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (s *CometBFTServer[T]) Init(appI serverv2.AppI[T], v *viper.Viper, logger l

// create consensus
store := appI.GetStore().(types.Store)
consensus := NewConsensus[T](appI.GetAppManager(), s.options.Mempool, store, s.config, s.initTxCodec, s.logger)
consensus := NewConsensus[T](appI.GetAppManager(), s.options.Mempool, appI.GetGRPCQueryDecoders(), store, s.config, s.initTxCodec, s.logger)

consensus.prepareProposalHandler = s.options.PrepareProposalHandler
consensus.processProposalHandler = s.options.ProcessProposalHandler
Expand Down
2 changes: 2 additions & 0 deletions server/v2/types.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package serverv2

import (
gogoproto "github.com/cosmos/gogoproto/proto"
"github.com/spf13/viper"

coreapp "cosmossdk.io/core/app"
Expand All @@ -15,5 +16,6 @@ type AppI[T transaction.Tx] interface {
GetAppManager() *appmanager.AppManager[T]
GetConsensusAuthority() string
InterfaceRegistry() coreapp.InterfaceRegistry
GetGRPCQueryDecoders() map[string]func(requestBytes []byte) (gogoproto.Message, error)
GetStore() any
}
1 change: 0 additions & 1 deletion simapp/v2/app_di.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,6 @@ func NewSimApp[T transaction.Tx](
if err := app.LoadLatest(); err != nil {
panic(err)
}

return app
}

Expand Down
19 changes: 8 additions & 11 deletions simapp/v2/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.22.2

require (
cosmossdk.io/api v0.7.5
cosmossdk.io/collections v0.4.0 // indirect
cosmossdk.io/client/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/core v0.12.1-0.20231114100755-569e3ff6a0d7
cosmossdk.io/depinject v1.0.0-alpha.4
cosmossdk.io/log v1.3.1
Expand All @@ -13,6 +13,7 @@ require (
cosmossdk.io/server/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/server/v2/cometbft v0.0.0-00010101000000-000000000000
cosmossdk.io/store/v2 v2.0.0
cosmossdk.io/tools/confix v0.0.0-00010101000000-000000000000
cosmossdk.io/x/accounts v0.0.0-20240226161501-23359a0b6d91
cosmossdk.io/x/auth v0.0.0-00010101000000-000000000000
cosmossdk.io/x/authz v0.0.0-00010101000000-000000000000
Expand All @@ -29,28 +30,18 @@ require (
cosmossdk.io/x/protocolpool v0.0.0-20230925135524-a1bc045b3190
cosmossdk.io/x/slashing v0.0.0-00010101000000-000000000000
cosmossdk.io/x/staking v0.0.0-00010101000000-000000000000
cosmossdk.io/x/tx v0.13.3 // indirect
cosmossdk.io/x/upgrade v0.0.0-20230613133644-0a778132a60f
github.com/cometbft/cometbft v1.0.0-rc1
github.com/cosmos/cosmos-db v1.0.2
// this version is not used as it is always replaced by the latest Cosmos SDK version
github.com/cosmos/cosmos-sdk v0.51.0
github.com/cosmos/gogoproto v1.5.0 // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.19.0
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.7.0 // indirect
google.golang.org/protobuf v1.34.2
)

require (
cosmossdk.io/client/v2 v2.0.0-00010101000000-000000000000
cosmossdk.io/tools/confix v0.0.0-00010101000000-000000000000
)

require (
buf.build/gen/go/cometbft/cometbft/protocolbuffers/go v1.34.2-20240701160653-fedbb9acfd2f.2 // indirect
buf.build/gen/go/cosmos/gogo-proto/protocolbuffers/go v1.34.2-20240130113600-88ef6483f90f.2 // indirect
Expand All @@ -60,6 +51,7 @@ require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
cloud.google.com/go/storage v1.42.0 // indirect
cosmossdk.io/collections v0.4.0 // indirect
cosmossdk.io/core/testing v0.0.0-00010101000000-000000000000 // indirect
cosmossdk.io/errors v1.0.1 // indirect
cosmossdk.io/schema v0.1.1 // indirect
Expand All @@ -69,6 +61,7 @@ require (
cosmossdk.io/x/accounts/defaults/lockup v0.0.0-20240417181816-5e7aae0db1f5 // indirect
cosmossdk.io/x/accounts/defaults/multisig v0.0.0-00010101000000-000000000000 // indirect
cosmossdk.io/x/epochs v0.0.0-20240522060652-a1ae4c3e0337 // indirect
cosmossdk.io/x/tx v0.13.3 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
Expand Down Expand Up @@ -97,6 +90,7 @@ require (
github.com/cosmos/crypto v0.1.1 // indirect
github.com/cosmos/go-bip39 v1.0.0 // indirect
github.com/cosmos/gogogateway v1.2.0 // indirect
github.com/cosmos/gogoproto v1.5.0 // indirect
github.com/cosmos/iavl v1.2.0 // indirect
github.com/cosmos/ics23/go v0.10.0 // indirect
github.com/cosmos/ledger-cosmos-go v0.13.3 // indirect
Expand Down Expand Up @@ -125,6 +119,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.2.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/mock v1.6.0 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
Expand Down Expand Up @@ -200,6 +195,7 @@ require (
github.com/sasha-s/go-deadlock v0.3.1 // indirect
github.com/sourcegraph/conc v0.3.0 // indirect
github.com/spf13/afero v1.11.0 // indirect
github.com/spf13/cast v1.6.0 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
github.com/supranational/blst v0.3.12 // indirect
github.com/syndtr/goleveldb v1.0.1-0.20220721030215-126854af5e6d // indirect
Expand All @@ -223,6 +219,7 @@ require (
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/oauth2 v0.21.0 // indirect
golang.org/x/sync v0.7.0 // indirect
golang.org/x/sys v0.22.0 // indirect
golang.org/x/term v0.22.0 // indirect
golang.org/x/text v0.16.0 // indirect
Expand Down

0 comments on commit 86ea861

Please sign in to comment.