Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ipld): vouchers as plain ipld.Node #325

Merged
merged 6 commits into from
Jun 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (vl *myValidator) ValidatePush(
sender peer.ID,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {
selector datamodel.Node) error {

v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpush" {
Expand All @@ -99,7 +99,7 @@ func (vl *myValidator) ValidatePull(
receiver peer.ID,
voucher datatransfer.Voucher,
baseCid cid.Cid,
selector ipld.Node) error {
selector datamodel.Node) error {

v := voucher.(*myVoucher)
if v.data == "" || v.data != "validpull" {
Expand Down Expand Up @@ -135,7 +135,7 @@ must be sent with the request. Using the trivial examples above:
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).

### Open a Push or Pull Request
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `datamodel.Node`. These
calls return a `datatransfer.ChannelID` and any error:
```go
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector)
Expand Down
2 changes: 1 addition & 1 deletion benchmarks/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
timer := time.NewTimer(30 * time.Second)
start := time.Now()
for j := 0; j < numfiles; j++ {
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewFakeDTType(), allCids[j], allSelector)
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewTestTypedVoucher(), allCids[j], allSelector)
if err != nil {
b.Fatalf("received error on request: %s", err.Error())
}
Expand Down
3 changes: 1 addition & 2 deletions benchmarks/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,8 +188,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
sv := testutil.NewStubbedValidator()
sv.StubSuccessPull()
sv.StubSuccessPush()
dt.RegisterVoucherType(testutil.NewFakeDTType(), sv)
dt.RegisterVoucherResultType(testutil.NewFakeDTType())
dt.RegisterVoucherType(testutil.TestVoucherType, sv)
return Instance{
Adapter: dtNet,
Peer: p,
Expand Down
79 changes: 42 additions & 37 deletions channels/channel_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,19 @@ import (
"bytes"

"github.com/ipfs/go-cid"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/codec/dagcbor"
"github.com/ipld/go-ipld-prime/datamodel"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
peer "github.com/libp2p/go-libp2p-core/peer"

datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

// channelState is immutable channel data plus mutable state
type channelState struct {
ic internal.ChannelState

// additional voucherResults
voucherResultDecoder DecoderByTypeFunc
voucherDecoder DecoderByTypeFunc
}

// EmptyChannelState is the zero value for channel state, meaning not present
Expand All @@ -45,7 +42,7 @@ func (c channelState) BaseCID() cid.Cid { return c.ic.BaseCid }

// Selector returns the IPLD selector for this data transfer (represented as
// an IPLD node)
func (c channelState) Selector() ipld.Node {
func (c channelState) Selector() datamodel.Node {
builder := basicnode.Prototype.Any.NewBuilder()
reader := bytes.NewReader(c.ic.Selector.Raw)
err := dagcbor.Decode(builder, reader)
Expand All @@ -56,13 +53,15 @@ func (c channelState) Selector() ipld.Node {
}

// Voucher returns the voucher for this data transfer
func (c channelState) Voucher() datatransfer.Voucher {
func (c channelState) Voucher() (datatransfer.TypedVoucher, error) {
if len(c.ic.Vouchers) == 0 {
return nil
return datatransfer.TypedVoucher{}, nil
}
node, err := ipldutils.DeferredToNode(c.ic.Vouchers[0].Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
decoder, _ := c.voucherDecoder(c.ic.Vouchers[0].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[0].Voucher.Raw)
return encodable.(datatransfer.Voucher)
return datatransfer.TypedVoucher{Voucher: node, Type: c.ic.Vouchers[0].Type}, nil
}

// ReceivedCidsTotal returns the number of (non-unique) cids received so far
Expand Down Expand Up @@ -108,36 +107,46 @@ func (c channelState) Message() string {
return c.ic.Message
}

func (c channelState) Vouchers() []datatransfer.Voucher {
vouchers := make([]datatransfer.Voucher, 0, len(c.ic.Vouchers))
func (c channelState) Vouchers() ([]datatransfer.TypedVoucher, error) {
vouchers := make([]datatransfer.TypedVoucher, 0, len(c.ic.Vouchers))
for _, encoded := range c.ic.Vouchers {
decoder, _ := c.voucherDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.Voucher.Raw)
vouchers = append(vouchers, encodable.(datatransfer.Voucher))
node, err := ipldutils.DeferredToNode(encoded.Voucher)
if err != nil {
return nil, err
}
vouchers = append(vouchers, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
}
return vouchers
return vouchers, nil
}

func (c channelState) LastVoucher() datatransfer.Voucher {
decoder, _ := c.voucherDecoder(c.ic.Vouchers[len(c.ic.Vouchers)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.Vouchers[len(c.ic.Vouchers)-1].Voucher.Raw)
return encodable.(datatransfer.Voucher)
func (c channelState) LastVoucher() (datatransfer.TypedVoucher, error) {
ev := c.ic.Vouchers[len(c.ic.Vouchers)-1]
node, err := ipldutils.DeferredToNode(ev.Voucher)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: ev.Type}, nil
}

func (c channelState) LastVoucherResult() datatransfer.VoucherResult {
decoder, _ := c.voucherResultDecoder(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].Type)
encodable, _ := decoder.DecodeFromCbor(c.ic.VoucherResults[len(c.ic.VoucherResults)-1].VoucherResult.Raw)
return encodable.(datatransfer.VoucherResult)
func (c channelState) LastVoucherResult() (datatransfer.TypedVoucher, error) {
evr := c.ic.VoucherResults[len(c.ic.VoucherResults)-1]
node, err := ipldutils.DeferredToNode(evr.VoucherResult)
if err != nil {
return datatransfer.TypedVoucher{}, err
}
return datatransfer.TypedVoucher{Voucher: node, Type: evr.Type}, nil
}

func (c channelState) VoucherResults() []datatransfer.VoucherResult {
voucherResults := make([]datatransfer.VoucherResult, 0, len(c.ic.VoucherResults))
func (c channelState) VoucherResults() ([]datatransfer.TypedVoucher, error) {
voucherResults := make([]datatransfer.TypedVoucher, 0, len(c.ic.VoucherResults))
for _, encoded := range c.ic.VoucherResults {
decoder, _ := c.voucherResultDecoder(encoded.Type)
encodable, _ := decoder.DecodeFromCbor(encoded.VoucherResult.Raw)
voucherResults = append(voucherResults, encodable.(datatransfer.VoucherResult))
node, err := ipldutils.DeferredToNode(encoded.VoucherResult)
if err != nil {
return nil, err
}
voucherResults = append(voucherResults, datatransfer.TypedVoucher{Voucher: node, Type: encoded.Type})
}
return voucherResults
return voucherResults, nil
}

func (c channelState) SelfPeer() peer.ID {
Expand Down Expand Up @@ -174,12 +183,8 @@ func (c channelState) Stages() *datatransfer.ChannelStages {
return c.ic.Stages
}

func fromInternalChannelState(c internal.ChannelState, voucherDecoder DecoderByTypeFunc, voucherResultDecoder DecoderByTypeFunc) datatransfer.ChannelState {
return channelState{
ic: c,
voucherResultDecoder: voucherResultDecoder,
voucherDecoder: voucherDecoder,
}
func fromInternalChannelState(c internal.ChannelState) datatransfer.ChannelState {
return channelState{ic: c}
}

var _ datatransfer.ChannelState = channelState{}
42 changes: 15 additions & 27 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-ipld-prime"
"github.com/ipld/go-ipld-prime/datamodel"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
Expand All @@ -20,11 +20,9 @@ import (
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal"
"github.com/filecoin-project/go-data-transfer/v2/channels/internal/migrations"
"github.com/filecoin-project/go-data-transfer/v2/encoding"
ipldutils "github.com/filecoin-project/go-data-transfer/v2/ipldutils"
)

type DecoderByTypeFunc func(identifier datatransfer.TypeIdentifier) (encoding.Decoder, bool)

type Notifier func(datatransfer.Event, datatransfer.ChannelState)

// ErrNotFound is returned when a channel cannot be found with a given channel ID
Expand All @@ -46,8 +44,6 @@ var ErrWrongType = errors.New("Cannot change type of implementation specific dat
// Channels is a thread safe list of channels
type Channels struct {
notifier Notifier
voucherDecoder DecoderByTypeFunc
voucherResultDecoder DecoderByTypeFunc
blockIndexCache *blockIndexCache
progressCache *progressCache
stateMachines fsm.Group
Expand All @@ -65,16 +61,10 @@ type ChannelEnvironment interface {
// New returns a new thread safe list of channels
func New(ds datastore.Batching,
notifier Notifier,
voucherDecoder DecoderByTypeFunc,
voucherResultDecoder DecoderByTypeFunc,
env ChannelEnvironment,
selfPeer peer.ID) (*Channels, error) {

c := &Channels{
notifier: notifier,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
}
c := &Channels{notifier: notifier}
c.blockIndexCache = newBlockIndexCache()
c.progressCache = newProgressCache()
channelMigrations, err := migrations.GetChannelStateMigrations(selfPeer)
Expand Down Expand Up @@ -121,19 +111,19 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {

// CreateNew creates a new channel id and channel state and saves to channels.
// returns error if the channel exists already.
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector ipld.Node, voucher datatransfer.Voucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, baseCid cid.Cid, selector datamodel.Node, voucher datatransfer.TypedVoucher, initiator, dataSender, dataReceiver peer.ID) (datatransfer.ChannelID, error) {
var responder peer.ID
if dataSender == initiator {
responder = dataReceiver
} else {
responder = dataSender
}
chid := datatransfer.ChannelID{Initiator: initiator, Responder: responder, ID: tid}
voucherBytes, err := encoding.Encode(voucher)
initialVoucher, err := ipldutils.NodeToDeferred(voucher.Voucher)
if err != nil {
return datatransfer.ChannelID{}, err
}
selBytes, err := encoding.Encode(selector)
selBytes, err := ipldutils.NodeToBytes(selector)
if err != nil {
return datatransfer.ChannelID{}, err
}
Expand All @@ -149,10 +139,8 @@ func (c *Channels) CreateNew(selfPeer peer.ID, tid datatransfer.TransferID, base
Stages: &datatransfer.ChannelStages{},
Vouchers: []internal.EncodedVoucher{
{
Type: voucher.Type(),
Voucher: &cbg.Deferred{
Raw: voucherBytes,
},
Type: voucher.Type,
Voucher: initialVoucher,
},
},
Status: datatransfer.Requested,
Expand Down Expand Up @@ -289,21 +277,21 @@ func (c *Channels) ResumeResponder(chid datatransfer.ChannelID) error {
}

// NewVoucher records a new voucher for this channel
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.Voucher) error {
voucherBytes, err := encoding.Encode(voucher)
func (c *Channels) NewVoucher(chid datatransfer.ChannelID, voucher datatransfer.TypedVoucher) error {
voucherBytes, err := ipldutils.NodeToBytes(voucher.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucher, voucher.Type(), voucherBytes)
return c.send(chid, datatransfer.NewVoucher, voucher.Type, voucherBytes)
}

// NewVoucherResult records a new voucher result for this channel
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.VoucherResult) error {
voucherResultBytes, err := encoding.Encode(voucherResult)
func (c *Channels) NewVoucherResult(chid datatransfer.ChannelID, voucherResult datatransfer.TypedVoucher) error {
voucherResultBytes, err := ipldutils.NodeToBytes(voucherResult.Voucher)
if err != nil {
return err
}
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type(), voucherResultBytes)
return c.send(chid, datatransfer.NewVoucherResult, voucherResult.Type, voucherResultBytes)
}

// Complete indicates responder has completed sending/receiving data
Expand Down Expand Up @@ -485,5 +473,5 @@ func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatran

// Convert from the internally used channel state format to the externally exposed ChannelState
func (c *Channels) fromInternalChannelState(ch internal.ChannelState) datatransfer.ChannelState {
return fromInternalChannelState(ch, c.voucherDecoder, c.voucherResultDecoder)
return fromInternalChannelState(ch)
}
Loading