Skip to content

Commit c5f91ae

Browse files
hannahhowardrvagg
andcommitted
Feat/refactor transport protocol update part 2 (#338)
* feat(network): transport versioning and detection Support multiple transports on the libp2p protocol, via different protocol naming, and using libp2p to do protocol negotiation * Update transport/helpers/network/libp2p_impl.go Co-authored-by: Rod Vagg <rod@vagg.org> * Update transport/helpers/network/libp2p_impl.go Co-authored-by: Rod Vagg <rod@vagg.org> * fix(network): add versions check for legacy transport Co-authored-by: Rod Vagg <rod@vagg.org>
1 parent b374b08 commit c5f91ae

18 files changed

+305
-147
lines changed

impl/utils.go

-1
Original file line numberDiff line numberDiff line change
@@ -83,4 +83,3 @@ func (m *manager) cancelMessage(chid datatransfer.ChannelID) datatransfer.Messag
8383
}
8484
return message.CancelResponse(chid.ID)
8585
}
86-

itest/integration_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -1779,7 +1779,7 @@ func TestRespondingToPushGraphsyncRequests(t *testing.T) {
17791779
r := &receiver{
17801780
messageReceived: make(chan receivedMessage),
17811781
}
1782-
dtnet2.SetDelegate("graphsync", r)
1782+
dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r)
17831783

17841784
gsr := &fakeGraphSyncReceiver{
17851785
receivedMessages: make(chan receivedGraphSyncMessage),
@@ -1857,7 +1857,7 @@ func TestResponseHookWhenExtensionNotFound(t *testing.T) {
18571857
r := &receiver{
18581858
messageReceived: make(chan receivedMessage),
18591859
}
1860-
dtnet2.SetDelegate("graphsync", r)
1860+
dtnet2.SetDelegate(datatransfer.LegacyTransportID, []datatransfer.Version{datatransfer.LegacyTransportVersion}, r)
18611861

18621862
gsr := &fakeGraphSyncReceiver{
18631863
receivedMessages: make(chan receivedGraphSyncMessage),

message.go

+19-11
Original file line numberDiff line numberDiff line change
@@ -11,41 +11,41 @@ import (
1111
"github.com/ipld/go-ipld-prime/datamodel"
1212
)
1313

14-
type MessageVersion struct {
14+
type Version struct {
1515
Major uint64
1616
Minor uint64
1717
Patch uint64
1818
}
1919

20-
func (mv MessageVersion) String() string {
20+
func (mv Version) String() string {
2121
return fmt.Sprintf("%d.%d.%d", mv.Major, mv.Minor, mv.Patch)
2222
}
2323

2424
// MessageVersionFromString parses a string into a message version
25-
func MessageVersionFromString(versionString string) (MessageVersion, error) {
25+
func MessageVersionFromString(versionString string) (Version, error) {
2626
versions := strings.Split(versionString, ".")
2727
if len(versions) != 3 {
28-
return MessageVersion{}, errors.New("not a version string")
28+
return Version{}, errors.New("not a version string")
2929
}
3030
major, err := strconv.ParseUint(versions[0], 10, 0)
3131
if err != nil {
32-
return MessageVersion{}, errors.New("unable to parse major version")
32+
return Version{}, errors.New("unable to parse major version")
3333
}
3434
minor, err := strconv.ParseUint(versions[1], 10, 0)
3535
if err != nil {
36-
return MessageVersion{}, errors.New("unable to parse major version")
36+
return Version{}, errors.New("unable to parse major version")
3737
}
3838
patch, err := strconv.ParseUint(versions[2], 10, 0)
3939
if err != nil {
40-
return MessageVersion{}, errors.New("unable to parse major version")
40+
return Version{}, errors.New("unable to parse major version")
4141
}
42-
return MessageVersion{Major: major, Minor: minor, Patch: patch}, nil
42+
return Version{Major: major, Minor: minor, Patch: patch}, nil
4343
}
4444

4545
var (
4646
// DataTransfer1_2 is the identifier for the current
4747
// supported version of data-transfer
48-
DataTransfer1_2 MessageVersion = MessageVersion{1, 2, 0}
48+
DataTransfer1_2 Version = Version{1, 2, 0}
4949
)
5050

5151
// Message is a message for the data transfer protocol
@@ -60,8 +60,16 @@ type Message interface {
6060
TransferID() TransferID
6161
ToNet(w io.Writer) error
6262
ToIPLD() datamodel.Node
63-
MessageForVersion(targetProtocol MessageVersion) (newMsg Message, err error)
64-
WrappedForTransport(transportID TransportID) Message
63+
MessageForVersion(targetProtocol Version) (newMsg Message, err error)
64+
Version() Version
65+
WrappedForTransport(transportID TransportID, transportVersion Version) TransportedMessage
66+
}
67+
68+
// TransportedMessage is a message that can also report how it was transported
69+
type TransportedMessage interface {
70+
Message
71+
TransportID() TransportID
72+
TransportVersion() Version
6573
}
6674

6775
// Request is a response message for the data transfer protocol

message/message1_1prime/message.go

+23-4
Original file line numberDiff line numberDiff line change
@@ -212,15 +212,34 @@ func fromMessage(tresp *TransferMessage1_1) (datatransfer.Message, error) {
212212
return tresp.Response, nil
213213
}
214214

215+
func fromWrappedMessage(wtresp *WrappedTransferMessage1_1) (datatransfer.TransportedMessage, error) {
216+
tresp := wtresp.Message
217+
if (tresp.IsRequest && tresp.Request == nil) || (!tresp.IsRequest && tresp.Response == nil) {
218+
return nil, xerrors.Errorf("invalid/malformed message")
219+
}
220+
221+
if tresp.IsRequest {
222+
return &WrappedTransferRequest1_1{
223+
tresp.Request,
224+
wtresp.TransportVersion,
225+
wtresp.TransportID,
226+
}, nil
227+
}
228+
return &WrappedTransferResponse1_1{
229+
tresp.Response,
230+
wtresp.TransportID,
231+
wtresp.TransportVersion,
232+
}, nil
233+
}
234+
215235
// FromNetWrraped can read a network stream to deserialize a message + transport ID
216-
func FromNetWrapped(r io.Reader) (datatransfer.TransportID, datatransfer.Message, error) {
236+
func FromNetWrapped(r io.Reader) (datatransfer.TransportedMessage, error) {
217237
tm, err := bindnodeRegistry.TypeFromReader(r, &WrappedTransferMessage1_1{}, dagcbor.Decode)
218238
if err != nil {
219-
return "", nil, err
239+
return nil, err
220240
}
221241
wtresp := tm.(*WrappedTransferMessage1_1)
222-
msg, err := fromMessage(&wtresp.Message)
223-
return datatransfer.TransportID(wtresp.TransportID), msg, err
242+
return fromWrappedMessage(wtresp)
224243
}
225244

226245
// FromNet can read a network stream to deserialize a GraphSyncMessage

message/message1_1prime/message_test.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -510,6 +510,7 @@ func TestToNetFromNetEquivalency(t *testing.T) {
510510
})
511511
t.Run("round-trip with wrapping", func(t *testing.T) {
512512
transportID := datatransfer.TransportID("applesauce")
513+
transportVersion := datatransfer.Version{Major: 1, Minor: 5, Patch: 0}
513514
baseCid := testutil.GenerateCids(1)[0]
514515
selector := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any).Matcher().Node()
515516
isPull := false
@@ -519,15 +520,16 @@ func TestToNetFromNetEquivalency(t *testing.T) {
519520
voucherResult := testutil.NewTestTypedVoucher()
520521
request, err := message1_1.NewRequest(id, false, isPull, &voucher, baseCid, selector)
521522
require.NoError(t, err)
522-
wrequest := request.WrappedForTransport(transportID)
523+
wrequest := request.WrappedForTransport(transportID, transportVersion)
523524
buf := new(bytes.Buffer)
524525
err = wrequest.ToNet(buf)
525526
require.NoError(t, err)
526527
require.Greater(t, buf.Len(), 0)
527-
receivedTransportID, deserialized, err := message1_1.FromNetWrapped(buf)
528+
deserialized, err := message1_1.FromNetWrapped(buf)
528529
require.NoError(t, err)
529530

530-
require.Equal(t, transportID, receivedTransportID)
531+
require.Equal(t, transportID, deserialized.TransportID())
532+
require.Equal(t, transportVersion, deserialized.TransportVersion())
531533
deserializedRequest, ok := deserialized.(datatransfer.Request)
532534
require.True(t, ok)
533535

@@ -541,12 +543,13 @@ func TestToNetFromNetEquivalency(t *testing.T) {
541543

542544
response, err := message1_1.NewResponse(id, accepted, false, &voucherResult)
543545
require.NoError(t, err)
544-
wresponse := response.WrappedForTransport(transportID)
546+
wresponse := response.WrappedForTransport(transportID, transportVersion)
545547
err = wresponse.ToNet(buf)
546548
require.NoError(t, err)
547-
receivedTransportID, deserialized, err = message1_1.FromNetWrapped(buf)
549+
deserialized, err = message1_1.FromNetWrapped(buf)
548550
require.NoError(t, err)
549-
require.Equal(t, transportID, receivedTransportID)
551+
require.Equal(t, transportID, deserialized.TransportID())
552+
require.Equal(t, transportVersion, deserialized.TransportVersion())
550553

551554
deserializedResponse, ok := deserialized.(datatransfer.Response)
552555
require.True(t, ok)
@@ -559,12 +562,13 @@ func TestToNetFromNetEquivalency(t *testing.T) {
559562
testutil.AssertEqualTestVoucherResult(t, response, deserializedResponse)
560563

561564
request = message1_1.CancelRequest(id)
562-
wrequest = request.WrappedForTransport(transportID)
565+
wrequest = request.WrappedForTransport(transportID, transportVersion)
563566
err = wrequest.ToNet(buf)
564567
require.NoError(t, err)
565-
receivedTransportID, deserialized, err = message1_1.FromNetWrapped(buf)
568+
deserialized, err = message1_1.FromNetWrapped(buf)
566569
require.NoError(t, err)
567-
require.Equal(t, transportID, receivedTransportID)
570+
require.Equal(t, transportID, deserialized.TransportID())
571+
require.Equal(t, transportVersion, deserialized.TransportVersion())
568572

569573
deserializedRequest, ok = deserialized.(datatransfer.Request)
570574
require.True(t, ok)

message/message1_1prime/schema.ipldsch

+7
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,14 @@ type TransferMessage1_1 struct {
3737
Response nullable TransferResponse
3838
}
3939

40+
type Version struct {
41+
Major Int
42+
Minor Int
43+
Patch Int
44+
} representation tuple
45+
4046
type WrappedTransferMessage1_1 struct {
4147
TransportID TransportID (rename "ID")
48+
TransportVersion Version (rename "TV")
4249
Message TransferMessage1_1 (rename "Msg")
4350
}

message/message1_1prime/transfer_message.go

+3-2
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@ func init() {
5959
}
6060

6161
type WrappedTransferMessage1_1 struct {
62-
TransportID string
63-
Message TransferMessage1_1
62+
TransportID string
63+
TransportVersion datatransfer.Version
64+
Message TransferMessage1_1
6465
}
6566

6667
func (wtm *WrappedTransferMessage1_1) BindnodeSchema() string {

message/message1_1prime/transfer_request.go

+25-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ type TransferRequest1_1 struct {
2929
RestartChannel datatransfer.ChannelID
3030
}
3131

32-
func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.MessageVersion) (datatransfer.Message, error) {
32+
func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) {
3333
switch version {
3434
case datatransfer.DataTransfer1_2:
3535
return trq, nil
@@ -38,8 +38,16 @@ func (trq *TransferRequest1_1) MessageForVersion(version datatransfer.MessageVer
3838
}
3939
}
4040

41-
func (trq *TransferRequest1_1) WrappedForTransport(transportID datatransfer.TransportID) datatransfer.Message {
42-
return &WrappedTransferRequest1_1{trq, string(transportID)}
41+
func (trq *TransferRequest1_1) Version() datatransfer.Version {
42+
return datatransfer.DataTransfer1_2
43+
}
44+
45+
func (trq *TransferRequest1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage {
46+
return &WrappedTransferRequest1_1{
47+
TransferRequest1_1: trq,
48+
transportID: string(transportID),
49+
transportVersion: transportVersion,
50+
}
4351
}
4452

4553
// IsRequest always returns true in this case because this is a transfer request
@@ -164,15 +172,25 @@ func (trq *TransferRequest1_1) ToNet(w io.Writer) error {
164172
// transport id
165173
type WrappedTransferRequest1_1 struct {
166174
*TransferRequest1_1
167-
TransportID string
175+
transportVersion datatransfer.Version
176+
transportID string
177+
}
178+
179+
func (trq *WrappedTransferRequest1_1) TransportID() datatransfer.TransportID {
180+
return datatransfer.TransportID(trq.transportID)
181+
}
182+
183+
func (trq *WrappedTransferRequest1_1) TransportVersion() datatransfer.Version {
184+
return trq.transportVersion
168185
}
169186

170-
func (trsp *WrappedTransferRequest1_1) toIPLD() schema.TypedNode {
187+
func (trq *WrappedTransferRequest1_1) toIPLD() schema.TypedNode {
171188
msg := WrappedTransferMessage1_1{
172-
TransportID: trsp.TransportID,
189+
TransportID: trq.transportID,
190+
TransportVersion: trq.transportVersion,
173191
Message: TransferMessage1_1{
174192
IsRequest: true,
175-
Request: trsp.TransferRequest1_1,
193+
Request: trq.TransferRequest1_1,
176194
Response: nil,
177195
},
178196
}

message/message1_1prime/transfer_request_test.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,12 @@ func TestRequestMessageForVersion(t *testing.T) {
4040
require.Equal(t, selector, n)
4141
require.Equal(t, testutil.TestVoucherType, req.VoucherType())
4242

43-
wrappedOut12 := out12.WrappedForTransport(datatransfer.LegacyTransportID)
44-
require.Equal(t, &message1_1.WrappedTransferRequest1_1{
45-
TransferRequest1_1: request.(*message1_1.TransferRequest1_1),
46-
TransportID: string(datatransfer.LegacyTransportID),
47-
}, wrappedOut12)
43+
wrappedOut12 := out12.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion)
44+
require.Equal(t, datatransfer.LegacyTransportID, wrappedOut12.TransportID())
45+
require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut12.TransportVersion())
4846

4947
// random protocol should fail
50-
_, err = request.MessageForVersion(datatransfer.MessageVersion{
48+
_, err = request.MessageForVersion(datatransfer.Version{
5149
Major: rand.Uint64(),
5250
Minor: rand.Uint64(),
5351
Patch: rand.Uint64(),

message/message1_1prime/transfer_response.go

+22-5
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (trsp *TransferResponse1_1) EmptyVoucherResult() bool {
8787
return trsp.VoucherTypeIdentifier == datatransfer.EmptyTypeIdentifier
8888
}
8989

90-
func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.MessageVersion) (datatransfer.Message, error) {
90+
func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.Version) (datatransfer.Message, error) {
9191
switch version {
9292
case datatransfer.DataTransfer1_2:
9393
return trsp, nil
@@ -96,8 +96,16 @@ func (trsp *TransferResponse1_1) MessageForVersion(version datatransfer.MessageV
9696
}
9797
}
9898

99-
func (trsp *TransferResponse1_1) WrappedForTransport(transportID datatransfer.TransportID) datatransfer.Message {
100-
return &WrappedTransferResponse1_1{trsp, string(transportID)}
99+
func (trsp *TransferResponse1_1) Version() datatransfer.Version {
100+
return datatransfer.DataTransfer1_2
101+
}
102+
103+
func (trsp *TransferResponse1_1) WrappedForTransport(transportID datatransfer.TransportID, transportVersion datatransfer.Version) datatransfer.TransportedMessage {
104+
return &WrappedTransferResponse1_1{
105+
TransferResponse1_1: trsp,
106+
transportID: string(transportID),
107+
transportVersion: transportVersion,
108+
}
101109
}
102110
func (trsp *TransferResponse1_1) toIPLD() schema.TypedNode {
103111
msg := TransferMessage1_1{
@@ -121,12 +129,21 @@ func (trsp *TransferResponse1_1) ToNet(w io.Writer) error {
121129
// transport id
122130
type WrappedTransferResponse1_1 struct {
123131
*TransferResponse1_1
124-
TransportID string
132+
transportID string
133+
transportVersion datatransfer.Version
134+
}
135+
136+
func (trsp *WrappedTransferResponse1_1) TransportID() datatransfer.TransportID {
137+
return datatransfer.TransportID(trsp.transportID)
138+
}
139+
func (trsp *WrappedTransferResponse1_1) TransportVersion() datatransfer.Version {
140+
return trsp.transportVersion
125141
}
126142

127143
func (trsp *WrappedTransferResponse1_1) toIPLD() schema.TypedNode {
128144
msg := WrappedTransferMessage1_1{
129-
TransportID: trsp.TransportID,
145+
TransportID: trsp.transportID,
146+
TransportVersion: trsp.transportVersion,
130147
Message: TransferMessage1_1{
131148
IsRequest: false,
132149
Request: nil,

message/message1_1prime/transfer_response_test.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,12 @@ func TestResponseMessageForVersion(t *testing.T) {
2828
require.Equal(t, testutil.TestVoucherType, resp.VoucherResultType())
2929
require.True(t, resp.IsValidationResult())
3030

31-
wrappedOut := out.WrappedForTransport(datatransfer.LegacyTransportID)
32-
require.Equal(t, &message1_1.WrappedTransferResponse1_1{
33-
TransferResponse1_1: response.(*message1_1.TransferResponse1_1),
34-
TransportID: string(datatransfer.LegacyTransportID),
35-
}, wrappedOut)
31+
wrappedOut := out.WrappedForTransport(datatransfer.LegacyTransportID, datatransfer.LegacyTransportVersion)
32+
require.Equal(t, datatransfer.LegacyTransportID, wrappedOut.TransportID())
33+
require.Equal(t, datatransfer.LegacyTransportVersion, wrappedOut.TransportVersion())
3634

3735
// random protocol should fail
38-
_, err = response.MessageForVersion(datatransfer.MessageVersion{
36+
_, err = response.MessageForVersion(datatransfer.Version{
3937
Major: rand.Uint64(),
4038
Minor: rand.Uint64(),
4139
Patch: rand.Uint64(),

testutil/faketransport.go

+5
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,11 @@ func (ft *FakeTransport) ID() datatransfer.TransportID {
5959
return "fake"
6060
}
6161

62+
// Versions indicates what versions of this transport are supported
63+
func (ft *FakeTransport) Versions() []datatransfer.Version {
64+
return []datatransfer.Version{{Major: 1, Minor: 1, Patch: 0}}
65+
}
66+
6267
// Capabilities tells datatransfer what kinds of capabilities this transport supports
6368
func (ft *FakeTransport) Capabilities() datatransfer.TransportCapabilities {
6469
return datatransfer.TransportCapabilities{

0 commit comments

Comments
 (0)