diff --git a/waku.go b/waku.go index 16ca5f4ff..43fd158d3 100644 --- a/waku.go +++ b/waku.go @@ -118,7 +118,7 @@ func main() { }, &cli.StringFlag{ Name: "nat", - Usage: "TODO", + Usage: "TODO - Not implemented yet.", // This was added so js-waku test don't fail Destination: &options.NAT, }, &cli.StringFlag{ diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index bfcb98a00..e1b64c7a8 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p" "go.uber.org/zap" @@ -95,19 +96,24 @@ func defaultStoreFactory(w *WakuNode) store.Store { func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { params := new(WakuNodeParameters) - ctx, cancel := context.WithCancel(ctx) - params.libP2POpts = DefaultLibP2POptions opts = append(DefaultWakuNodeOptions, opts...) for _, opt := range opts { err := opt(params) if err != nil { - cancel() return nil, err } } + if params.privKey == nil { + prvKey, err := crypto.GenerateKey() + if err != nil { + return nil, err + } + params.privKey = prvKey + } + if params.enableWSS { params.libP2POpts = append(params.libP2POpts, libp2p.Transport(ws.New, ws.WithTLSConfig(params.tlsConfig))) } else if params.enableWS { @@ -118,7 +124,6 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { if params.hostAddr == nil { err := WithHostAddress(&net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 0})(params) if err != nil { - cancel() return nil, err } } @@ -126,9 +131,7 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { params.libP2POpts = append(params.libP2POpts, libp2p.ListenAddrs(params.multiAddr...)) } - if params.privKey != nil { - params.libP2POpts = append(params.libP2POpts, params.Identity()) - } + params.libP2POpts = append(params.libP2POpts, params.Identity()) if params.addressFactory != nil { params.libP2POpts = append(params.libP2POpts, libp2p.AddrsFactory(params.addressFactory)) @@ -136,10 +139,11 @@ func New(ctx context.Context, opts ...WakuNodeOption) (*WakuNode, error) { host, err := libp2p.New(params.libP2POpts...) if err != nil { - cancel() return nil, err } + ctx, cancel := context.WithCancel(ctx) + w := new(WakuNode) w.bcaster = v2.NewBroadcaster(1024) w.host = host diff --git a/waku/v2/rpc/filter.go b/waku/v2/rpc/filter.go index 4a9ab2629..a06e2e981 100644 --- a/waku/v2/rpc/filter.go +++ b/waku/v2/rpc/filter.go @@ -119,7 +119,10 @@ func (f *FilterService) GetV1Messages(req *http.Request, args *ContentTopicArgs, return fmt.Errorf("topic %s not subscribed", args.ContentTopic) } - reply.Messages = f.messages[args.ContentTopic] + for i := range f.messages[args.ContentTopic] { + *reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(f.messages[args.ContentTopic][i])) + } + f.messages[args.ContentTopic] = make([]*pb.WakuMessage, 0) return nil } diff --git a/waku/v2/rpc/filter_test.go b/waku/v2/rpc/filter_test.go index 5edaba5b5..b3b151492 100644 --- a/waku/v2/rpc/filter_test.go +++ b/waku/v2/rpc/filter_test.go @@ -134,7 +134,7 @@ func TestFilterGetV1Messages(t *testing.T) { &messagesReply, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 1) + require.Len(t, messagesReply, 1) err = serviceB.GetV1Messages( makeRequest(t), @@ -142,5 +142,5 @@ func TestFilterGetV1Messages(t *testing.T) { &messagesReply, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 0) + require.Len(t, messagesReply, 0) } diff --git a/waku/v2/rpc/private.go b/waku/v2/rpc/private.go index 1f7b678e0..b4d5c5ea9 100644 --- a/waku/v2/rpc/private.go +++ b/waku/v2/rpc/private.go @@ -3,13 +3,15 @@ package rpc import ( "crypto/ecdsa" "crypto/rand" - "encoding/hex" "fmt" "net/http" + "strings" "sync" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/status-im/go-waku/waku/v2/node" + "github.com/status-im/go-waku/waku/v2/protocol" "github.com/status-im/go-waku/waku/v2/protocol/pb" "go.uber.org/zap" ) @@ -18,31 +20,28 @@ type PrivateService struct { node *node.WakuNode log *zap.Logger - symmetricMessages map[string][]*pb.WakuMessage - symmetricMessagesMutex sync.RWMutex + messages map[string][]*pb.WakuMessage + messagesMutex sync.RWMutex - asymmetricMessages map[string][]*pb.WakuMessage - asymmetricMessagesMutex sync.RWMutex + runner *runnerService } -type SymmetricKeyReply struct { - Key string `json:"key"` -} +type SymmetricKeyReply string type KeyPairReply struct { PrivateKey string `json:"privateKey"` - PulicKey string `json:"publicKey"` + PublicKey string `json:"publicKey"` } type SymmetricMessageArgs struct { Topic string `json:"topic"` - Message pb.WakuMessage `json:"message"` + Message RPCWakuMessage `json:"message"` SymKey string `json:"symkey"` } type AsymmetricMessageArgs struct { Topic string `json:"topic"` - Message pb.WakuMessage `json:"message"` + Message RPCWakuMessage `json:"message"` PublicKey string `json:"publicKey"` } @@ -57,12 +56,31 @@ type AsymmetricMessagesArgs struct { } func NewPrivateService(node *node.WakuNode, log *zap.Logger) *PrivateService { - return &PrivateService{ - node: node, - symmetricMessages: make(map[string][]*pb.WakuMessage), - asymmetricMessages: make(map[string][]*pb.WakuMessage), - log: log.Named("private"), + p := &PrivateService{ + node: node, + messages: make(map[string][]*pb.WakuMessage), + log: log.Named("private"), + } + p.runner = newRunnerService(node.Broadcaster(), p.addEnvelope) + + // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these + for _, topic := range node.Relay().Topics() { + p.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) + p.messages[topic] = make([]*pb.WakuMessage, 0) } + + return p +} + +func (p *PrivateService) addEnvelope(envelope *protocol.Envelope) { + p.messagesMutex.Lock() + defer p.messagesMutex.Unlock() + + if _, ok := p.messages[envelope.PubsubTopic()]; !ok { + p.messages[envelope.PubsubTopic()] = make([]*pb.WakuMessage, 0) + } + + p.messages[envelope.PubsubTopic()] = append(p.messages[envelope.PubsubTopic()], envelope.Message()) } func (p *PrivateService) GetV1SymmetricKey(req *http.Request, args *Empty, reply *SymmetricKeyReply) error { @@ -71,7 +89,7 @@ func (p *PrivateService) GetV1SymmetricKey(req *http.Request, args *Empty, reply if err != nil { return err } - reply.Key = hex.EncodeToString(key[:]) + *reply = SymmetricKeyReply(hexutil.Encode(key[:])) return nil } @@ -89,44 +107,46 @@ func (p *PrivateService) GetV1AsymmetricKeypair(req *http.Request, args *Empty, } publicKeyBytes := crypto.FromECDSAPub(publicKeyECDSA) - reply.PrivateKey = hex.EncodeToString(privateKeyBytes[:]) - reply.PulicKey = hex.EncodeToString(publicKeyBytes[:]) + reply.PrivateKey = hexutil.Encode(privateKeyBytes[:]) + reply.PublicKey = hexutil.Encode(publicKeyBytes[:]) return nil } func (p *PrivateService) PostV1SymmetricMessage(req *http.Request, args *SymmetricMessageArgs, reply *SuccessReply) error { + symKeyBytes, err := hexutil.Decode(args.SymKey) + if err != nil { + return fmt.Errorf("invalid symmetric key: %w", err) + } + keyInfo := new(node.KeyInfo) keyInfo.Kind = node.Symmetric - keyInfo.SymKey = []byte(args.SymKey) + keyInfo.SymKey = symKeyBytes + + msg := args.Message.toProto() + msg.Version = 1 - err := node.EncodeWakuMessage(&args.Message, keyInfo) + err = node.EncodeWakuMessage(msg, keyInfo) if err != nil { reply.Error = err.Error() reply.Success = false return nil } - err = p.node.Publish(req.Context(), &args.Message) + err = p.node.Publish(req.Context(), msg) if err != nil { reply.Error = err.Error() reply.Success = false return nil } - p.symmetricMessagesMutex.Lock() - defer p.symmetricMessagesMutex.Unlock() - if _, ok := p.symmetricMessages[args.Topic]; !ok { - p.symmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) - } - p.symmetricMessages[args.Topic] = append(p.symmetricMessages[args.Topic], &args.Message) - reply.Success = true return nil } -func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *AsymmetricMessageArgs, reply *SuccessReply) error { +func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *AsymmetricMessageArgs, reply *bool) error { keyInfo := new(node.KeyInfo) keyInfo.Kind = node.Asymmetric - pubKeyBytes, err := hex.DecodeString(args.PublicKey) + + pubKeyBytes, err := hexutil.Decode(args.PublicKey) if err != nil { return fmt.Errorf("public key cannot be decoded: %v", err) } @@ -135,54 +155,110 @@ func (p *PrivateService) PostV1AsymmetricMessage(req *http.Request, args *Asymme if err != nil { return fmt.Errorf("public key cannot be unmarshalled: %v", err) } + keyInfo.PubKey = *pubKey - err = node.EncodeWakuMessage(&args.Message, keyInfo) + msg := args.Message.toProto() + msg.Version = 1 + + err = node.EncodeWakuMessage(msg, keyInfo) if err != nil { - reply.Error = err.Error() - reply.Success = false - return nil + return err } - err = p.node.Publish(req.Context(), &args.Message) + + err = p.node.Publish(req.Context(), msg) if err != nil { - reply.Error = err.Error() - reply.Success = false - return nil + return err } - p.asymmetricMessagesMutex.Lock() - defer p.asymmetricMessagesMutex.Unlock() - if _, ok := p.asymmetricMessages[args.Topic]; !ok { - p.asymmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) - } - p.asymmetricMessages[args.Topic] = append(p.asymmetricMessages[args.Topic], &args.Message) + *reply = true - reply.Success = true return nil } func (p *PrivateService) GetV1SymmetricMessages(req *http.Request, args *SymmetricMessagesArgs, reply *MessagesReply) error { - p.symmetricMessagesMutex.Lock() - defer p.symmetricMessagesMutex.Unlock() + p.messagesMutex.Lock() - if _, ok := p.symmetricMessages[args.Topic]; !ok { + if _, ok := p.messages[args.Topic]; !ok { + p.messagesMutex.Unlock() return fmt.Errorf("topic %s not subscribed", args.Topic) } - reply.Messages = p.symmetricMessages[args.Topic] - p.symmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) + symKeyBytes, err := hexutil.Decode(args.SymKey) + if err != nil { + return fmt.Errorf("invalid symmetric key: %w", err) + } + + messages := make([]*pb.WakuMessage, len(p.messages[args.Topic])) + copy(messages, p.messages[args.Topic]) + p.messages[args.Topic] = make([]*pb.WakuMessage, 0) + p.messagesMutex.Unlock() + + var decodedMessages []*pb.WakuMessage + for _, msg := range messages { + p.log.Info("GETTING MESSAGEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE") + err := node.DecodeWakuMessage(msg, &node.KeyInfo{ + Kind: node.Symmetric, + SymKey: symKeyBytes, + }) + if err != nil { + p.log.Info("COULD NOT DECODE") + continue + } + decodedMessages = append(decodedMessages, msg) + } + + for i := range decodedMessages { + *reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(decodedMessages[i])) + } + return nil } func (p *PrivateService) GetV1AsymmetricMessages(req *http.Request, args *AsymmetricMessagesArgs, reply *MessagesReply) error { - p.asymmetricMessagesMutex.Lock() - defer p.asymmetricMessagesMutex.Unlock() + p.messagesMutex.Lock() - if _, ok := p.asymmetricMessages[args.Topic]; !ok { + if _, ok := p.messages[args.Topic]; !ok { + p.messagesMutex.Unlock() return fmt.Errorf("topic %s not subscribed", args.Topic) } - reply.Messages = p.asymmetricMessages[args.Topic] - p.asymmetricMessages[args.Topic] = make([]*pb.WakuMessage, 0) + messages := make([]*pb.WakuMessage, len(p.messages[args.Topic])) + copy(messages, p.messages[args.Topic]) + p.messages[args.Topic] = make([]*pb.WakuMessage, 0) + p.messagesMutex.Unlock() + + privKey, err := crypto.HexToECDSA(strings.TrimPrefix(args.PrivateKey, "0x")) + if err != nil { + return fmt.Errorf("invalid asymmetric key: %w", err) + } + + var decodedMessages []*pb.WakuMessage + for _, msg := range messages { + p.log.Info("GETTING MESSAGEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE") + + err := node.DecodeWakuMessage(msg, &node.KeyInfo{ + Kind: node.Asymmetric, + PrivKey: privKey, + }) + if err != nil { + p.log.Info("COULD NOT DECODE") + continue + } + decodedMessages = append(decodedMessages, msg) + } + + for i := range decodedMessages { + *reply = append(*reply, ProtoWakuMessageToRPCWakuMessage(decodedMessages[i])) + } + return nil } + +func (p *PrivateService) Start() { + p.runner.Start() +} + +func (p *PrivateService) Stop() { + p.runner.Stop() +} diff --git a/waku/v2/rpc/private_test.go b/waku/v2/rpc/private_test.go index 41c7a86e4..b110facd2 100644 --- a/waku/v2/rpc/private_test.go +++ b/waku/v2/rpc/private_test.go @@ -5,7 +5,6 @@ import ( "testing" "github.com/status-im/go-waku/waku/v2/node" - "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -30,7 +29,7 @@ func TestGetV1SymmetricKey(t *testing.T) { &reply, ) require.NoError(t, err) - require.NotEmpty(t, reply.Key) + require.NotEmpty(t, reply) } func TestGetV1AsymmetricKey(t *testing.T) { @@ -44,7 +43,7 @@ func TestGetV1AsymmetricKey(t *testing.T) { &reply, ) require.NoError(t, err) - require.NotEmpty(t, reply.PulicKey) + require.NotEmpty(t, reply.PublicKey) require.NotEmpty(t, reply.PrivateKey) } @@ -57,7 +56,7 @@ func TestPostV1SymmetricMessage(t *testing.T) { makeRequest(t), &SymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, + Message: RPCWakuMessage{Payload: []byte("test")}, SymKey: "abc", }, &reply, @@ -70,18 +69,18 @@ func TestPostV1AsymmetricMessage(t *testing.T) { d := makePrivateService(t) defer d.node.Stop() - var reply SuccessReply + var reply bool err := d.PostV1AsymmetricMessage( makeRequest(t), &AsymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, + Message: RPCWakuMessage{Payload: []byte("test")}, PublicKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", }, &reply, ) require.NoError(t, err) - require.True(t, reply.Success) + require.True(t, reply) } func TestGetV1SymmetricMessages(t *testing.T) { @@ -93,7 +92,7 @@ func TestGetV1SymmetricMessages(t *testing.T) { makeRequest(t), &SymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, + Message: RPCWakuMessage{Payload: []byte("test")}, SymKey: "abc", }, &reply, @@ -108,25 +107,25 @@ func TestGetV1SymmetricMessages(t *testing.T) { &getReply, ) require.NoError(t, err) - require.Len(t, getReply.Messages, 1) + require.Len(t, getReply, 1) } func TestGetV1AsymmetricMessages(t *testing.T) { d := makePrivateService(t) defer d.node.Stop() - var reply SuccessReply + var reply bool err := d.PostV1AsymmetricMessage( makeRequest(t), &AsymmetricMessageArgs{ Topic: "test", - Message: pb.WakuMessage{Payload: []byte("test")}, + Message: RPCWakuMessage{Payload: []byte("test")}, PublicKey: "045ded6a56c88173e87a88c55b96956964b1bd3351b5fcb70950a4902fbc1bc0ceabb0ac846c3a4b8f2f6024c0e19f0a7f6a4865035187de5463f34012304fc7c5", }, &reply, ) require.NoError(t, err) - require.True(t, reply.Success) + require.True(t, reply) var getReply MessagesReply err = d.GetV1AsymmetricMessages( @@ -138,5 +137,5 @@ func TestGetV1AsymmetricMessages(t *testing.T) { &getReply, ) require.NoError(t, err) - require.Len(t, getReply.Messages, 1) + require.Len(t, getReply, 1) } diff --git a/waku/v2/rpc/relay.go b/waku/v2/rpc/relay.go index 610b1dc19..7bff50944 100644 --- a/waku/v2/rpc/relay.go +++ b/waku/v2/rpc/relay.go @@ -23,8 +23,8 @@ type RelayService struct { } type RelayMessageArgs struct { - Topic string `json:"topic,omitempty"` - Message pb.WakuMessage `json:"message,omitempty"` + Topic string `json:"topic,omitempty"` + Message RPCWakuRelayMessage `json:"message,omitempty"` } type TopicsArgs struct { @@ -43,6 +43,13 @@ func NewRelayService(node *node.WakuNode, log *zap.Logger) *RelayService { } s.runner = newRunnerService(node.Broadcaster(), s.addEnvelope) + + // Node may already be subscribed to some topics when Relay API handlers are installed. Let's add these + for _, topic := range node.Relay().Topics() { + s.log.Info("adding topic handler for existing subscription", zap.String("topic", topic)) + s.messages[topic] = make([]*pb.WakuMessage, 0) + } + return s } @@ -67,10 +74,13 @@ func (r *RelayService) Stop() { func (r *RelayService) PostV1Message(req *http.Request, args *RelayMessageArgs, reply *SuccessReply) error { var err error + + msg := args.Message.toProto() + if args.Topic == "" { - _, err = r.node.Relay().Publish(req.Context(), &args.Message) + _, err = r.node.Relay().Publish(req.Context(), msg) } else { - _, err = r.node.Relay().PublishToTopic(req.Context(), &args.Message, args.Topic) + _, err = r.node.Relay().PublishToTopic(req.Context(), msg, args.Topic) } if err != nil { r.log.Error("publishing message", zap.Error(err)) @@ -121,7 +131,7 @@ func (r *RelayService) DeleteV1Subscription(req *http.Request, args *TopicsArgs, return nil } -func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *MessagesReply) error { +func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply *RelayMessagesReply) error { r.messagesMutex.Lock() defer r.messagesMutex.Unlock() @@ -129,7 +139,10 @@ func (r *RelayService) GetV1Messages(req *http.Request, args *TopicArgs, reply * return fmt.Errorf("topic %s not subscribed", args.Topic) } - reply.Messages = r.messages[args.Topic] + for i := range r.messages[args.Topic] { + *reply = append(*reply, ProtoWakuMessageToRPCWakuRelayMessage(r.messages[args.Topic][i])) + } + r.messages[args.Topic] = make([]*pb.WakuMessage, 0) return nil } diff --git a/waku/v2/rpc/relay_test.go b/waku/v2/rpc/relay_test.go index b6a7c6ff7..0647c6a71 100644 --- a/waku/v2/rpc/relay_test.go +++ b/waku/v2/rpc/relay_test.go @@ -8,7 +8,6 @@ import ( "github.com/multiformats/go-multiaddr" "github.com/status-im/go-waku/waku/v2/node" - "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/utils" "github.com/stretchr/testify/require" ) @@ -97,7 +96,7 @@ func TestRelayGetV1Messages(t *testing.T) { makeRequest(t), &RelayMessageArgs{ Topic: "test", - Message: pb.WakuMessage{ + Message: RPCWakuRelayMessage{ Payload: []byte("test"), }, }, @@ -109,14 +108,14 @@ func TestRelayGetV1Messages(t *testing.T) { // Wait for the message to be received time.Sleep(1 * time.Second) - var messagesReply MessagesReply + var messagesReply RelayMessagesReply err = serviceB.GetV1Messages( makeRequest(t), &TopicArgs{"test"}, &messagesReply, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 1) + require.Len(t, messagesReply, 1) err = serviceB.GetV1Messages( makeRequest(t), @@ -124,5 +123,5 @@ func TestRelayGetV1Messages(t *testing.T) { &messagesReply, ) require.NoError(t, err) - require.Len(t, messagesReply.Messages, 0) + require.Len(t, messagesReply, 0) } diff --git a/waku/v2/rpc/rpc_type.go b/waku/v2/rpc/rpc_type.go index 391bee207..322cef3f7 100644 --- a/waku/v2/rpc/rpc_type.go +++ b/waku/v2/rpc/rpc_type.go @@ -1,7 +1,5 @@ package rpc -import "github.com/status-im/go-waku/waku/v2/protocol/pb" - type SuccessReply struct { Success bool `json:"success,omitempty"` Error string `json:"error,omitempty"` @@ -10,6 +8,6 @@ type SuccessReply struct { type Empty struct { } -type MessagesReply struct { - Messages []*pb.WakuMessage `json:"messages,omitempty"` -} +type MessagesReply []*RPCWakuMessage + +type RelayMessagesReply []*RPCWakuRelayMessage diff --git a/waku/v2/rpc/store.go b/waku/v2/rpc/store.go index 690dcc20c..f5d011de6 100644 --- a/waku/v2/rpc/store.go +++ b/waku/v2/rpc/store.go @@ -33,7 +33,7 @@ type StoreMessagesArgs struct { } type StoreMessagesReply struct { - Messages []*pb.WakuMessage `json:"messages,omitempty"` + Messages []RPCWakuMessage `json:"messages,omitempty"` PagingInfo StorePagingOptions `json:"pagingInfo,omitempty"` Error string `json:"error,omitempty"` } @@ -60,7 +60,12 @@ func (s *StoreService) GetV1Messages(req *http.Request, args *StoreMessagesArgs, reply.Error = err.Error() return nil } - reply.Messages = res.Messages + + reply.Messages = make([]RPCWakuMessage, len(res.Messages)) + for i := range res.Messages { + reply.Messages[i] = *ProtoWakuMessageToRPCWakuMessage(res.Messages[i]) + } + reply.PagingInfo = StorePagingOptions{ PageSize: args.PagingOptions.PageSize, Cursor: res.Cursor(), diff --git a/waku/v2/rpc/utils.go b/waku/v2/rpc/utils.go new file mode 100644 index 000000000..96fa14462 --- /dev/null +++ b/waku/v2/rpc/utils.go @@ -0,0 +1,135 @@ +package rpc + +import ( + "encoding/hex" + "fmt" + "strings" + + "github.com/status-im/go-waku/waku/v2/protocol/pb" +) + +// HexBytes is marshalled to a hex string +type HexBytes []byte + +// ByteArray is marshalled to a uint8 array +type ByteArray []byte + +type RPCWakuMessage struct { + Payload ByteArray `json:"payload,omitempty"` + ContentTopic string `json:"contentTopic,omitempty"` + Version uint32 `json:"version"` + Timestamp int64 `json:"timestamp,omitempty"` + Proof HexBytes `json:"proof,omitempty"` +} + +type RPCWakuRelayMessage struct { + Payload HexBytes `json:"payload,omitempty"` + ContentTopic string `json:"contentTopic,omitempty"` + Timestamp int64 `json:"timestamp,omitempty"` + Proof HexBytes `json:"proof,omitempty"` + Version uint32 `json:"version"` +} + +func ProtoWakuMessageToRPCWakuMessage(input *pb.WakuMessage) *RPCWakuMessage { + if input == nil { + return nil + } + + return &RPCWakuMessage{ + Payload: input.Payload, + ContentTopic: input.ContentTopic, + Version: input.Version, + Timestamp: input.Timestamp, + Proof: input.Proof, + } +} + +func (r *RPCWakuMessage) toProto() *pb.WakuMessage { + if r == nil { + return nil + } + + return &pb.WakuMessage{ + Payload: r.Payload, + ContentTopic: r.ContentTopic, + Version: r.Version, + Timestamp: r.Timestamp, + Proof: r.Proof, + } +} + +func (u HexBytes) MarshalJSON() ([]byte, error) { + var result string + if u == nil { + result = "null" + } else { + result = strings.Join(strings.Fields(fmt.Sprintf("%d", u)), ",") + } + return []byte(result), nil +} + +func (h *HexBytes) UnmarshalText(b []byte) error { + hexString := "" + if b != nil { + hexString = string(b) + } + + decoded, err := hex.DecodeString(hexString) + if err != nil { + return err + } + + *h = decoded + + return nil +} + +func ProtoWakuMessageToRPCWakuRelayMessage(input *pb.WakuMessage) *RPCWakuRelayMessage { + if input == nil { + return nil + } + + return &RPCWakuRelayMessage{ + Payload: input.Payload, + ContentTopic: input.ContentTopic, + Timestamp: input.Timestamp, + Proof: input.Proof, + } +} + +func (r *RPCWakuRelayMessage) toProto() *pb.WakuMessage { + if r == nil { + return nil + } + + return &pb.WakuMessage{ + Payload: r.Payload, + ContentTopic: r.ContentTopic, + Timestamp: r.Timestamp, + Proof: r.Proof, + } +} + +func (h ByteArray) MarshalText() ([]byte, error) { + if h == nil { + return []byte{}, nil + } + + return []byte(hex.EncodeToString(h)), nil +} + +func (h *ByteArray) UnmarshalText(b []byte) error { + hexString := "" + if b != nil { + hexString = string(b) + } + + decoded, err := hex.DecodeString(hexString) + if err != nil { + return err + } + + *h = decoded + + return nil +} diff --git a/waku/v2/rpc/waku_rpc.go b/waku/v2/rpc/waku_rpc.go index affde951a..b1e0845ef 100644 --- a/waku/v2/rpc/waku_rpc.go +++ b/waku/v2/rpc/waku_rpc.go @@ -88,6 +88,9 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, server.RegisterOnShutdown(func() { filterService.Stop() relayService.Stop() + if wrpc.privateService != nil { + wrpc.privateService.Stop() + } }) wrpc.node = node @@ -101,6 +104,9 @@ func NewWakuRpc(node *node.WakuNode, address string, port int, enableAdmin bool, func (r *WakuRpc) Start() { go r.relayService.Start() go r.filterService.Start() + if r.privateService != nil { + go r.privateService.Start() + } go func() { _ = r.server.ListenAndServe() }()