Skip to content
This repository was archived by the owner on Jun 19, 2023. It is now read-only.

Interop test between Go and Js implementations #23

Merged
merged 13 commits into from
Sep 30, 2022
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ bower_components
Thumbs.db

# Ignore built ts files
dist/**/*
dist/

# ignore yarn.lock
yarn.lock
Expand Down
10 changes: 9 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@
"othergen": "./node_modules/.bin/proto-loader-gen-types --longs=String --enums=String --defaults --oneofs --grpcLib=@grpc/grpc-js --outDir=proto_ts/ src/*.proto",
"build": "aegir build",
"test": "aegir test --target browser",
"test:interop": "run-p --race start-ext-server wait-then-test",
"start-ext-server": "rm -vf dist/test/server-multiaddr.js ; cd ../go-libp2p/ && go run examples/webrtc/main.go ../js-libp2p-webrtc/dist/test/ ",
"wait-for-server": "wait-on --delay 1000 --timeout 10000 dist/test/server-multiaddr.js",
"wait-then-test": "run-s wait-for-server test",
"lint": "aegir lint",
"lint:fix": "aegir lint --fix",
"clean": "aegir clean",
Expand All @@ -35,14 +39,18 @@
},
"devDependencies": {
"@libp2p/interface-mocks": "^4.0.1",
"@libp2p/peer-id-factory": "^1.0.18",
"@multiformats/multiaddr": "^10.4.1",
"@types/uuid": "^8.3.4",
"@typescript-eslint/parser": "^5.32.0",
"aegir": "^37.4.6",
"it-all": "^1.0.6",
"it-first": "^1.0.7",
"npm-run-all": "^4.1.5",
"prettier": "^2.7.1",
"typescript": "^4.7.4",
"uint8arrays": "^3.1.0"
"uint8arrays": "^3.1.0",
"wait-on": "^6.0.1"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^8.0.0",
Expand Down
2 changes: 1 addition & 1 deletion proto_ts/message.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// @generated by protobuf-ts 2.8.0
// @generated by protobuf-ts 2.8.1
// @generated from protobuf file "message.proto" (package "webrtc.pb", syntax proto2)
// tslint:disable
import type { BinaryWriteOptions } from "@protobuf-ts/runtime";
Expand Down
82 changes: 49 additions & 33 deletions src/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { WebRTCStream } from './stream';
import { select as msselect, handle as mshandle } from '@libp2p/multistream-select';
import { Duplex } from 'it-stream-types';
import { Uint8ArrayList } from 'uint8arraylist';
import { dataChannelError, operationAborted, overStreamLimit } from './error';
import { connectionClosedError, dataChannelError, operationAborted, overStreamLimit } from './error';

const log = logger('libp2p:webrtc:connection');

Expand Down Expand Up @@ -40,6 +40,7 @@ export class WebRTCConnection implements ic.Connection {
remotePeer: PeerId;
tags: string[] = [];
components: Components;
closed: boolean = false;

private _streams: Map<string, ic.Stream> = new Map();
private peerConnection: RTCPeerConnection;
Expand All @@ -60,14 +61,22 @@ export class WebRTCConnection implements ic.Connection {
},
};
this.handleIncomingStreams();
this.peerConnection.onconnectionstatechange = (_) => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just put const closed = ['closed', 'failed', 'disconnected'].includes(this.peer.connectionState);

Avoids the whole switch statement etc.

switch(this.peerConnection.connectionState) {
case 'closed': // fallthrough
case 'failed': // fallthrough
case 'disconnected': // fallthrough
log.trace(`peerconnection moved to state: ${this.peerConnection.connectionState}`)
closed = true;
this.streams.forEach((stream) => stream.abort(connectionClosedError(this.peerConnection.connectionState, 'closing stream')))
}
}
}

private handleIncomingStreams() {
let metrics = this.components.getMetrics();
this.peerConnection.ondatachannel = async ({ channel }) => {
const logPrefix = `[stream:${channel.label}][inbound]`;
log.trace(`incoming stream - ${channel.label}`);
let [openPromise, abortPromise] = [defer(), defer()];
const [openPromise, abortPromise] = [defer(), defer()];
let controller = new TimeoutController(OPEN_STREAM_TIMEOUT);
controller.signal.onabort = () => abortPromise.resolve();
channel.onopen = () => openPromise.resolve();
Expand All @@ -77,7 +86,7 @@ export class WebRTCConnection implements ic.Connection {
throw operationAborted('prior to a new stream incoming.', controller.signal.reason);
}

let rawStream = new WebRTCStream({
const rawStream = new WebRTCStream({
channel,
stat: {
direction: 'inbound',
Expand All @@ -86,26 +95,30 @@ export class WebRTCConnection implements ic.Connection {
},
},
});
let registrar = this.components.getRegistrar();
let protocols = registrar.getProtocols();
const registrar = this.components.getRegistrar();
const protocols = registrar.getProtocols();

log.trace(`${logPrefix} supported protocols - ${protocols}`);
log.trace(`supported protocols - ${protocols}`);

let { stream, protocol } = await mshandle(rawStream, protocols, { signal: controller.signal });
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
}
try {
const { stream, protocol } = await mshandle(rawStream, protocols, { signal: controller.signal });
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
}

log.trace(`${logPrefix} handled protocol - ${protocol}`);
log.trace(`handled protocol - ${protocol}`);

rawStream.stat.protocol = protocol;
let result = this.wrapMsStream(rawStream, stream);
rawStream.stat.protocol = protocol;
const result = this.wrapMsStream(rawStream, stream);

this.addStream(result);
this.addStream(result);

// handle stream
let { handler } = registrar.getHandler(protocol);
handler({ connection: this, stream: result });
// handle stream
const { handler } = registrar.getHandler(protocol);
handler({ connection: this, stream: result });
} catch (err) {
log.error('stream error: ', rawStream.id, rawStream.stat.direction);
}
};
}

Expand All @@ -132,9 +145,9 @@ export class WebRTCConnection implements ic.Connection {
}

private findStreamLimit(protocol: string, direction: ic.Direction): number {
let registrar = this.components.getRegistrar();
const registrar = this.components.getRegistrar();
try {
let handler = registrar.getHandler(protocol);
const handler = registrar.getHandler(protocol);
return direction === 'inbound' ? handler.options.maxInboundStreams || DEFAULT_MAX_INBOUND_STREAMS : handler.options.maxOutboundStreams || DEFAULT_MAX_OUTBOUND_STREAMS;
} catch (err) {}
return direction === 'inbound' ? DEFAULT_MAX_INBOUND_STREAMS : DEFAULT_MAX_OUTBOUND_STREAMS;
Expand All @@ -145,12 +158,15 @@ export class WebRTCConnection implements ic.Connection {
}

async newStream(protocols: string | string[], options: AbortOptions = {}): Promise<ic.Stream> {
let label = genUuid().slice(0, 8);
let openPromise = defer();
let abortedPromise = defer();
let controller: TimeoutController | undefined;
let metrics = this.components.getMetrics();
if (this.closed) {
throw connectionClosedError(this.peerConnection.connectionState, 'cannot open new stream')
}
const label = genUuid().slice(0, 8);
const [openPromise, abortedPromise] = [defer(), defer()];
const metrics = this.components.getMetrics();

let openError: Error | undefined;
let controller: TimeoutController | undefined;

log.trace(`opening new stream with protocols: ${protocols}`);

Expand All @@ -168,7 +184,7 @@ export class WebRTCConnection implements ic.Connection {
};

log.trace(`[stream: ${label}] peerconnection state: ${this.peerConnection.connectionState}`);
let channel = this.peerConnection.createDataChannel(label);
const channel = this.peerConnection.createDataChannel(label);
channel.onopen = (_evt) => {
log.trace(`[stream: ${label}] data channel opened`);
openPromise.resolve();
Expand All @@ -188,7 +204,7 @@ export class WebRTCConnection implements ic.Connection {
throw openError;
}

let rawStream = new WebRTCStream({
const rawStream = new WebRTCStream({
channel,
stat: {
direction: 'outbound',
Expand All @@ -198,11 +214,11 @@ export class WebRTCConnection implements ic.Connection {
},
});

let { stream, protocol } = await msselect(rawStream, protocols, { signal: options.signal });
const { stream, protocol } = await msselect(rawStream, protocols, { signal: options.signal });
log.trace(`[stream ${label}] select protocol - ${protocol}`);
// check if stream is within limit after protocol has been negotiated
rawStream.stat.protocol = protocol;
let result = this.wrapMsStream(rawStream, stream);
const result = this.wrapMsStream(rawStream, stream);
// check if stream can be accomodated
if (metrics) {
metrics.trackStream({ stream, protocol, remotePeer: this.remotePeer });
Expand All @@ -213,10 +229,10 @@ export class WebRTCConnection implements ic.Connection {
}

addStream(stream: ic.Stream): void {
let protocol = stream.stat.protocol!;
let direction = stream.stat.direction;
const protocol = stream.stat.protocol!;
const direction = stream.stat.direction;
if (this.countStream(protocol, direction) === this.findStreamLimit(protocol, direction)) {
let err = overStreamLimit(direction, protocol);
const err = overStreamLimit(direction, protocol);
log(err.message);
stream.abort(err);
throw err;
Expand Down
14 changes: 13 additions & 1 deletion src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ export enum codes {
ERR_NOT_IMPLEMENTED = 'ERR_NOT_IMPLEMENTED',
ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS',
ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS = 'ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS',
ERR_CONNECTION_CLOSED = 'ERR_CONNECTION_CLOSED',
}

export class ConnectionClosedError extends WebRTCTransportError {
constructor(state: RTCPeerConnectionState, msg: string) {
super(`peerconnection moved to state: ${state}:` + msg);
this.name = 'WebRTC/ConnectionClosed';
}
}

export function connectionClosedError(state: RTCPeerConnectionState, msg: string) {
return createError(new ConnectionClosedError(state, msg), codes.ERR_CONNECTION_CLOSED)
}

export class InvalidArgumentError extends WebRTCTransportError {
Expand Down Expand Up @@ -83,7 +95,7 @@ export class DataChannelError extends WebRTCTransportError {
}

export function dataChannelError(streamLabel: string, msg: string) {
return createError(new OperationAbortedError(streamLabel, msg), codes.ERR_DATA_CHANNEL);
return createError(new DataChannelError(streamLabel, msg), codes.ERR_DATA_CHANNEL);
}

export class StreamingLimitationError extends WebRTCTransportError {
Expand Down
2 changes: 1 addition & 1 deletion src/message.proto
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@ message Message {
optional Flag flag = 1;

optional bytes message = 2;
}
}
3 changes: 1 addition & 2 deletions src/options.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,4 @@ export interface WebRTCListenerOptions extends CreateListenerOptions {
// channelOptions?: WebRTCReceiverInit
}

export interface WebRTCDialOptions extends DialOptions {
}
export interface WebRTCDialOptions extends DialOptions {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

our linting rules must be pretty relaxed as normally you would get an "empty interface" complaint here, no?

29 changes: 14 additions & 15 deletions src/sdp.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { bases } from 'multiformats/basics';

const log = logger('libp2p:webrtc:sdp');

const mbdecoder = (function () {
export const mbdecoder = (function () {
const decoders = Object.values(bases).map((b) => b.decoder);
let acc = decoders[0].or(decoders[1]);
decoders.slice(2).forEach((d) => (acc = acc.or(d)));
Expand All @@ -16,7 +16,7 @@ const mbdecoder = (function () {
const CERTHASH_CODE: number = 466;

function ipv(ma: Multiaddr): string {
for (let proto of ma.protoNames()) {
for (const proto of ma.protoNames()) {
if (proto.startsWith('ip')) {
return proto.toUpperCase();
}
Expand All @@ -32,20 +32,20 @@ function port(ma: Multiaddr): number {
}

export function certhash(ma: Multiaddr): string {
let tups = ma.stringTuples();
let certhash_value = tups.filter((tup) => tup[0] == CERTHASH_CODE).map((tup) => tup[1])[0];
const tups = ma.stringTuples();
const certhash_value = tups.filter((tup) => tup[0] == CERTHASH_CODE).map((tup) => tup[1])[0];
if (certhash_value) {
return certhash_value;
} else {
throw inappropriateMultiaddr("Couldn't find a certhash component of multiaddr:" + ma.toString());
}
}

function certhashToFingerprint(ma: Multiaddr): string {
let certhash_value = certhash(ma);
export function certhashToFingerprint(ma: Multiaddr): string[] {
const certhash_value = certhash(ma);
// certhash_value is a multibase encoded multihash encoded string
let mbdecoded = mbdecoder.decode(certhash_value);
let mhdecoded = multihashes.decode(mbdecoded);
const mbdecoded = mbdecoder.decode(certhash_value);
const mhdecoded = multihashes.decode(mbdecoded);
let prefix = '';
switch (mhdecoded.name) {
case 'md5':
Expand All @@ -61,17 +61,17 @@ function certhashToFingerprint(ma: Multiaddr): string {
throw unsupportedHashAlgorithm(mhdecoded.name);
}

let fp = mhdecoded.digest.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '');
fp = fp.match(/.{1,2}/g)!.join(':');
const fp = mhdecoded.digest.reduce((str, byte) => str + byte.toString(16).padStart(2, '0'), '');
const fpSdp = fp.match(/.{1,2}/g)!.join(':');

return `${prefix} ${fp}`;
return [`${prefix.toUpperCase()} ${fpSdp.toUpperCase()}`, fp];
}

function ma2sdp(ma: Multiaddr, ufrag: string): string {
const IP = ip(ma);
const IPVERSION = ipv(ma);
const PORT = port(ma);
const CERTFP = certhashToFingerprint(ma);
const [CERTFP, _] = certhashToFingerprint(ma);
return `v=0
o=- 0 0 IN ${IPVERSION} ${IP}
s=-
Expand All @@ -80,14 +80,13 @@ t=0 0
a=ice-lite
m=application ${PORT} UDP/DTLS/SCTP webrtc-datachannel
a=mid:0
a=setup:active
a=ice-options:ice2
a=setup:passive
a=ice-ufrag:${ufrag}
a=ice-pwd:${ufrag}
a=fingerprint:${CERTFP}
a=sctp-port:5000
a=max-message-size:100000
a=candidate:1 1 UDP 1 ${IP} ${PORT} typ host`;
a=candidate:1467250027 1 UDP 1467250027 ${IP} ${PORT} typ host\r\n`;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where do these numbers come from, might be nice to add a line of comment about them?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if a comment explaining is useful, but here is the grammar for the candidate line: https://www.rfc-editor.org/rfc/rfc8839.html#name-candidate-attribute .

The foundation (first number) is a way to encode ice candidates. The RFC specifies what the foundation should do here: https://www.rfc-editor.org/rfc/rfc8445.html#section-5.1.1.3 , but in practice it is not used much. This can be any number for our purposes.

The second number is the stream component. 1 is RTP and 2 is RTCP. Since we multiplex RTP and RTCP over the same component, we use 1.

The third number (after UDP) is the candidate priority. Since we have only 1 candidate, this does not matter in this SDP.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I would do is make constants for this with very explicit declarative names, and you can link to the relevant paragraph in an RFC for each constant. I think that's enough documentation. Gives all the info with little effort.

}

export function fromMultiAddr(ma: Multiaddr, ufrag: string): RTCSessionDescriptionInit {
Expand Down
Loading