Skip to content

Commit 681bfed

Browse files
authored
feat(ipld): vouchers as plain ipld.Node (#325)
* feat(ipld): vouchers as plain ipld.Node * feat: add ValidationResult#Equals() utility * feat(ipld): introduce TypedVoucher tuple type * chore(ipld): ipld.Node -> datamodel.Node * chore: remove RegisterVoucherResultType * fix: minor staticcheck fixes
1 parent 282c7ee commit 681bfed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+938
-1214
lines changed

README.md

+3-3
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func (vl *myValidator) ValidatePush(
8585
sender peer.ID,
8686
voucher datatransfer.Voucher,
8787
baseCid cid.Cid,
88-
selector ipld.Node) error {
88+
selector datamodel.Node) error {
8989
9090
v := voucher.(*myVoucher)
9191
if v.data == "" || v.data != "validpush" {
@@ -99,7 +99,7 @@ func (vl *myValidator) ValidatePull(
9999
receiver peer.ID,
100100
voucher datatransfer.Voucher,
101101
baseCid cid.Cid,
102-
selector ipld.Node) error {
102+
selector datamodel.Node) error {
103103
104104
v := voucher.(*myVoucher)
105105
if v.data == "" || v.data != "validpull" {
@@ -135,7 +135,7 @@ must be sent with the request. Using the trivial examples above:
135135
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).
136136

137137
### Open a Push or Pull Request
138-
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
138+
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `datamodel.Node`. These
139139
calls return a `datatransfer.ChannelID` and any error:
140140
```go
141141
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector)

benchmarks/benchmark_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
105105
timer := time.NewTimer(30 * time.Second)
106106
start := time.Now()
107107
for j := 0; j < numfiles; j++ {
108-
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewFakeDTType(), allCids[j], allSelector)
108+
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewTestTypedVoucher(), allCids[j], allSelector)
109109
if err != nil {
110110
b.Fatalf("received error on request: %s", err.Error())
111111
}

benchmarks/testinstance/testinstance.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -188,8 +188,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
188188
sv := testutil.NewStubbedValidator()
189189
sv.StubSuccessPull()
190190
sv.StubSuccessPush()
191-
dt.RegisterVoucherType(testutil.NewFakeDTType(), sv)
192-
dt.RegisterVoucherResultType(testutil.NewFakeDTType())
191+
dt.RegisterVoucherType(testutil.TestVoucherType, sv)
193192
return Instance{
194193
Adapter: dtNet,
195194
Peer: p,

channels/channel_state.go

+42-37
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,19 @@ import (
44
"bytes"
55

66
"github.com/ipfs/go-cid"
7-
"github.com/ipld/go-ipld-prime"
87
"github.com/ipld/go-ipld-prime/codec/dagcbor"
8+
"github.com/ipld/go-ipld-prime/datamodel"
99
basicnode "github.com/ipld/go-ipld-prime/node/basic"
1010
peer "github.com/libp2p/go-libp2p-core/peer"
1111

1212
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
1313
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
14+
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
1415
)
1516

1617
// channelState is immutable channel data plus mutable state
1718
type channelState struct {
1819
ic internal.ChannelState
19-
20-
// additional voucherResults
21-
voucherResultDecoder DecoderByTypeFunc
22-
voucherDecoder DecoderByTypeFunc
2320
}
2421

2522
// EmptyChannelState is the zero value for channel state, meaning not present
@@ -45,7 +42,7 @@ func (c channelState) BaseCID() cid.Cid { return c.ic.BaseCid }
4542

4643
// Selector returns the IPLD selector for this data transfer (represented as
4744
// an IPLD node)
48-
func (c channelState) Selector() ipld.Node {
45+
func (c channelState) Selector() datamodel.Node {
4946
builder := basicnode.Prototype.Any.NewBuilder()
5047
reader := bytes.NewReader(c.ic.Selector.Raw)
5148
err := dagcbor.Decode(builder, reader)
@@ -56,13 +53,15 @@ func (c channelState) Selector() ipld.Node {
5653
}
5754

5855
// Voucher returns the voucher for this data transfer
59-
func (c channelState) Voucher() datatransfer.Voucher {
56+
func (c channelState) Voucher() (datatransfer.TypedVoucher, error) {
6057
if len(c.ic.Vouchers) == 0 {
61-
return nil
58+
return datatransfer.TypedVoucher{}, nil
59+
}
60+
node, err := ipldutils.DeferredToNode(c.ic.Vouchers[0].Voucher)
61+
if err != nil {
62+
return datatransfer.TypedVoucher{}, err
6263
}
63-
decoder, _ := c.voucherDecoder(c.ic.Vouchers[0].Type)
64-
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[0].Voucher.Raw)
65-
return encodable.(datatransfer.Voucher)
64+
return datatransfer.TypedVoucher{Voucher: node, Type: c.ic.Vouchers[0].Type}, nil
6665
}
6766

6867
// ReceivedCidsTotal returns the number of (non-unique) cids received so far
@@ -108,36 +107,46 @@ func (c channelState) Message() string {
108107
return c.ic.Message
109108
}
110109

111-
func (c channelState) Vouchers() []datatransfer.Voucher {
112-
vouchers := make([]datatransfer.Voucher, 0, len(c.ic.Vouchers))
110+
func (c channelState) Vouchers() ([]datatransfer.TypedVoucher, error) {
111+
vouchers := make([]datatransfer.TypedVoucher, 0, len(c.ic.Vouchers))
113112
for _, encoded := range c.ic.Vouchers {
114-
decoder, _ := c.voucherDecoder(encoded.Type)
115-
encodable, _ := decoder.DecodeFromCbor(encoded.Voucher.Raw)
116-
vouchers = append(vouchers, encodable.(datatransfer.Voucher))
113+
node, err := ipldutils.DeferredToNode(encoded.Voucher)
114+
if err != nil {
115+
return nil, err
116+
}
117+
vouchers = append(vouchers, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
117118
}
118-
return vouchers
119+
return vouchers, nil
119120
}
120121

121-
func (c channelState) LastVoucher() datatransfer.Voucher {
122-
decoder, _ := c.voucherDecoder(c.ic.Vouchers[len(c.ic.Vouchers)-1].Type)
123-
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[len(c.ic.Vouchers)-1].Voucher.Raw)
124-
return encodable.(datatransfer.Voucher)
122+
func (c channelState) LastVoucher() (datatransfer.TypedVoucher, error) {
123+
ev := c.ic.Vouchers[len(c.ic.Vouchers)-1]
124+
node, err := ipldutils.DeferredToNode(ev.Voucher)
125+
if err != nil {
126+
return datatransfer.TypedVoucher{}, err
127+
}
128+
return datatransfer.TypedVoucher{Voucher: node, Type: ev.Type}, nil
125129
}
126130

127-
func (c channelState) LastVoucherResult() datatransfer.VoucherResult {
128-
decoder, _ := c.voucherResultDecoder(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].Type)
129-
encodable, _ := decoder.DecodeFromCbor(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].VoucherResult.Raw)
130-
return encodable.(datatransfer.VoucherResult)
131+
func (c channelState) LastVoucherResult() (datatransfer.TypedVoucher, error) {
132+
evr := c.ic.VoucherResults[len(c.ic.VoucherResults)-1]
133+
node, err := ipldutils.DeferredToNode(evr.VoucherResult)
134+
if err != nil {
135+
return datatransfer.TypedVoucher{}, err
136+
}
137+
return datatransfer.TypedVoucher{Voucher: node, Type: evr.Type}, nil
131138
}
132139

133-
func (c channelState) VoucherResults() []datatransfer.VoucherResult {
134-
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.ic.VoucherResults))
140+
func (c channelState) VoucherResults() ([]datatransfer.TypedVoucher, error) {
141+
voucherResults := make([]datatransfer.TypedVoucher, 0, len(c.ic.VoucherResults))
135142
for _, encoded := range c.ic.VoucherResults {
136-
decoder, _ := c.voucherResultDecoder(encoded.Type)
137-
encodable, _ := decoder.DecodeFromCbor(encoded.VoucherResult.Raw)
138-
voucherResults = append(voucherResults, encodable.(datatransfer.VoucherResult))
143+
node, err := ipldutils.DeferredToNode(encoded.VoucherResult)
144+
if err != nil {
145+
return nil, err
146+
}
147+
voucherResults = append(voucherResults, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
139148
}
140-
return voucherResults
149+
return voucherResults, nil
141150
}
142151

143152
func (c channelState) SelfPeer() peer.ID {
@@ -174,12 +183,8 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
174183
return c.ic.Stages
175184
}
176185

177-
func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
178-
return channelState{
179-
ic: c,
180-
voucherResultDecoder: voucherResultDecoder,
181-
voucherDecoder: voucherDecoder,
182-
}
186+
func fromInternalChannelState(c internal.ChannelState) datatransfer.ChannelState {
187+
return channelState{ic: c}
183188
}
184189

185190
var _ datatransfer.ChannelState = channelState{}

channels/channels.go

+15-27
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import (
77

88
"github.com/ipfs/go-cid"
99
"github.com/ipfs/go-datastore"
10-
"github.com/ipld/go-ipld-prime"
10+
"github.com/ipld/go-ipld-prime/datamodel"
1111
peer "github.com/libp2p/go-libp2p-core/peer"
1212
cbg "github.com/whyrusleeping/cbor-gen"
1313
"golang.org/x/xerrors"
@@ -20,11 +20,9 @@ import (
2020
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
2121
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
2222
"github.com/filecoin-project/go-data-transfer/v2/channels/internal/migrations"
23-
"github.com/filecoin-project/go-data-transfer/v2/encoding"
23+
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
2424
)
2525

26-
type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)
27-
2826
type Notifier func(datatransfer.Event, datatransfer.ChannelState)
2927

3028
// ErrNotFound is returned when a channel cannot be found with a given channel ID
@@ -46,8 +44,6 @@ var ErrWrongType = errors.New("Cannot change type of implementation specific dat
4644
// Channels is a thread safe list of channels
4745
type Channels struct {
4846
notifier Notifier
49-
voucherDecoder DecoderByTypeFunc
50-
voucherResultDecoder DecoderByTypeFunc
5147
blockIndexCache *blockIndexCache
5248
progressCache *progressCache
5349
stateMachines fsm.Group
@@ -65,16 +61,10 @@ type ChannelEnvironment interface {
6561
// New returns a new thread safe list of channels
6662
func New(ds datastore.Batching,
6763
notifier Notifier,
68-
voucherDecoder DecoderByTypeFunc,
69-
voucherResultDecoder DecoderByTypeFunc,
7064
env ChannelEnvironment,
7165
selfPeer peer.ID) (*Channels, error) {
7266

73-
c := &Channels{
74-
notifier: notifier,
75-
voucherDecoder: voucherDecoder,
76-
voucherResultDecoder: voucherResultDecoder,
77-
}
67+
c := &Channels{notifier: notifier}
7868
c.blockIndexCache = newBlockIndexCache()
7969
c.progressCache = newProgressCache()
8070
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer)
@@ -121,19 +111,19 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
121111

122112
// CreateNew creates a new channel id and channel state and saves to channels.
123113
// returns error if the channel exists already.
124-
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
114+
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector datamodel.Node, voucher datatransfer.TypedVoucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
125115
var responder peer.ID
126116
if dataSender == initiator {
127117
responder = dataReceiver
128118
} else {
129119
responder = dataSender
130120
}
131121
chid := datatransfer.ChannelID{Initiator: initiator, Responder: responder, ID: tid}
132-
voucherBytes, err := encoding.Encode(voucher)
122+
initialVoucher, err := ipldutils.NodeToDeferred(voucher.Voucher)
133123
if err != nil {
134124
return datatransfer.ChannelID{}, err
135125
}
136-
selBytes, err := encoding.Encode(selector)
126+
selBytes, err := ipldutils.NodeToBytes(selector)
137127
if err != nil {
138128
return datatransfer.ChannelID{}, err
139129
}
@@ -149,10 +139,8 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
149139
Stages: &datatransfer.ChannelStages{},
150140
Vouchers: []internal.EncodedVoucher{
151141
{
152-
Type: voucher.Type(),
153-
Voucher: &cbg.Deferred{
154-
Raw: voucherBytes,
155-
},
142+
Type: voucher.Type,
143+
Voucher: initialVoucher,
156144
},
157145
},
158146
Status: datatransfer.Requested,
@@ -289,21 +277,21 @@ func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error {
289277
}
290278

291279
// NewVoucher records a new voucher for this channel
292-
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.Voucher) error {
293-
voucherBytes, err := encoding.Encode(voucher)
280+
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.TypedVoucher) error {
281+
voucherBytes, err := ipldutils.NodeToBytes(voucher.Voucher)
294282
if err != nil {
295283
return err
296284
}
297-
return c.send(chid, datatransfer.NewVoucher, voucher.Type(), voucherBytes)
285+
return c.send(chid, datatransfer.NewVoucher, voucher.Type, voucherBytes)
298286
}
299287

300288
// NewVoucherResult records a new voucher result for this channel
301-
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.VoucherResult) error {
302-
voucherResultBytes, err := encoding.Encode(voucherResult)
289+
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.TypedVoucher) error {
290+
voucherResultBytes, err := ipldutils.NodeToBytes(voucherResult.Voucher)
303291
if err != nil {
304292
return err
305293
}
306-
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type(), voucherResultBytes)
294+
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type, voucherResultBytes)
307295
}
308296

309297
// Complete indicates responder has completed sending/receiving data
@@ -485,5 +473,5 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran
485473

486474
// Convert from the internally used channel state format to the externally exposed ChannelState
487475
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
488-
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder)
476+
return fromInternalChannelState(ch)
489477
}

0 commit comments

Comments
 (0)