Skip to content

Commit

Permalink
feat(Worker Pool): Creates new fromWorkerPool method to automatically
Browse files Browse the repository at this point in the history
 manage pools of workers for parallel processing
  • Loading branch information
zak-cloudnc committed Aug 4, 2019
1 parent b090625 commit f0a274a
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 42 deletions.
70 changes: 70 additions & 0 deletions projects/observable-webworker/src/lib/from-worker-pool.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { EMPTY, Observable, of, Subject } from 'rxjs';
import { finalize, mergeMap } from 'rxjs/operators';
import { fromWorker } from './from-worker';

export function fromWorkerPool<I, O>(
workerConstructor: (index: number) => Worker,
workUnitIterator: IterableIterator<I> | Array<I>,
selectTransferables?: (input: I) => Transferable[],
workerCount: number = navigator.hardwareConcurrency - 1,
): Observable<O> {

const iterator = Array.isArray(workUnitIterator) ? workUnitIterator[Symbol.iterator]() : workUnitIterator;

return new Observable<O>(resultObserver => {

const idleWorker$$: Subject<Worker> = new Subject();

let completed = 0;
let sent = 0;
let finished = false;

const workers: Worker[] = Array.from({ length: workerCount }).map((_, i) => workerConstructor(i));

const processor$ = idleWorker$$.pipe(
mergeMap(
(worker): Observable<O> => {

const next = iterator.next();

if (next.done) {
idleWorker$$.complete();
finished = true;
return EMPTY;
}

sent++;
const unitWork: I = next.value;

return fromWorker<I, O>(() => worker, of(unitWork), selectTransferables, { terminateOnComplete: false }).pipe(
finalize(() => {
completed ++;

if (!idleWorker$$.closed) {
idleWorker$$.next(worker);
}

if (finished && completed === sent){
resultObserver.complete();
}

})
);
},
),
);

const sub = processor$.subscribe({
next: (o) => resultObserver.next(o),
error: (e) => resultObserver.error(e),
});

workers.forEach(w => idleWorker$$.next(w));

return () => {
workers.forEach(w => w.terminate());
sub.unsubscribe();
}

});
}
7 changes: 6 additions & 1 deletion projects/observable-webworker/src/lib/from-worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ import { Observable, Observer, Subscription, Notification } from 'rxjs';
import { dematerialize, map, materialize, tap } from 'rxjs/operators';
import { GenericWorkerMessage, WorkerMessageNotification } from './observable-worker.types';

export interface WorkerOptions {
terminateOnComplete: boolean;
}

export function fromWorker<Input, Output>(
workerFactory: () => Worker,
input$: Observable<Input>,
selectTransferables?: (input: Input) => Transferable[],
options: WorkerOptions = { terminateOnComplete: true },
): Observable<Output> {
return new Observable((responseObserver: Observer<Notification<GenericWorkerMessage<Output>>>) => {
let worker: Worker;
Expand Down Expand Up @@ -39,7 +44,7 @@ export function fromWorker<Input, Output>(
if (subscription) {
subscription.unsubscribe();
}
if (worker) {
if (worker && options.terminateOnComplete) {
worker.terminate();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,11 @@ export interface WorkerMessageNotification<T> extends MessageEvent {
data: Notification<GenericWorkerMessage<T>>;
}

export interface DoWorkUnit<I, O> {
workUnit(input: I): Observable<O>;
selectTransferables?(output: O): Transferable[];
}

export interface DoWork<I, O> {
work(input$: Observable<I>): Observable<O>;
selectTransferables?(output: O): Transferable[];
Expand Down
73 changes: 49 additions & 24 deletions projects/observable-webworker/src/lib/run-worker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
import { fromEvent, Notification } from 'rxjs';
import { dematerialize, map, materialize } from 'rxjs/operators';
import { DoTransferableWork, DoWork, GenericWorkerMessage, WorkerMessageNotification } from './observable-worker.types';
import { fromEvent, Notification, Observable } from 'rxjs';
import { NotificationKind } from 'rxjs/internal/Notification';
import { concatMap, dematerialize, filter, finalize, map, materialize, tap } from 'rxjs/operators';
import {
DoTransferableWork,
DoWork,
DoWorkUnit,
GenericWorkerMessage,
WorkerMessageNotification,
} from './observable-worker.types';

export type ObservableWorkerConstructor<I = any, O = any> = new (...args) => DoWork<I, O>;
export type ObservableWorkerConstructor<I = any, O = any> = new (...args) => DoWork<I, O> | DoWorkUnit<I, O>;

/** @internal */
export type WorkerPostMessageNotification<T> = (
Expand All @@ -11,37 +18,55 @@ export type WorkerPostMessageNotification<T> = (
) => void;

/** @internal */
export function workerIsTransferableType<I, O>(worker: DoWork<I, O>): worker is DoTransferableWork<I, O> {
export function workerIsTransferableType<I, O>(
worker: DoWork<I, O> | DoWorkUnit<I, O>,
): worker is DoTransferableWork<I, O> {
return !!worker.selectTransferables;
}

/** @internal */
export function workerIsUnitType<I, O>(worker: DoWork<I, O> | DoWorkUnit<I, O>): worker is DoWorkUnit<I, O> {
return !!(worker as DoWorkUnit<I, O>).workUnit;
}

/** @internal */
export function processWork<I, O>(
obs$: Observable<O>,
worker: DoWork<I, O> | DoWorkUnit<I, O>,
): Observable<Notification<GenericWorkerMessage<O>>> {
return obs$.pipe(
map(payload => {
const message: GenericWorkerMessage<O> = { payload };

if (workerIsTransferableType(worker)) {
message.transferables = worker.selectTransferables(payload);
}

return message;
}),
materialize(),
);
}

export function runWorker<I, O>(workerConstructor: ObservableWorkerConstructor<I, O>) {
const input$ = fromEvent(self, 'message').pipe(
map((e: WorkerMessageNotification<I>): Notification<GenericWorkerMessage<I>> => e.data),
map((n: Notification<GenericWorkerMessage<I>>) => new Notification(n.kind, n.value, n.error)),
// ignore complete, the calling thread will manage termination of the stream
filter(n => n.kind != NotificationKind.COMPLETE),
dematerialize(),
map(i => i.payload),
);

const worker = new workerConstructor();

worker
.work(input$)
.pipe(
map(payload => {
const message: GenericWorkerMessage<O> = { payload };

if (workerIsTransferableType(worker)) {
message.transferables = worker.selectTransferables(payload);
}

return message;
}),
materialize(),
)
.subscribe((notification: Notification<GenericWorkerMessage<O>>) => {
const transferables = notification.hasValue ? notification.value.transferables : undefined;
// type to workaround typescript trying to compile as non-webworker context
((postMessage as unknown) as WorkerPostMessageNotification<O>)(notification, transferables);
});
const outputStream$ = workerIsUnitType(worker)
? input$.pipe(concatMap(input => processWork(worker.workUnit(input), worker)))
: processWork(worker.work(input$), worker);

outputStream$.subscribe((notification: Notification<GenericWorkerMessage<O>>) => {
const transferables = notification.hasValue ? notification.value.transferables : undefined;
// type to workaround typescript trying to compile as non-webworker context
((postMessage as unknown) as WorkerPostMessageNotification<O>)(notification, transferables);
});
}
1 change: 1 addition & 0 deletions projects/observable-webworker/src/public-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ export * from './lib/observable-worker.types';
export * from './lib/observable-worker.decorator';
export * from './lib/run-worker';
export * from './lib/from-worker';
export * from './lib/from-worker-pool';
14 changes: 14 additions & 0 deletions src/app/app.component.html
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ <h1>
Demo
</h1>

<h2>Single Worker</h2>

Select file to compute SHA-256 sum of, in webworker:
<input type="file" (change)="calculateSha256($event)" />

Expand All @@ -19,3 +21,15 @@ <h3>Events:</h3>
<code>mergeMap</code>
would allow both files to complete hashing.
</p>


<hr>
<h2>Multiple Worker Pool</h2>

Select multiple files to compute SHA-256 sum of, in webworker:
<input type="file" multiple (change)="calculateSha256Multiple($event)" />

<h3>Events:</h3>
<ol>
<li *ngFor="let event of eventListPool$ | async">{{ event }}</li>
</ol>
38 changes: 37 additions & 1 deletion src/app/app.component.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Component } from '@angular/core';
import { Observable, of, Subject } from 'rxjs';
import { finalize, scan, switchMap, tap } from 'rxjs/operators';
import { fromWorker } from '../../projects/observable-webworker/src/lib/from-worker';
import { fromWorkerPool } from '../../projects/observable-webworker/src/lib/from-worker-pool';

@Component({
selector: 'app-root',
Expand All @@ -18,9 +19,11 @@ export class AppComponent {
);

private filesToHash: Subject<File> = new Subject();
private multiFilesToHash: Subject<File[]> = new Subject();

constructor() {
this.filesToHash.pipe(switchMap(file => this.hashFile(file))).subscribe();
this.multiFilesToHash.pipe(switchMap(files => this.hashMultipleFiles(files))).subscribe();
}

public calculateSha256($event): void {
Expand All @@ -34,7 +37,7 @@ export class AppComponent {
const input$: Observable<Blob> = of(file);

return fromWorker<Blob, string>(() => {
const worker = new Worker('./secure-hash-algorithm.worker', { type: 'module' });
const worker = new Worker('./secure-hash-algorithm.worker', { name: 'sha-worker', type: 'module' });
this.events$.next('Main: worker created');
return worker;
}, input$).pipe(
Expand All @@ -43,4 +46,37 @@ export class AppComponent {
}),
);
}

private *workPool(files: File[]) {
for (let file of files) {
yield file;
this.eventsPool$.next(`Main: file ${file.name} picked up for processing`);
}
}

public hashMultipleFiles(files: File[]): Observable<string> {
return fromWorkerPool<Blob, string>(index => {
const worker = new Worker('./secure-hash-algorithm.worker', { name: `sha-worker-${index}`, type: 'module' });
this.eventsPool$.next(`Main: worker ${index} created`);
return worker;
}, this.workPool(files)).pipe(
tap(res => {
this.eventsPool$.next(`Worker: ${res}`);
}),
);
}

public eventsPool$: Subject<string> = new Subject();
public eventListPool$: Observable<string[]> = this.eventsPool$.pipe(
scan<string>((list, event) => {
list.push(`${new Date().toISOString()}: ${event}`);
return list;
}, []),
);

public calculateSha256Multiple($event): void {
this.eventsPool$.next('Main: files selected');
const files: File[] = $event.target.files;
this.multiFilesToHash.next(files);
}
}
34 changes: 18 additions & 16 deletions src/app/secure-hash-algorithm.worker.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,26 @@
import { ObservableWorker } from 'observable-webworker';
import { Observable, Subject } from 'rxjs';
import { map, switchMap, take, tap } from 'rxjs/operators';
import { DoWork, ObservableWorker } from 'observable-webworker';
import { map, switchMap, tap } from 'rxjs/operators';
import { DoWorkUnit } from '../../projects/observable-webworker/src/lib/observable-worker.types';

@ObservableWorker()
export class SecureHashAlgorithmWorker implements DoWork<Blob, string> {
public work(input$: Observable<Blob>): Observable<string> {
export class SecureHashAlgorithmWorker implements DoWorkUnit<File, string> {

public workUnit(input: File): Observable<string> {
const output$: Subject<string> = new Subject();

input$
.pipe(
take(1),
tap(() => output$.next('received file')),
switchMap(message => this.readFileAsArrayBuffer(message)),
tap(() => output$.next('read file')),
switchMap(arrayBuffer => crypto.subtle.digest('SHA-256', arrayBuffer)),
tap(() => output$.next('hashed file')),
map((digest: ArrayBuffer): string => 'hash result: ' + this.arrayBufferToHex(digest)),
tap(() => output$.next('sending hash back to main thread')),
)
.subscribe(output$);
output$.next(`received file ${input.name}`);
return this.readFileAsArrayBuffer(input).pipe(
tap(() => output$.next(`${input.name}: read file`)),
switchMap(arrayBuffer => crypto.subtle.digest('SHA-256', arrayBuffer)),
tap(() => output$.next(`${input.name}: hashed file`)),
map((digest: ArrayBuffer): string => `${input.name}: hash result: ${this.arrayBufferToHex(digest)}`),
tap(() => output$.next(`${input.name}: sending hash back to main thread`)),
tap(out => {
output$.next(out);
output$.complete();
}),
);

return output$;
}
Expand Down

0 comments on commit f0a274a

Please sign in to comment.