diff --git a/package.json b/package.json index 4f96f04..9d5f002 100644 --- a/package.json +++ b/package.json @@ -24,6 +24,8 @@ "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", "abortable-iterator": "^4.0.2", + "it-merge": "^1.0.4", + "p-defer": "^4.0.0", "socket.io-client": "^4.1.2", "uuid": "^8.3.2" } diff --git a/src/stream.ts b/src/stream.ts index 90bbb4d..685bbe2 100644 --- a/src/stream.ts +++ b/src/stream.ts @@ -1,70 +1,163 @@ import { Stream } from '@libp2p/interface-connection'; import { StreamStat } from '@libp2p/interface-connection'; -import { logger } from '@libp2p/logger'; +// import { logger } from '@libp2p/logger'; import { Source } from 'it-stream-types'; import { Sink } from 'it-stream-types'; +import { pushable, Pushable } from 'it-pushable'; +import defer, { DeferredPromise } from 'p-defer'; +import merge from 'it-merge'; -const log = logger('libp2p:webrtc:connection'); +// const log = logger('libp2p:webrtc:connection'); + +type StreamInitOpts = { + channel: RTCDataChannel; + metadata?: Record; + stat: StreamStat; +}; export class WebRTCStream implements Stream { - constructor() { - this.id = 'TODO'; - this.stat = { - direction: 'outbound', - timeline: { - open: 0, - close: 0, + /** + * Unique identifier for a stream + */ + id: string; + + /** + * Stats about this stream + */ + stat: StreamStat; + + /** + * User defined stream metadata + */ + metadata: Record; + private readonly channel: RTCDataChannel; + + source: Source = pushable(); + sink: Sink>; + + // promises + opened: DeferredPromise = defer(); + closeWritePromise: DeferredPromise = defer(); + writeClosed: boolean = false; + readClosed: boolean = false; + closed: boolean = false; + + constructor(opts: StreamInitOpts) { + this.channel = opts.channel; + this.id = this.channel.label; + + this.stat = opts.stat; + switch (this.channel.readyState) { + case 'open': + this.opened.resolve(); + break; + case 'closed': + case 'closing': + this.closed = true; + if (!this.stat.timeline.close) { + this.stat.timeline.close = new Date().getTime(); + } + this.opened.resolve(); + break; + } + + this.metadata = opts.metadata ?? {}; + + // closable sink + this.sink = this._sinkFn; + + // handle RTCDataChannel events + this.channel.onopen = (_evt) => { + this.stat.timeline.open = new Date().getTime(); + this.opened.resolve(); + }; + + this.channel.onmessage = ({ data }) => { + if (this.readClosed || this.closed) { + return; + } + (this.source as Pushable).push(data); + }; + + this.channel.onclose = (_evt) => { + this.close(); + }; + + this.channel.onerror = (evt) => { + let err = (evt as RTCErrorEvent).error; + this.abort(err); + }; + } + + private async _sinkFn(src: Source): Promise { + await this.opened.promise; + if (closed || this.writeClosed) { + return; + } + + let self = this; + let closeWriteIterable = { + async *[Symbol.asyncIterator]() { + await self.closeWritePromise.promise; + yield new Uint8Array(0); }, }; - this.metadata = {}; - this.sink = (x) => new Promise((res, rej) => {}); //TODO - if (this.dataChannel) { - log('TODO', this.dataChannel.id); + + for await (const buf of merge(closeWriteIterable, src)) { + if (closed || this.writeClosed) { + break; + } + this.channel.send(buf); } } /** * Close a stream for reading and writing */ - close(): void {} + close(): void { + if (this.closed) { + return; + } + this.stat.timeline.close = new Date().getTime(); + this.closed = true; + this.closeRead(); + this.closeWrite(); + this.channel.close(); + } /** * Close a stream for reading only */ - closeRead(): void {} + closeRead(): void { + this.readClosed = true; + (this.source as Pushable).end(); + if (this.readClosed && this.writeClosed) { + this.close(); + } + } /** * Close a stream for writing only */ - closeWrite(): void {} + closeWrite(): void { + this.writeClosed = true; + this.closeWritePromise.resolve(); + if (this.readClosed && this.writeClosed) { + this.close(); + } + } /** * Call when a local error occurs, should close the stream for reading and writing */ - abort(err: Error): void {} + abort(err: Error): void { + this.close(); + } /** * Call when a remote error occurs, should close the stream for reading and writing */ - reset(): void {} - - /** - * Unique identifier for a stream - */ - id: string; - - /** - * Stats about this stream - */ - stat: StreamStat; - - /** - * User defined stream metadata - */ - metadata: Record; - - source: Source = process.stdin; //TODO - sink: Sink>; - - private dataChannel?: RTCDataChannel; + reset(): void { + this.close(); + } }