Skip to content
This repository was archived by the owner on Apr 14, 2025. It is now read-only.

Commit f9120de

Browse files
committed
refactor!: rewrite and optimize worker logic, clean up
1 parent 39367b0 commit f9120de

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+689
-737
lines changed

package-lock.json

+148-425
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

+4-3
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
"import": "./dist/esm/index.js",
4040
"require": "./dist/cjs/index.js",
4141
"types": "./dist/types/index.d.ts"
42-
}
42+
},
43+
"./package.json": "./package.json"
4344
},
4445
"typesVersions": {
4546
"*": {
@@ -59,7 +60,7 @@
5960
"uuid": "8.3.2"
6061
},
6162
"devDependencies": {
62-
"@jest/globals": "^29.7.0",
63+
"@jest/globals": "29.7.0",
6364
"@types/bluebird": "3.5.36",
6465
"@types/debug": "4.1.7",
6566
"@types/jest": "29.5.8",
@@ -71,7 +72,7 @@
7172
"eslint": "8.53.0",
7273
"eslint-config-prettier": "9.0.0",
7374
"eslint-plugin-prettier": "5.0.1",
74-
"husky": "7.0.4",
75+
"husky": "9.0.11",
7576
"jest": "29.7.0",
7677
"lint-staged": "11.1.2",
7778
"prettier": "3.0.3",

scripts/test.sh

+2-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ set -x
1111
set -e
1212

1313
export NODE_ENV=test
14-
DIRNAME=$(dirname "$0")
15-
16-
. "${DIRNAME}"/build.sh
14+
export NODE_OPTIONS="$NODE_OPTIONS --trace-warnings"
15+
npm run build
1716
jest --runInBand --verbose --collectCoverage "$@"

src/event-bus/event-bus-redis.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ export class EventBusRedis<Events extends TEventBusEvent>
119119
return this;
120120
}
121121

122-
shutDown(cb: ICallback<void>) {
122+
shutdown(cb: ICallback<void>) {
123123
if (this.connected) {
124124
async.waterfall(
125125
[

src/event-bus/event-bus.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ export class EventBus<Events extends TEventBusEvent>
9191
return this;
9292
}
9393

94-
shutDown(cb: ICallback<void>) {
94+
shutdown(cb: ICallback<void>) {
9595
if (this.connected) this.connected = false;
9696
cb();
9797
}

src/event-bus/types/event-bus.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,5 +19,5 @@ export type TEventBusEvent = TEventEmitterEvent & {
1919

2020
export interface IEventBus<Events extends TEventBusEvent>
2121
extends IEventEmitter<Events> {
22-
shutDown(cb: ICallback<void>): void;
22+
shutdown(cb: ICallback<void>): void;
2323
}

src/redis-client/clients/ioredis-client.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,7 @@ export class IoredisClient extends RedisClientAbstract {
389389
}
390390
}
391391

392-
shutDown(cb: ICallback<void> = () => void 0): void {
392+
shutdown(cb: ICallback<void> = () => void 0): void {
393393
if (!this.connectionClosed) {
394394
this.client.once('end', cb);
395395
this.client.quit();

src/redis-client/clients/node-redis-client.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -517,7 +517,7 @@ export class NodeRedisClient extends RedisClientAbstract {
517517
}
518518
}
519519

520-
shutDown(cb: ICallback<void> = () => void 0): void {
520+
shutdown(cb: ICallback<void> = () => void 0): void {
521521
if (!this.connectionClosed) {
522522
this.client.once('end', cb);
523523
this.client.quit();

src/redis-client/clients/redis-client-abstract.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,7 @@ export abstract class RedisClientAbstract
322322

323323
abstract end(flush: boolean): void;
324324

325-
abstract shutDown(cb: ICallback<void>): void;
325+
abstract shutdown(cb: ICallback<void>): void;
326326

327327
abstract getInfo(cb: ICallback<string>): void;
328328

src/redis-client/types/redis-client.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ export interface IRedisClient extends EventEmitter<TRedisClientEvent> {
246246

247247
end(flush: boolean): void;
248248

249-
shutDown(cb: ICallback<void>): void;
249+
shutdown(cb: ICallback<void>): void;
250250

251251
getInfo(cb: ICallback<string>): void;
252252

src/worker/errors/worker-thread.error.ts

+5-5
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,17 @@
88
*/
99

1010
import {
11-
EWorkerThreadExecutionCode,
12-
EWorkerThreadExitCode,
13-
TWorkerThreadMessage,
11+
EWorkerThreadChildExecutionCode,
12+
EWorkerThreadChildExitCode,
13+
TWorkerThreadChildMessage,
1414
} from '../types/index.js';
1515
import { WorkerError } from './worker-error.js';
1616

1717
export class WorkerThreadError extends WorkerError {
18-
constructor(msg: TWorkerThreadMessage) {
18+
constructor(msg: TWorkerThreadChildMessage) {
1919
const { code, error } = msg;
2020
const messageStr = `Error code: ${
21-
EWorkerThreadExitCode[code] ?? EWorkerThreadExecutionCode[code]
21+
EWorkerThreadChildExitCode[code] ?? EWorkerThreadChildExecutionCode[code]
2222
}.${error ? ` Cause: ${error.name}(${error.message})` : ''}`;
2323
super(messageStr);
2424
}

src/worker/types/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
*/
99

1010
export * from './worker.js';
11+
export * from './worker-thread.js';

src/worker/types/worker-thread.ts

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/*
2+
* Copyright (c)
3+
* Weyoss <weyoss@protonmail.com>
4+
* https://github.com/weyoss
5+
*
6+
* This source code is licensed under the MIT license found in the LICENSE file
7+
* in the root directory of this source tree.
8+
*/
9+
10+
import { EWorkerType } from './worker.js';
11+
12+
export interface IWorkerThreadData {
13+
type: EWorkerType;
14+
filename: string;
15+
initialPayload: unknown;
16+
}
17+
18+
export enum EWorkerThreadChildExitCode {
19+
WORKER_DATA_REQUIRED = 100,
20+
INVALID_WORKER_TYPE,
21+
FILE_IMPORT_ERROR,
22+
UNCAUGHT_EXCEPTION,
23+
FILE_EXTENSION_ERROR,
24+
FILE_READ_ERROR,
25+
TERMINATED,
26+
}
27+
28+
export enum EWorkerThreadChildExecutionCode {
29+
OK = 200,
30+
PROCESSING_ERROR,
31+
PROCESSING_CAUGHT_ERROR,
32+
}
33+
34+
export type TWorkerThreadChildMessageCode =
35+
| EWorkerThreadChildExitCode
36+
| EWorkerThreadChildExecutionCode;
37+
38+
export type TWorkerThreadChildError = {
39+
name: string;
40+
message: string;
41+
};
42+
43+
export type TWorkerThreadChildMessage<Data = unknown> = {
44+
code: TWorkerThreadChildMessageCode;
45+
data?: Data;
46+
error?: TWorkerThreadChildError | null;
47+
};
48+
49+
export enum EWorkerThreadParentMessage {
50+
CALL,
51+
RUN,
52+
SHUTDOWN,
53+
}
54+
55+
export type TWorkerThreadParentMessageCall = {
56+
type: EWorkerThreadParentMessage.CALL;
57+
payload: unknown;
58+
};
59+
60+
export type TWorkerThreadParentMessageRun = {
61+
type: EWorkerThreadParentMessage.RUN;
62+
};
63+
64+
export type TWorkerThreadParentMessageShutdown = {
65+
type: EWorkerThreadParentMessage.SHUTDOWN;
66+
};
67+
68+
export type TWorkerThreadParentMessage =
69+
| TWorkerThreadParentMessageCall
70+
| TWorkerThreadParentMessageRun
71+
| TWorkerThreadParentMessageShutdown;

src/worker/types/worker.ts

+17-42
Original file line numberDiff line numberDiff line change
@@ -14,49 +14,24 @@ export enum EWorkerType {
1414
RUNNABLE,
1515
}
1616

17-
// eslint-disable-next-line
18-
export type TWorkerFn = (...args: [...any[], ICallback<any>]) => void; // type-coverage:ignore-line
19-
20-
export interface IWorkerRunnable<Payload> {
21-
run(initialPayload: Payload, cb: ICallback<void>): void;
17+
export type TWorkerCallableFunction = (
18+
args: unknown,
19+
cb: ICallback<unknown>,
20+
) => void;
21+
22+
export type TWorkerRunnableFunctionFactory = (
23+
initialPayload: unknown,
24+
) => IWorkerRunnable;
25+
26+
export type TWorkerFunction =
27+
| TWorkerRunnableFunctionFactory
28+
| TWorkerCallableFunction;
29+
30+
export interface IWorkerRunnable {
31+
run(cb: ICallback<void>): void;
32+
shutdown(cb: ICallback<void>): void;
2233
}
2334

2435
export interface IWorkerCallable<Payload, Reply> {
25-
call(payload: Payload, cb: ICallback<Reply>): void;
26-
}
27-
28-
export interface IWorkerData {
29-
type: EWorkerType;
30-
filename: string;
31-
}
32-
33-
export enum EWorkerThreadExitCode {
34-
WORKER_DATA_REQUIRED = 100,
35-
INVALID_WORKER_TYPE,
36-
FILE_IMPORT_ERROR,
37-
UNCAUGHT_EXCEPTION,
38-
FILE_EXTENSION_ERROR,
39-
FILE_READ_ERROR,
40-
TERMINATED,
36+
call(args: Payload, cb: ICallback<Reply>): void;
4137
}
42-
43-
export enum EWorkerThreadExecutionCode {
44-
OK = 200,
45-
PROCESSING_ERROR,
46-
PROCESSING_CAUGHT_ERROR,
47-
}
48-
49-
export type TWorkerThreadMessageCode =
50-
| EWorkerThreadExitCode
51-
| EWorkerThreadExecutionCode;
52-
53-
export type TWorkerThreadError = {
54-
name: string;
55-
message: string;
56-
};
57-
58-
export type TWorkerThreadMessage = {
59-
code: TWorkerThreadMessageCode;
60-
data?: unknown;
61-
error?: TWorkerThreadError | null;
62-
};

src/worker/worker-callable.ts

+12-4
Original file line numberDiff line numberDiff line change
@@ -8,22 +8,30 @@
88
*/
99

1010
import { ICallback } from '../common/index.js';
11-
import { EWorkerType, IWorkerCallable } from './types/index.js';
1211
import { WorkerPayloadRequiredError } from './errors/index.js';
12+
import {
13+
EWorkerThreadParentMessage,
14+
EWorkerType,
15+
IWorkerCallable,
16+
} from './types/index.js';
1317
import { Worker } from './worker.js';
1418

1519
export class WorkerCallable<Payload, Reply>
16-
extends Worker
20+
extends Worker<Payload, Reply>
1721
implements IWorkerCallable<Payload, Reply>
1822
{
1923
protected readonly type: EWorkerType = EWorkerType.CALLABLE;
2024

25+
constructor(workerFilename: string) {
26+
super(workerFilename);
27+
}
28+
2129
call(payload: Payload, cb: ICallback<Reply>) {
2230
if (payload === null || payload === undefined) {
2331
cb(new WorkerPayloadRequiredError());
2432
} else {
25-
// @ts-expect-error reply data type is known only at runtime
26-
this.exec(payload, cb);
33+
this.registerEvents(cb);
34+
this.postMessage({ type: EWorkerThreadParentMessage.CALL, payload });
2735
}
2836
}
2937
}

src/worker/worker-resource-group.ts

+5-7
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
2828
protected readonly locker;
2929
protected readonly redisClient;
3030
protected readonly logger;
31-
protected workers: { instance: WorkerRunnable<unknown>; payload: unknown }[] =
32-
[];
31+
protected workers: WorkerRunnable<unknown>[] = [];
3332
protected runWorkersLocked = false;
3433

3534
constructor(
@@ -77,8 +76,7 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
7776
async.each(
7877
this.workers,
7978
(worker, _, done) => {
80-
const { instance, payload } = worker;
81-
instance.run(payload, done);
79+
worker.run(done);
8280
},
8381
(err) => {
8482
this.runWorkersLocked = false;
@@ -94,7 +92,7 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
9492
async.each(
9593
this.workers,
9694
(worker, _, done) => {
97-
worker.instance.shutDown(() => done());
95+
worker.shutdown(() => done());
9896
},
9997
() => {
10098
this.workers = [];
@@ -129,9 +127,9 @@ export class WorkerResourceGroup extends Runnable<TWorkerResourceGroupEvent> {
129127
}
130128

131129
addWorker = (filename: string, payload: unknown): void => {
132-
const worker = new WorkerRunnable(filename);
130+
const worker = new WorkerRunnable(filename, payload);
133131
worker.on('worker.error', (err) => this.handleError(err));
134-
this.workers.push({ instance: worker, payload });
132+
this.workers.push(worker);
135133
};
136134

137135
loadFromDir = (

0 commit comments

Comments
 (0)