Skip to content

Commit c9889dc

Browse files
hannahhowarddirkmcrvagg
authored
Data Transfer V2 final (#349)
* chore(v2): setup the v2 release given the expected breaking changes, it's time to setup the v2 release BREAKING CHANGE: v2 modules * Refactor revalidators (#308) * feat(revalidators): initial refactor refactor revalidation process -- removing independent revalidations, making validations results more clear * refactor(rename): RevalidateToComplete -> RequiresFinalization * refactor(datatransfer): enhance validator comments * refactor(message): revert message response changes * refactor(impl): comment and refactor on events add comments to event processing and also extract functions for receiving requests to a new file * refactor(message): s/IsVoucherResult/IsValidationResult rename the IsVoucherResult to is 'IsValidationResult' also add a method for generating messages from validation results * feat(events): only dispatch events on change Only dispatch Pause, Resume, SetDataLimit, and RequiresFinalization on change * style(imports): fix imports * feat(events): add DataLimitExceeded event * Update channels/channel_state.go Co-authored-by: dirkmc <dirkmdev@gmail.com> * Update manager.go Co-authored-by: dirkmc <dirkmdev@gmail.com> * Update manager.go Co-authored-by: dirkmc <dirkmdev@gmail.com> * Update statuses.go Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: dirkmc <dirkmdev@gmail.com> Co-authored-by: Rod Vagg <rod@vagg.org> * Refactor revalidators v2 (#322) * refactor(validators): remove revalidation move to all revalidation being asynchronous * feat(validators): implied pauses causes datalimit exceeded and requires finalization to leave request paused, regardless of where LeaveRequestPaused is set * refactor(events): reorder events reorder validation events so all get record when transfer finishes * Update impl/impl.go Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: Rod Vagg <rod@vagg.org> * chore(message): delete old message format code (#330) * feat(ipld): vouchers as plain ipld.Node (#325) * feat(ipld): vouchers as plain ipld.Node * feat: add ValidationResult#Equals() utility * feat(ipld): introduce TypedVoucher tuple type * chore(ipld): ipld.Node -> datamodel.Node * chore: remove RegisterVoucherResultType * fix: minor staticcheck fixes * refactor(channelstate): use cborgencompatiblenode (#336) use cborgencomptaiblenode to simply channelstate interface * feat(ipld): use bindnode/registry (#340) * chore(deps): update libp2p v0.19.4 (#341) * feat(ipld): use bindnode/registry Co-authored-by: Hannah Howard <hannah@hannahhoward.net> * refactor(v2): port graphsync w/o transport refactor Ports state machine and graphsync changes without the big transport refactor * fix(pr): respond to pr comments * chore(deps): upgrade libp2p v0.22 also upgrades graphsync and removes go-libp2p-core paths * fix(deps): use tagged go-ipld-prime Co-authored-by: dirkmc <dirkmdev@gmail.com> Co-authored-by: Rod Vagg <rod@vagg.org>
1 parent 07e61bc commit c9889dc

File tree

86 files changed

+4618
-5498
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+4618
-5498
lines changed

CONTRIBUTING.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ import (
5959

6060
"github.com/filecoin-project/go-statemachine"
6161

62-
datatransfer "github.com/filecoin-project/go-data-transfer"
62+
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
6363
)
6464
```
6565

README.md

+7-7
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ This module encapsulates protocols for exchanging piece data between storage cli
2121

2222
**Requires go 1.13**
2323

24-
Install the module in your package or app with `go get "github.com/filecoin-project/go-data-transfer/datatransfer"`
24+
Install the module in your package or app with `go get "github.com/filecoin-project/go-data-transfer/v2/datatransfer"`
2525

2626

2727
### Initialize a data transfer module
@@ -31,9 +31,9 @@ Install the module in your package or app with `go get "github.com/filecoin-proj
3131

3232
import (
3333
gsimpl "github.com/ipfs/go-graphsync/impl"
34-
datatransfer "github.com/filecoin-project/go-data-transfer/impl"
35-
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
36-
"github.com/libp2p/go-libp2p-core/host"
34+
datatransfer "github.com/filecoin-project/go-data-transfer/v2/impl"
35+
gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
36+
"github.com/libp2p/go-libp2p/core/host"
3737
)
3838

3939
```
@@ -85,7 +85,7 @@ func (vl *myValidator) ValidatePush(
8585
sender peer.ID,
8686
voucher datatransfer.Voucher,
8787
baseCid cid.Cid,
88-
selector ipld.Node) error {
88+
selector datamodel.Node) error {
8989
9090
v := voucher.(*myVoucher)
9191
if v.data == "" || v.data != "validpush" {
@@ -99,7 +99,7 @@ func (vl *myValidator) ValidatePull(
9999
receiver peer.ID,
100100
voucher datatransfer.Voucher,
101101
baseCid cid.Cid,
102-
selector ipld.Node) error {
102+
selector datamodel.Node) error {
103103
104104
v := voucher.(*myVoucher)
105105
if v.data == "" || v.data != "validpull" {
@@ -135,7 +135,7 @@ must be sent with the request. Using the trivial examples above:
135135
For more detail, please see the [unit tests](https://github.com/filecoin-project/go-data-transfer/blob/master/impl/impl_test.go).
136136

137137
### Open a Push or Pull Request
138-
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `ipld.Node`. These
138+
For a push or pull request, provide a context, a `datatransfer.Voucher`, a host recipient `peer.ID`, a baseCID `cid.CID` and a selector `datamodel.Node`. These
139139
calls return a `datatransfer.ChannelID` and any error:
140140
```go
141141
channelID, err := dtm.OpenPullDataChannel(ctx, recipient, voucher, baseCid, selector)

benchmarks/benchmark_test.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@ import (
3030
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
3131
"github.com/stretchr/testify/require"
3232

33-
datatransfer "github.com/filecoin-project/go-data-transfer"
34-
"github.com/filecoin-project/go-data-transfer/benchmarks/testinstance"
35-
tn "github.com/filecoin-project/go-data-transfer/benchmarks/testnet"
36-
"github.com/filecoin-project/go-data-transfer/testutil"
33+
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
34+
"github.com/filecoin-project/go-data-transfer/v2/benchmarks/testinstance"
35+
tn "github.com/filecoin-project/go-data-transfer/v2/benchmarks/testnet"
36+
"github.com/filecoin-project/go-data-transfer/v2/testutil"
3737
)
3838

3939
const stdBlockSize = 8000
@@ -105,7 +105,7 @@ func p2pStrestTest(ctx context.Context, b *testing.B, numfiles int, df distFunc,
105105
timer := time.NewTimer(30 * time.Second)
106106
start := time.Now()
107107
for j := 0; j < numfiles; j++ {
108-
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewFakeDTType(), allCids[j], allSelector)
108+
_, err := pusher.Manager.OpenPushDataChannel(ctx, receiver.Peer, testutil.NewTestTypedVoucher(), allCids[j], allSelector)
109109
if err != nil {
110110
b.Fatalf("received error on request: %s", err.Error())
111111
}

benchmarks/testinstance/testinstance.go

+9-10
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ import (
1616
blockstore "github.com/ipfs/go-ipfs-blockstore"
1717
delay "github.com/ipfs/go-ipfs-delay"
1818
"github.com/ipld/go-ipld-prime"
19-
peer "github.com/libp2p/go-libp2p-core/peer"
20-
21-
datatransfer "github.com/filecoin-project/go-data-transfer"
22-
tn "github.com/filecoin-project/go-data-transfer/benchmarks/testnet"
23-
dtimpl "github.com/filecoin-project/go-data-transfer/impl"
24-
dtnet "github.com/filecoin-project/go-data-transfer/network"
25-
"github.com/filecoin-project/go-data-transfer/testutil"
26-
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
19+
peer "github.com/libp2p/go-libp2p/core/peer"
20+
21+
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
22+
tn "github.com/filecoin-project/go-data-transfer/v2/benchmarks/testnet"
23+
dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl"
24+
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
25+
"github.com/filecoin-project/go-data-transfer/v2/testutil"
26+
gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync"
2727
)
2828

2929
// TempDirGenerator is any interface that can generate temporary directories
@@ -188,8 +188,7 @@ func NewInstance(ctx context.Context, net tn.Network, tempDir string, diskBasedD
188188
sv := testutil.NewStubbedValidator()
189189
sv.StubSuccessPull()
190190
sv.StubSuccessPush()
191-
dt.RegisterVoucherType(testutil.NewFakeDTType(), sv)
192-
dt.RegisterVoucherResultType(testutil.NewFakeDTType())
191+
dt.RegisterVoucherType(testutil.TestVoucherType, sv)
193192
return Instance{
194193
Adapter: dtNet,
195194
Peer: p,

benchmarks/testnet/interface.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package testnet
22

33
import (
44
gsnet "github.com/ipfs/go-graphsync/network"
5-
"github.com/libp2p/go-libp2p-core/peer"
5+
"github.com/libp2p/go-libp2p/core/peer"
66

7-
dtnet "github.com/filecoin-project/go-data-transfer/network"
7+
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
88
)
99

1010
// Network is an interface for generating graphsync network interfaces

benchmarks/testnet/peernet.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@ import (
44
"context"
55

66
gsnet "github.com/ipfs/go-graphsync/network"
7-
"github.com/libp2p/go-libp2p-core/peer"
7+
"github.com/libp2p/go-libp2p/core/peer"
88
mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock"
99

10-
dtnet "github.com/filecoin-project/go-data-transfer/network"
10+
dtnet "github.com/filecoin-project/go-data-transfer/v2/network"
1111
)
1212

1313
type peernet struct {

channelmonitor/channelmonitor.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import (
88

99
"github.com/bep/debounce"
1010
logging "github.com/ipfs/go-log/v2"
11-
"github.com/libp2p/go-libp2p-core/peer"
11+
"github.com/libp2p/go-libp2p/core/peer"
1212
"golang.org/x/xerrors"
1313

14-
datatransfer "github.com/filecoin-project/go-data-transfer"
15-
"github.com/filecoin-project/go-data-transfer/channels"
14+
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
15+
"github.com/filecoin-project/go-data-transfer/v2/channels"
1616
)
1717

1818
var log = logging.Logger("dt-chanmon")

channelmonitor/channelmonitor_test.go

+13-123
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,12 @@ import (
77
"testing"
88
"time"
99

10-
"github.com/ipfs/go-cid"
11-
"github.com/ipld/go-ipld-prime"
12-
"github.com/libp2p/go-libp2p-core/peer"
10+
"github.com/libp2p/go-libp2p/core/peer"
1311
"github.com/stretchr/testify/require"
1412
"golang.org/x/xerrors"
1513

16-
datatransfer "github.com/filecoin-project/go-data-transfer"
14+
datatransfer "github.com/filecoin-project/go-data-transfer/v2"
15+
"github.com/filecoin-project/go-data-transfer/v2/testutil"
1716
)
1817

1918
var ch1 = datatransfer.ChannelID{
@@ -41,7 +40,7 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
4140
runTest := func(name string, isPush bool) {
4241
for _, tc := range testCases {
4342
t.Run(name+": "+tc.name, func(t *testing.T) {
44-
ch := &mockChannelState{chid: ch1}
43+
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
4544
mockAPI := newMockMonitorAPI(ch, tc.errReconnect, tc.errSendRestartMsg)
4645

4746
triggerErrorEvent := func() {
@@ -115,7 +114,7 @@ func TestChannelMonitorAutoRestart(t *testing.T) {
115114
func TestChannelMonitorMaxConsecutiveRestarts(t *testing.T) {
116115
runTest := func(name string, isPush bool) {
117116
t.Run(name, func(t *testing.T) {
118-
ch := &mockChannelState{chid: ch1}
117+
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
119118
mockAPI := newMockMonitorAPI(ch, false, false)
120119

121120
triggerErrorEvent := func() {
@@ -198,7 +197,7 @@ func awaitRestartComplete(mch *monitoredChannel) error {
198197
func TestChannelMonitorQueuedRestart(t *testing.T) {
199198
runTest := func(name string, isPush bool) {
200199
t.Run(name, func(t *testing.T) {
201-
ch := &mockChannelState{chid: ch1}
200+
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
202201
mockAPI := newMockMonitorAPI(ch, false, false)
203202

204203
triggerErrorEvent := func() {
@@ -285,7 +284,7 @@ func TestChannelMonitorTimeouts(t *testing.T) {
285284
runTest := func(name string, isPush bool) {
286285
for _, tc := range testCases {
287286
t.Run(name+": "+tc.name, func(t *testing.T) {
288-
ch := &mockChannelState{chid: ch1}
287+
ch := testutil.NewMockChannelState(testutil.MockChannelStateParams{ChannelID: ch1})
289288
mockAPI := newMockMonitorAPI(ch, false, false)
290289

291290
verifyClosedAndShutdown := func(chCtx context.Context, timeout time.Duration) {
@@ -370,7 +369,7 @@ func verifyChannelShutdown(t *testing.T, shutdownCtx context.Context) {
370369
}
371370

372371
type mockMonitorAPI struct {
373-
ch *mockChannelState
372+
ch *testutil.MockChannelState
374373
connectErrors bool
375374
restartErrors bool
376375
restartMessages chan struct{}
@@ -380,7 +379,7 @@ type mockMonitorAPI struct {
380379
subscribers map[int]datatransfer.Subscriber
381380
}
382381

383-
func newMockMonitorAPI(ch *mockChannelState, errOnReconnect, errOnRestart bool) *mockMonitorAPI {
382+
func newMockMonitorAPI(ch *testutil.MockChannelState, errOnReconnect, errOnRestart bool) *mockMonitorAPI {
384383
return &mockMonitorAPI{
385384
ch: ch,
386385
connectErrors: errOnReconnect,
@@ -482,17 +481,17 @@ func (m *mockMonitorAPI) accept() {
482481
}
483482

484483
func (m *mockMonitorAPI) dataQueued(n uint64) {
485-
m.ch.queued = n
484+
m.ch.SetQueued(n)
486485
m.fireEvent(datatransfer.Event{Code: datatransfer.DataQueued}, m.ch)
487486
}
488487

489488
func (m *mockMonitorAPI) dataSent(n uint64) {
490-
m.ch.sent = n
489+
m.ch.SetSent(n)
491490
m.fireEvent(datatransfer.Event{Code: datatransfer.DataSent}, m.ch)
492491
}
493492

494493
func (m *mockMonitorAPI) dataReceived(n uint64) {
495-
m.ch.received = n
494+
m.ch.SetReceived(n)
496495
m.fireEvent(datatransfer.Event{Code: datatransfer.DataReceived}, m.ch)
497496
}
498497

@@ -501,7 +500,7 @@ func (m *mockMonitorAPI) finishTransfer() {
501500
}
502501

503502
func (m *mockMonitorAPI) completed() {
504-
m.ch.complete = true
503+
m.ch.SetComplete(true)
505504
m.fireEvent(datatransfer.Event{Code: datatransfer.Complete}, m.ch)
506505
}
507506

@@ -512,112 +511,3 @@ func (m *mockMonitorAPI) sendDataErrorEvent() {
512511
func (m *mockMonitorAPI) receiveDataErrorEvent() {
513512
m.fireEvent(datatransfer.Event{Code: datatransfer.ReceiveDataError}, m.ch)
514513
}
515-
516-
type mockChannelState struct {
517-
chid datatransfer.ChannelID
518-
queued uint64
519-
sent uint64
520-
received uint64
521-
complete bool
522-
}
523-
524-
var _ datatransfer.ChannelState = (*mockChannelState)(nil)
525-
526-
func (m *mockChannelState) Queued() uint64 {
527-
return m.queued
528-
}
529-
530-
func (m *mockChannelState) Sent() uint64 {
531-
return m.sent
532-
}
533-
534-
func (m *mockChannelState) Received() uint64 {
535-
return m.received
536-
}
537-
538-
func (m *mockChannelState) ChannelID() datatransfer.ChannelID {
539-
return m.chid
540-
}
541-
542-
func (m *mockChannelState) Status() datatransfer.Status {
543-
if m.complete {
544-
return datatransfer.Completed
545-
}
546-
return datatransfer.Ongoing
547-
}
548-
549-
func (m *mockChannelState) TransferID() datatransfer.TransferID {
550-
panic("implement me")
551-
}
552-
553-
func (m *mockChannelState) BaseCID() cid.Cid {
554-
panic("implement me")
555-
}
556-
557-
func (m *mockChannelState) Selector() ipld.Node {
558-
panic("implement me")
559-
}
560-
561-
func (m *mockChannelState) Voucher() datatransfer.Voucher {
562-
panic("implement me")
563-
}
564-
565-
func (m *mockChannelState) Sender() peer.ID {
566-
panic("implement me")
567-
}
568-
569-
func (m *mockChannelState) Recipient() peer.ID {
570-
panic("implement me")
571-
}
572-
573-
func (m *mockChannelState) TotalSize() uint64 {
574-
panic("implement me")
575-
}
576-
577-
func (m *mockChannelState) IsPull() bool {
578-
panic("implement me")
579-
}
580-
581-
func (m *mockChannelState) OtherPeer() peer.ID {
582-
panic("implement me")
583-
}
584-
585-
func (m *mockChannelState) SelfPeer() peer.ID {
586-
panic("implement me")
587-
}
588-
589-
func (m *mockChannelState) Message() string {
590-
panic("implement me")
591-
}
592-
593-
func (m *mockChannelState) Vouchers() []datatransfer.Voucher {
594-
panic("implement me")
595-
}
596-
597-
func (m *mockChannelState) VoucherResults() []datatransfer.VoucherResult {
598-
panic("implement me")
599-
}
600-
601-
func (m *mockChannelState) LastVoucher() datatransfer.Voucher {
602-
panic("implement me")
603-
}
604-
605-
func (m *mockChannelState) LastVoucherResult() datatransfer.VoucherResult {
606-
panic("implement me")
607-
}
608-
609-
func (m *mockChannelState) Stages() *datatransfer.ChannelStages {
610-
panic("implement me")
611-
}
612-
613-
func (m *mockChannelState) ReceivedCidsTotal() int64 {
614-
panic("implement me")
615-
}
616-
617-
func (m *mockChannelState) QueuedCidsTotal() int64 {
618-
panic("implement me")
619-
}
620-
621-
func (m *mockChannelState) SentCidsTotal() int64 {
622-
panic("implement me")
623-
}

0 commit comments

Comments
 (0)