Skip to content
This repository was archived by the owner on Oct 5, 2023. It is now read-only.

Commit 7abddda

Browse files
committed
Implement PubSub Api
1 parent 0110569 commit 7abddda

File tree

3 files changed

+131
-2
lines changed

3 files changed

+131
-2
lines changed

api.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,5 +166,5 @@ func (api *HttpApi) Swarm() iface.SwarmAPI {
166166
}
167167

168168
func (api *HttpApi) PubSub() iface.PubSubAPI {
169-
return nil
169+
return (*PubsubAPI)(api)
170170
}

api_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (NodeProvider) MakeAPISwarm(ctx context.Context, fullIdentity bool, n int)
4747
return nil, err
4848
}
4949

50-
startArgs := []string{"iptb", "--IPTB_ROOT", dir, "start", "-wait", "--", "--offline=" + strconv.FormatBool(n == 1)}
50+
startArgs := []string{"iptb", "--IPTB_ROOT", dir, "start", "-wait", "--", "--enable-pubsub-experiment", "--offline=" + strconv.FormatBool(n == 1)}
5151
if err := c.Run(startArgs); err != nil {
5252
return nil, err
5353
}

pubsub.go

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package httpapi
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"io"
8+
9+
"github.com/ipfs/go-ipfs/core/coreapi/interface"
10+
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
11+
12+
"github.com/libp2p/go-libp2p-peer"
13+
)
14+
15+
type PubsubAPI HttpApi
16+
17+
func (api *PubsubAPI) Ls(ctx context.Context) ([]string, error) {
18+
var out struct {
19+
Strings []string
20+
}
21+
22+
if err := api.core().request("pubsub/ls").Exec(ctx, &out); err != nil {
23+
return nil, err
24+
}
25+
26+
return out.Strings, nil
27+
}
28+
29+
func (api *PubsubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
30+
options, err := caopts.PubSubPeersOptions(opts...)
31+
if err != nil {
32+
return nil, err
33+
}
34+
35+
var out struct {
36+
Strings []string
37+
}
38+
39+
if err := api.core().request("pubsub/peers", options.Topic).Exec(ctx, &out); err != nil {
40+
return nil, err
41+
}
42+
43+
res := make([]peer.ID, len(out.Strings))
44+
for i, sid := range out.Strings {
45+
id, err := peer.IDB58Decode(sid)
46+
if err != nil {
47+
return nil, err
48+
}
49+
res[i] = id
50+
}
51+
return res, nil
52+
}
53+
54+
func (api *PubsubAPI) Publish(ctx context.Context, topic string, message []byte) error {
55+
return api.core().request("pubsub/pub", topic).
56+
FileBody(bytes.NewReader(message)).
57+
Exec(ctx, nil)
58+
}
59+
60+
type pubsubSub struct {
61+
io.Closer
62+
dec *json.Decoder
63+
}
64+
65+
type pubsubMessage struct {
66+
JFrom []byte `json:"from,omitempty"`
67+
JData []byte `json:"data,omitempty"`
68+
JSeqno []byte `json:"seqno,omitempty"`
69+
JTopicIDs []string `json:"topicIDs,omitempty"`
70+
}
71+
72+
func (msg *pubsubMessage) valid() error {
73+
_, err := peer.IDFromBytes(msg.JFrom)
74+
return err
75+
}
76+
77+
func (msg *pubsubMessage) From() peer.ID {
78+
id, _ := peer.IDFromBytes(msg.JFrom)
79+
return id
80+
}
81+
82+
func (msg *pubsubMessage) Data() []byte {
83+
return msg.JData
84+
}
85+
86+
func (msg *pubsubMessage) Seq() []byte {
87+
return msg.JSeqno
88+
}
89+
90+
func (msg *pubsubMessage) Topics() []string {
91+
return msg.JTopicIDs
92+
}
93+
94+
func (s *pubsubSub) Next(ctx context.Context) (iface.PubSubMessage, error) {
95+
// TODO: handle ctx
96+
97+
var msg pubsubMessage
98+
if err := s.dec.Decode(&msg); err != nil {
99+
return nil, err
100+
}
101+
return &msg, msg.valid()
102+
}
103+
104+
func (api *PubsubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (iface.PubSubSubscription, error) {
105+
options, err := caopts.PubSubSubscribeOptions(opts...)
106+
if err != nil {
107+
return nil, err
108+
}
109+
110+
resp, err := api.core().request("pubsub/sub", topic).
111+
Option("discover", options.Discover).
112+
Send(ctx)
113+
if err != nil {
114+
return nil, err
115+
}
116+
if resp.Error != nil {
117+
return nil, resp.Error
118+
}
119+
120+
return &pubsubSub{
121+
Closer: resp,
122+
dec: json.NewDecoder(resp.Output),
123+
}, nil
124+
}
125+
126+
func (api *PubsubAPI) core() *HttpApi {
127+
return (*HttpApi)(api)
128+
}
129+

0 commit comments

Comments
 (0)