Skip to content

Commit 30d28ae

Browse files
jedlikowskiJounQin
andauthored
fix: handle outdated message in channel queue (#184)
Co-authored-by: JounQin <admin@1stg.me>
1 parent 35a89ea commit 30d28ae

File tree

5 files changed

+207
-22
lines changed

5 files changed

+207
-22
lines changed

.changeset/violet-laws-compare.md

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"synckit": patch
3+
---
4+
5+
fix: handle outdated message in channel queue

src/index.ts

+58-21
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import path from 'node:path'
55
import { fileURLToPath, pathToFileURL } from 'node:url'
66
import {
77
MessageChannel,
8+
MessagePort,
89
type TransferListItem,
910
Worker,
1011
parentPort,
@@ -19,6 +20,7 @@ import type {
1920
AnyAsyncFn,
2021
AnyFn,
2122
GlobalShim,
23+
MainToWorkerCommandMessage,
2224
MainToWorkerMessage,
2325
Syncify,
2426
ValueOf,
@@ -522,36 +524,59 @@ function startWorkerThread<R, T extends AnyAsyncFn<R>>(
522524

523525
let nextID = 0
524526

525-
const syncFn = (...args: Parameters<T>): R => {
526-
const id = nextID++
527-
528-
const msg: MainToWorkerMessage<Parameters<T>> = { id, args }
529-
530-
worker.postMessage(msg)
531-
532-
const status = Atomics.wait(sharedBufferView!, 0, 0, timeout)
533-
534-
// Reset SharedArrayBuffer for next call
527+
const receiveMessageWithId = (
528+
port: MessagePort,
529+
expectedId: number,
530+
waitingTimeout?: number,
531+
): WorkerToMainMessage<R> => {
532+
const start = Date.now()
533+
const status = Atomics.wait(sharedBufferView!, 0, 0, waitingTimeout)
535534
Atomics.store(sharedBufferView!, 0, 0)
536535

537-
/* istanbul ignore if */
538536
if (!['ok', 'not-equal'].includes(status)) {
537+
const abortMsg: MainToWorkerCommandMessage = {
538+
id: expectedId,
539+
cmd: 'abort',
540+
}
541+
port.postMessage(abortMsg)
539542
throw new Error('Internal error: Atomics.wait() failed: ' + status)
540543
}
541544

542-
const {
543-
id: id2,
544-
result,
545-
error,
546-
properties,
547-
} = (receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> })
548-
.message
545+
const { id, ...message } = (
546+
receiveMessageOnPort(mainPort) as { message: WorkerToMainMessage<R> }
547+
).message
549548

550-
/* istanbul ignore if */
551-
if (id !== id2) {
552-
throw new Error(`Internal error: Expected id ${id} but got id ${id2}`)
549+
if (id < expectedId) {
550+
const waitingTime = Date.now() - start
551+
return receiveMessageWithId(
552+
port,
553+
expectedId,
554+
waitingTimeout ? waitingTimeout - waitingTime : undefined,
555+
)
553556
}
554557

558+
if (expectedId !== id) {
559+
throw new Error(
560+
`Internal error: Expected id ${expectedId} but got id ${id}`,
561+
)
562+
}
563+
564+
return { id, ...message }
565+
}
566+
567+
const syncFn = (...args: Parameters<T>): R => {
568+
const id = nextID++
569+
570+
const msg: MainToWorkerMessage<Parameters<T>> = { id, args }
571+
572+
worker.postMessage(msg)
573+
574+
const { result, error, properties } = receiveMessageWithId(
575+
mainPort,
576+
id,
577+
timeout,
578+
)
579+
555580
if (error) {
556581
throw Object.assign(error as object, properties)
557582
}
@@ -587,12 +612,24 @@ export function runAsWorker<
587612
({ id, args }: MainToWorkerMessage<Parameters<T>>) => {
588613
// eslint-disable-next-line @typescript-eslint/no-floating-promises
589614
;(async () => {
615+
let isAborted = false
616+
const handleAbortMessage = (msg: MainToWorkerCommandMessage) => {
617+
if (msg.id === id && msg.cmd === 'abort') {
618+
isAborted = true
619+
}
620+
}
621+
workerPort.on('message', handleAbortMessage)
590622
let msg: WorkerToMainMessage<R>
591623
try {
592624
msg = { id, result: await fn(...args) }
593625
} catch (error: unknown) {
594626
msg = { id, error, properties: extractProperties(error) }
595627
}
628+
workerPort.off('message', handleAbortMessage)
629+
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
630+
if (isAborted) {
631+
return
632+
}
596633
workerPort.postMessage(msg)
597634
Atomics.add(sharedBufferView, 0, 1)
598635
Atomics.notify(sharedBufferView, 0)

src/types.ts

+5
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,11 @@ export interface MainToWorkerMessage<T extends unknown[]> {
2525
args: T
2626
}
2727

28+
export interface MainToWorkerCommandMessage {
29+
id: number
30+
cmd: string
31+
}
32+
2833
export interface WorkerData {
2934
sharedBuffer: SharedArrayBuffer
3035
workerPort: MessagePort

test/fn.spec.ts

+115-1
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@ import path from 'node:path'
33

44
import { jest } from '@jest/globals'
55

6-
import { _dirname, testIf, tsUseEsmSupported } from './helpers.js'
6+
import {
7+
_dirname,
8+
setupReceiveMessageOnPortMock,
9+
testIf,
10+
tsUseEsmSupported,
11+
} from './helpers.js'
712
import type { AsyncWorkerFn } from './types.js'
813

914
import { createSyncFn } from 'synckit'
@@ -12,6 +17,7 @@ const { SYNCKIT_TIMEOUT } = process.env
1217

1318
beforeEach(() => {
1419
jest.resetModules()
20+
jest.restoreAllMocks()
1521

1622
delete process.env.SYNCKIT_GLOBAL_SHIMS
1723

@@ -104,6 +110,114 @@ test('timeout', async () => {
104110
)
105111
})
106112

113+
test('subsequent executions after timeout', async () => {
114+
const executionTimeout = 30
115+
const longRunningTaskDuration = executionTimeout * 10
116+
process.env.SYNCKIT_TIMEOUT = executionTimeout.toString()
117+
118+
const { createSyncFn } = await import('synckit')
119+
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
120+
121+
// start an execution in worker that will definitely time out
122+
expect(() => syncFn(1, longRunningTaskDuration)).toThrow()
123+
124+
// wait for timed out execution to finish inside worker
125+
await new Promise(resolve => setTimeout(resolve, longRunningTaskDuration))
126+
127+
// subsequent executions should work correctly
128+
expect(syncFn(2, 1)).toBe(2)
129+
expect(syncFn(3, 1)).toBe(3)
130+
})
131+
132+
test('handling of outdated message from worker', async () => {
133+
const executionTimeout = 60
134+
process.env.SYNCKIT_TIMEOUT = executionTimeout.toString()
135+
const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()
136+
137+
jest.spyOn(Atomics, 'wait').mockReturnValue('ok')
138+
139+
receiveMessageOnPortMock
140+
.mockReturnValueOnce({ message: { id: -1 } })
141+
.mockReturnValueOnce({ message: { id: 0, result: 1 } })
142+
143+
const { createSyncFn } = await import('synckit')
144+
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
145+
expect(syncFn(1)).toBe(1)
146+
expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2)
147+
})
148+
149+
test('propagation of undefined timeout', async () => {
150+
delete process.env.SYNCKIT_TIMEOUT
151+
const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()
152+
153+
const atomicsWaitSpy = jest.spyOn(Atomics, 'wait').mockReturnValue('ok')
154+
155+
receiveMessageOnPortMock
156+
.mockReturnValueOnce({ message: { id: -1 } })
157+
.mockReturnValueOnce({ message: { id: 0, result: 1 } })
158+
159+
const { createSyncFn } = await import('synckit')
160+
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
161+
expect(syncFn(1)).toBe(1)
162+
expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2)
163+
164+
const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] =
165+
atomicsWaitSpy.mock.calls
166+
const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs
167+
const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs
168+
169+
expect(typeof firstAtomicsWaitCallTimeout).toBe('undefined')
170+
expect(typeof secondAtomicsWaitCallTimeout).toBe('undefined')
171+
})
172+
173+
test('reduction of waiting time', async () => {
174+
const synckitTimeout = 60
175+
process.env.SYNCKIT_TIMEOUT = synckitTimeout.toString()
176+
const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()
177+
178+
const atomicsWaitSpy = jest.spyOn(Atomics, 'wait').mockImplementation(() => {
179+
const start = Date.now()
180+
// simulate waiting 10ms for worker to respond
181+
while (Date.now() - start < 10) {
182+
continue
183+
}
184+
185+
return 'ok'
186+
})
187+
188+
receiveMessageOnPortMock
189+
.mockReturnValueOnce({ message: { id: -1 } })
190+
.mockReturnValueOnce({ message: { id: 0, result: 1 } })
191+
192+
const { createSyncFn } = await import('synckit')
193+
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
194+
expect(syncFn(1)).toBe(1)
195+
expect(receiveMessageOnPortMock).toHaveBeenCalledTimes(2)
196+
197+
const [firstAtomicsWaitArgs, secondAtomicsWaitArgs] =
198+
atomicsWaitSpy.mock.calls
199+
const [, , , firstAtomicsWaitCallTimeout] = firstAtomicsWaitArgs
200+
const [, , , secondAtomicsWaitCallTimeout] = secondAtomicsWaitArgs
201+
202+
expect(typeof firstAtomicsWaitCallTimeout).toBe('number')
203+
expect(firstAtomicsWaitCallTimeout).toBe(synckitTimeout)
204+
expect(typeof secondAtomicsWaitCallTimeout).toBe('number')
205+
expect(secondAtomicsWaitCallTimeout).toBeLessThan(synckitTimeout)
206+
})
207+
208+
test('unexpected message from worker', async () => {
209+
jest.spyOn(Atomics, 'wait').mockReturnValue('ok')
210+
211+
const receiveMessageOnPortMock = await setupReceiveMessageOnPortMock()
212+
receiveMessageOnPortMock.mockReturnValueOnce({ message: { id: 100 } })
213+
214+
const { createSyncFn } = await import('synckit')
215+
const syncFn = createSyncFn<AsyncWorkerFn>(workerCjsPath)
216+
expect(() => syncFn(1)).toThrow(
217+
'Internal error: Expected id 0 but got id 100',
218+
)
219+
})
220+
107221
test('globalShims env', async () => {
108222
process.env.SYNCKIT_GLOBAL_SHIMS = '1'
109223

test/helpers.ts

+24
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import path from 'node:path'
22
import { fileURLToPath } from 'node:url'
3+
import WorkerThreads from 'node:worker_threads'
4+
5+
import { jest } from '@jest/globals'
36

47
import { MTS_SUPPORTED_NODE_VERSION } from 'synckit'
58

@@ -13,3 +16,24 @@ export const tsUseEsmSupported =
1316
nodeVersion >= MTS_SUPPORTED_NODE_VERSION && nodeVersion <= 18.18
1417

1518
export const testIf = (condition: boolean) => (condition ? it : it.skip)
19+
20+
type ReceiveMessageOnPortMock = jest.Mock<
21+
typeof WorkerThreads.receiveMessageOnPort
22+
>
23+
export const setupReceiveMessageOnPortMock =
24+
async (): Promise<ReceiveMessageOnPortMock> => {
25+
jest.unstable_mockModule('node:worker_threads', () => {
26+
return {
27+
...WorkerThreads,
28+
receiveMessageOnPort: jest.fn(WorkerThreads.receiveMessageOnPort),
29+
}
30+
})
31+
32+
const { receiveMessageOnPort: receiveMessageOnPortMock } = (await import(
33+
'node:worker_threads'
34+
)) as unknown as {
35+
receiveMessageOnPort: ReceiveMessageOnPortMock
36+
}
37+
38+
return receiveMessageOnPortMock
39+
}

0 commit comments

Comments
 (0)