Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Change from protobuf to cbor for gRPC #3061

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ require (
github.com/multiformats/go-multihash v0.2.3
github.com/pelletier/go-toml v1.9.5
github.com/pkg/errors v0.9.1
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10
github.com/sourcenetwork/acp_core v0.0.0-20240607160510-47a5306b2ad2
github.com/sourcenetwork/badger/v4 v4.2.1-0.20231113215945-a63444ca5276
github.com/sourcenetwork/corelog v0.0.8
Expand All @@ -66,7 +65,6 @@ require (
golang.org/x/crypto v0.27.0
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
google.golang.org/grpc v1.67.0
google.golang.org/protobuf v1.34.2
)

require (
Expand Down Expand Up @@ -375,6 +373,7 @@ require (
google.golang.org/genproto v0.0.0-20240227224415-6ceb2ff114de // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240814211410-ddb44dafa142 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1265,8 +1265,6 @@ github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 h1:GFCKgmp0tecUJ0sJuv4pzYCqS9+RGSn52M3FUwPs+uo=
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10/go.mod h1:t/avpk3KcrXxUnYOhZhMXJlSEyie6gQbtLq5NM3loB8=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down
42 changes: 26 additions & 16 deletions internal/kms/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@ import (
"crypto/ecdh"
"encoding/base64"

"github.com/fxamacker/cbor/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
libpeer "github.com/libp2p/go-libp2p/core/peer"
rpc "github.com/sourcenetwork/go-libp2p-pubsub-rpc"
grpcpeer "google.golang.org/grpc/peer"
"google.golang.org/protobuf/proto"

"github.com/sourcenetwork/defradb/crypto"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/encryption"
pb "github.com/sourcenetwork/defradb/net/pb"
)

const pubsubTopic = "encryption"
Expand Down Expand Up @@ -127,10 +126,15 @@ func (s *pubSubService) handleKeyRequestedEvent() {
}
}

type fetchEncryptionKeyRequest struct {
Links [][]byte
EphemeralPublicKey []byte
}

// handleEncryptionMessage handles incoming FetchEncryptionKeyRequest messages from the pubsub network.
func (s *pubSubService) handleRequestFromPeer(peerID libpeer.ID, topic string, msg []byte) ([]byte, error) {
req := new(pb.FetchEncryptionKeyRequest)
if err := proto.Unmarshal(msg, req); err != nil {
req := new(fetchEncryptionKeyRequest)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thought: not very common way of creating a value on heap.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You know that is your code right 😉

if err := cbor.Unmarshal(msg, req); err != nil {
log.ErrorContextE(s.ctx, "Failed to unmarshal pubsub message %s", err)
return nil, err
}
Expand All @@ -141,14 +145,14 @@ func (s *pubSubService) handleRequestFromPeer(peerID libpeer.ID, topic string, m
log.ErrorContextE(s.ctx, "failed attempt to get encryption key", err)
return nil, errors.Wrap("failed attempt to get encryption key", err)
}
return res.MarshalVT()
return cbor.Marshal(res)
}

func (s *pubSubService) prepareFetchEncryptionKeyRequest(
cids []cidlink.Link,
ephemeralPublicKey []byte,
) (*pb.FetchEncryptionKeyRequest, error) {
req := &pb.FetchEncryptionKeyRequest{
) (*fetchEncryptionKeyRequest, error) {
req := &fetchEncryptionKeyRequest{
EphemeralPublicKey: ephemeralPublicKey,
}

Expand Down Expand Up @@ -177,7 +181,7 @@ func (s *pubSubService) requestEncryptionKeyFromPeers(
return err
}

data, err := req.MarshalVT()
data, err := cbor.Marshal(req)
if err != nil {
return errors.Wrap("failed to marshal pubsub message", err)
}
Expand All @@ -194,17 +198,23 @@ func (s *pubSubService) requestEncryptionKeyFromPeers(
return nil
}

type fetchEncryptionKeyReply struct {
Links [][]byte
Blocks [][]byte
EphemeralPublicKey []byte
}

// handleFetchEncryptionKeyResponse handles incoming FetchEncryptionKeyResponse messages
func (s *pubSubService) handleFetchEncryptionKeyResponse(
resp rpc.Response,
req *pb.FetchEncryptionKeyRequest,
req *fetchEncryptionKeyRequest,
privateKey *ecdh.PrivateKey,
result chan<- encryption.Result,
) {
defer close(result)

var keyResp pb.FetchEncryptionKeyReply
if err := proto.Unmarshal(resp.Data, &keyResp); err != nil {
var keyResp fetchEncryptionKeyReply
if err := cbor.Unmarshal(resp.Data, &keyResp); err != nil {
log.ErrorContextE(s.ctx, "Failed to unmarshal encryption key response", err)
result <- encryption.Result{Error: err}
return
Expand Down Expand Up @@ -238,7 +248,7 @@ func (s *pubSubService) handleFetchEncryptionKeyResponse(
}

// makeAssociatedData creates the associated data for the encryption key request
func makeAssociatedData(req *pb.FetchEncryptionKeyRequest, peerID libpeer.ID) []byte {
func makeAssociatedData(req *fetchEncryptionKeyRequest, peerID libpeer.ID) []byte {
return encodeToBase64(bytes.Join([][]byte{
req.EphemeralPublicKey,
[]byte(peerID),
Expand All @@ -247,8 +257,8 @@ func makeAssociatedData(req *pb.FetchEncryptionKeyRequest, peerID libpeer.ID) []

func (s *pubSubService) tryGenEncryptionKeyLocally(
ctx context.Context,
req *pb.FetchEncryptionKeyRequest,
) (*pb.FetchEncryptionKeyReply, error) {
req *fetchEncryptionKeyRequest,
) (*fetchEncryptionKeyReply, error) {
blocks, err := s.getEncryptionKeysLocally(ctx, req)
if err != nil || len(blocks) == 0 {
return nil, err
Expand All @@ -264,7 +274,7 @@ func (s *pubSubService) tryGenEncryptionKeyLocally(
return nil, err
}

res := &pb.FetchEncryptionKeyReply{
res := &fetchEncryptionKeyReply{
Links: req.Links,
EphemeralPublicKey: privKey.PublicKey().Bytes(),
}
Expand Down Expand Up @@ -293,7 +303,7 @@ func (s *pubSubService) tryGenEncryptionKeyLocally(
// It returns the encryption keys and the targets for which the keys were found.
func (s *pubSubService) getEncryptionKeysLocally(
ctx context.Context,
req *pb.FetchEncryptionKeyRequest,
req *fetchEncryptionKeyRequest,
) ([][]byte, error) {
blocks := make([][]byte, 0, len(req.Links))
for _, link := range req.Links {
Expand Down
23 changes: 8 additions & 15 deletions net/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/event"
pb "github.com/sourcenetwork/defradb/net/pb"
)

var (
Expand All @@ -32,19 +31,6 @@ var (
// pushLog creates a pushLog request and sends it to another node
// over libp2p grpc connection
func (s *server) pushLog(evt event.Update, pid peer.ID) error {
body := &pb.PushLogRequest_Body{
DocID: []byte(evt.DocID),
Cid: evt.Cid.Bytes(),
SchemaRoot: []byte(evt.SchemaRoot),
Creator: s.peer.host.ID().String(),
Log: &pb.Log{
Block: evt.Block,
},
}
req := &pb.PushLogRequest{
Body: body,
}

client, err := s.dial(pid) // grpc dial over P2P stream
if err != nil {
return NewErrPushLog(err)
Expand All @@ -53,7 +39,14 @@ func (s *server) pushLog(evt event.Update, pid peer.ID) error {
ctx, cancel := context.WithTimeout(s.peer.ctx, PushTimeout)
defer cancel()

if _, err := client.PushLog(ctx, req); err != nil {
req := pushLogRequest{
DocID: evt.DocID,
CID: evt.Cid.Bytes(),
SchemaRoot: evt.SchemaRoot,
Creator: s.peer.host.ID().String(),
Block: evt.Block,
}
if err := client.Invoke(ctx, servicePushLogName, req, nil); err != nil {
return NewErrPushLog(
err,
errors.NewKV("CID", evt.Cid),
Expand Down
40 changes: 40 additions & 0 deletions net/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package net

import (
"github.com/fxamacker/cbor/v2"
"google.golang.org/grpc/encoding"
)

const cborCodecName = "cbor"

// cborCodec is a gRPC Codec implementation with CBOR encoding.
type cborCodec struct{}

func (c *cborCodec) Marshal(v any) ([]byte, error) {
return cbor.Marshal(v)
}

func (c *cborCodec) Unmarshal(data []byte, v any) error {
if v == nil {
return nil
}
return cbor.Unmarshal(data, v)
}

func (c *cborCodec) Name() string {
return cborCodecName
}

func init() {
encoding.RegisterCodec(&cborCodec{})
}
7 changes: 3 additions & 4 deletions net/dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,10 @@ import (

"github.com/sourcenetwork/defradb/errors"
corenet "github.com/sourcenetwork/defradb/internal/core/net"
pb "github.com/sourcenetwork/defradb/net/pb"
)

// dial attempts to open a gRPC connection over libp2p to a peer.
func (s *server) dial(peerID libpeer.ID) (pb.ServiceClient, error) {
func (s *server) dial(peerID libpeer.ID) (*grpc.ClientConn, error) {
s.mu.Lock()
defer s.mu.Unlock()
conn, ok := s.conns[peerID]
Expand All @@ -37,7 +36,7 @@ func (s *server) dial(peerID libpeer.ID) (pb.ServiceClient, error) {
return nil, err
}
} else {
return pb.NewServiceClient(conn), nil
return conn, nil
}
}
// We need the "passthrough:" in the beginning of the target,
Expand All @@ -54,7 +53,7 @@ func (s *server) dial(peerID libpeer.ID) (pb.ServiceClient, error) {
return nil, err
}
s.conns[peerID] = conn
return pb.NewServiceClient(conn), nil
return conn, nil
}

// getLibp2pDialer returns a WithContextDialer option for libp2p dialing.
Expand Down
105 changes: 105 additions & 0 deletions net/grpc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package net

import (
"context"

"google.golang.org/grpc"
)

const (
grpcServiceName = "defradb.net.Service"

serviceGetDocGraphName = "/" + grpcServiceName + "/GetDocGraph"
servicePushDocGraphName = "/" + grpcServiceName + "/PushDocGraph"
serviceGetLogName = "/" + grpcServiceName + "/GetLog"
servicePushLogName = "/" + grpcServiceName + "/PushLog"
serviceGetHeadLogName = "/" + grpcServiceName + "/GetHeadLog"
)

type getDocGraphRequest struct{}

type getDocGraphReply struct{}

type getHeadLogRequest struct{}

type getHeadLogReply struct{}

type getLogRequest struct{}

type getLogReply struct{}

type pushDocGraphRequest struct{}

type pushDocGraphReply struct{}

type pushLogRequest struct {
DocID string
CID []byte
SchemaRoot string
Creator string
Block []byte
}

type pushLogReply struct{}

type serviceServer interface {
// GetDocGraph from this peer.
GetDocGraph(context.Context, *getDocGraphRequest) (*getDocGraphReply, error)
// PushDocGraph to this peer.
PushDocGraph(context.Context, *pushDocGraphRequest) (*pushDocGraphReply, error)
// GetLog from this peer.
GetLog(context.Context, *getLogRequest) (*getLogReply, error)
// PushLog to this peer.
PushLog(context.Context, *pushLogRequest) (*pushLogReply, error)
// GetHeadLog from this peer
GetHeadLog(context.Context, *getHeadLogRequest) (*getHeadLogReply, error)
}

func pushLogHandler(
srv any,
ctx context.Context,
dec func(any) error,
interceptor grpc.UnaryServerInterceptor,
) (any, error) {
in := new(pushLogRequest)
if err := dec(in); err != nil {
return nil, err

Check warning on line 76 in net/grpc.go

View check run for this annotation

Codecov / codecov/patch

net/grpc.go#L76

Added line #L76 was not covered by tests
}
if interceptor == nil {
return srv.(serviceServer).PushLog(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: servicePushLogName,

Check warning on line 83 in net/grpc.go

View check run for this annotation

Codecov / codecov/patch

net/grpc.go#L81-L83

Added lines #L81 - L83 were not covered by tests
}
handler := func(ctx context.Context, req any) (any, error) {
return srv.(serviceServer).PushLog(ctx, req.(*pushLogRequest))

Check warning on line 86 in net/grpc.go

View check run for this annotation

Codecov / codecov/patch

net/grpc.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}
return interceptor(ctx, in, info, handler)

Check warning on line 88 in net/grpc.go

View check run for this annotation

Codecov / codecov/patch

net/grpc.go#L88

Added line #L88 was not covered by tests
}

func registerServiceServer(s grpc.ServiceRegistrar, srv serviceServer) {
desc := &grpc.ServiceDesc{
ServiceName: grpcServiceName,
HandlerType: (*serviceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "PushLog",
Handler: pushLogHandler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "defradb.cbor",
}
s.RegisterService(desc, srv)
}
Loading
Loading