From 9e14b9d8737b4968eceaaa75fa97d5f5ce88a334 Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Fri, 5 Aug 2022 11:59:13 +0530 Subject: [PATCH 1/2] intitial stream logic --- package.json | 2 + src/stream.ts | 142 +++++++++++++++++++++++++++++++++++++------------- 2 files changed, 108 insertions(+), 36 deletions(-) diff --git a/package.json b/package.json index 4f96f04..9d5f002 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,8 @@ "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", "abortable-iterator": "^4.0.2", + "it-merge": "^1.0.4", + "p-defer": "^4.0.0", "socket.io-client": "^4.1.2", "uuid": "^8.3.2" } diff --git a/src/stream.ts b/src/stream.ts index 90bbb4d..28ecaa8 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,70 +1,140 @@ -import { Stream } from '@libp2p/interface-connection'; +import { Stream, Direction } from '@libp2p/interface-connection'; import { StreamStat } from '@libp2p/interface-connection'; -import { logger } from '@libp2p/logger'; +// import { logger } from '@libp2p/logger'; import { Source } from 'it-stream-types'; import { Sink } from 'it-stream-types'; +import { pushable, Pushable } from 'it-pushable'; +import defer, { DeferredPromise } from 'p-defer'; +import merge from 'it-merge'; -const log = logger('libp2p:webrtc:connection'); +// const log = logger('libp2p:webrtc:connection'); + +type StreamInitOpts = { + channel: RTCDataChannel; + direction: Direction; + metadata?: Record; +}; export class WebRTCStream implements Stream { - constructor() { - this.id = 'TODO'; + /** + * Unique identifier for a stream + */ + id: string; + + /** + * Stats about this stream + */ + stat: StreamStat; + + /** + * User defined stream metadata + */ + metadata: Record; + private readonly channel: RTCDataChannel; + + source: Source = process.stdin; //TODO + sink: Sink>; + + // promises + opened: DeferredPromise = defer(); + closeWritePromise: DeferredPromise = defer(); + writeClosed: boolean = false; + readClosed: boolean = false; + closed: boolean = false; + + constructor(opts: StreamInitOpts) { + this.channel = opts.channel; + this.id = this.channel.label; this.stat = { - direction: 'outbound', + direction: opts.direction, timeline: { open: 0, close: 0, }, }; - this.metadata = {}; - this.sink = (x) => new Promise((res, rej) => {}); //TODO - if (this.dataChannel) { - log('TODO', this.dataChannel.id); - } + + this.metadata = opts.metadata ?? {}; + this.source = pushable(); + + // closable sink + this.sink = async (src: Source) => { + await this.opened.promise; + if (closed || this.writeClosed) { + return; + } + + let self = this; + let closeWriteIterable = { + async *[Symbol.asyncIterator]() { + await self.closeWritePromise.promise; + yield new Uint8Array(0); + }, + }; + + for await (const buf of merge(closeWriteIterable, src)) { + if (closed || this.writeClosed) { + break; + } + this.channel.send(buf); + } + }; + + // handle datachannel events + this.channel.onopen = (_) => this.opened.resolve(); + this.channel.onmessage = (evt) => { + if (this.readClosed) { + return; + } + (this.source as Pushable).push(evt.data); + }; + this.channel.onclose = (_) => this.close(); + this.channel.onerror = (_event) => { + this.abort(new Error('TODO')); + }; } + // duplex sink + /** * Close a stream for reading and writing */ - close(): void {} + close(): void { + if (this.closed) { + return; + } + this.closed = true; + this.closeRead(); + this.closeWrite(); + this.channel.close(); + } /** * Close a stream for reading only */ - closeRead(): void {} + closeRead(): void { + this.readClosed = true; + (this.source as Pushable).end(); + } /** * Close a stream for writing only */ - closeWrite(): void {} + closeWrite(): void { + this.writeClosed = true; + this.closeWritePromise.resolve(); + } /** * Call when a local error occurs, should close the stream for reading and writing */ - abort(err: Error): void {} + abort(err: Error): void { + this.close(); + } /** * Call when a remote error occurs, should close the stream for reading and writing */ - reset(): void {} - - /** - * Unique identifier for a stream - */ - id: string; - - /** - * Stats about this stream - */ - stat: StreamStat; - - /** - * User defined stream metadata - */ - metadata: Record; - - source: Source = process.stdin; //TODO - sink: Sink>; - - private dataChannel?: RTCDataChannel; + reset(): void { + this.close(); + } } From 8defcf82e401aa0743038d964e9d8fd7ebf3a64f Mon Sep 17 00:00:00 2001 From: Chinmay Kousik Date: Fri, 5 Aug 2022 19:37:44 +0530 Subject: [PATCH 2/2] change timeline --- src/stream.ts | 101 +++++++++++++++++++++++++++++++------------------- 1 file changed, 62 insertions(+), 39 deletions(-) diff --git a/src/stream.ts b/src/stream.ts index 28ecaa8..685bbe2 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,4 +1,4 @@ -import { Stream, Direction } from '@libp2p/interface-connection'; +import { Stream } from '@libp2p/interface-connection'; import { StreamStat } from '@libp2p/interface-connection'; // import { logger } from '@libp2p/logger'; import { Source } from 'it-stream-types'; @@ -11,8 +11,8 @@ import merge from 'it-merge'; type StreamInitOpts = { channel: RTCDataChannel; - direction: Direction; metadata?: Record; + stat: StreamStat; }; export class WebRTCStream implements Stream { @@ -32,7 +32,7 @@ export class WebRTCStream implements Stream { metadata: Record; private readonly channel: RTCDataChannel; - source: Source = process.stdin; //TODO + source: Source = pushable(); sink: Sink>; // promises @@ -45,55 +45,71 @@ export class WebRTCStream implements Stream { constructor(opts: StreamInitOpts) { this.channel = opts.channel; this.id = this.channel.label; - this.stat = { - direction: opts.direction, - timeline: { - open: 0, - close: 0, - }, - }; + + this.stat = opts.stat; + switch (this.channel.readyState) { + case 'open': + this.opened.resolve(); + break; + case 'closed': + case 'closing': + this.closed = true; + if (!this.stat.timeline.close) { + this.stat.timeline.close = new Date().getTime(); + } + this.opened.resolve(); + break; + } this.metadata = opts.metadata ?? {}; - this.source = pushable(); // closable sink - this.sink = async (src: Source) => { - await this.opened.promise; - if (closed || this.writeClosed) { - return; - } + this.sink = this._sinkFn; - let self = this; - let closeWriteIterable = { - async *[Symbol.asyncIterator]() { - await self.closeWritePromise.promise; - yield new Uint8Array(0); - }, - }; - - for await (const buf of merge(closeWriteIterable, src)) { - if (closed || this.writeClosed) { - break; - } - this.channel.send(buf); - } + // handle RTCDataChannel events + this.channel.onopen = (_evt) => { + this.stat.timeline.open = new Date().getTime(); + this.opened.resolve(); }; - // handle datachannel events - this.channel.onopen = (_) => this.opened.resolve(); - this.channel.onmessage = (evt) => { - if (this.readClosed) { + this.channel.onmessage = ({ data }) => { + if (this.readClosed || this.closed) { return; } - (this.source as Pushable).push(evt.data); + (this.source as Pushable).push(data); + }; + + this.channel.onclose = (_evt) => { + this.close(); }; - this.channel.onclose = (_) => this.close(); - this.channel.onerror = (_event) => { - this.abort(new Error('TODO')); + + this.channel.onerror = (evt) => { + let err = (evt as RTCErrorEvent).error; + this.abort(err); }; } - // duplex sink + private async _sinkFn(src: Source): Promise { + await this.opened.promise; + if (closed || this.writeClosed) { + return; + } + + let self = this; + let closeWriteIterable = { + async *[Symbol.asyncIterator]() { + await self.closeWritePromise.promise; + yield new Uint8Array(0); + }, + }; + + for await (const buf of merge(closeWriteIterable, src)) { + if (closed || this.writeClosed) { + break; + } + this.channel.send(buf); + } + } /** * Close a stream for reading and writing @@ -102,6 +118,7 @@ export class WebRTCStream implements Stream { if (this.closed) { return; } + this.stat.timeline.close = new Date().getTime(); this.closed = true; this.closeRead(); this.closeWrite(); @@ -114,6 +131,9 @@ export class WebRTCStream implements Stream { closeRead(): void { this.readClosed = true; (this.source as Pushable).end(); + if (this.readClosed && this.writeClosed) { + this.close(); + } } /** @@ -122,6 +142,9 @@ export class WebRTCStream implements Stream { closeWrite(): void { this.writeClosed = true; this.closeWritePromise.resolve(); + if (this.readClosed && this.writeClosed) { + this.close(); + } } /**