From 172a0315e2ecfca0a29fdc586f3d0f6203fb02e9 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Thu, 3 Oct 2024 13:29:09 -0700 Subject: [PATCH 1/4] Fix worker_threads issues blocking angular --- ext/node/polyfills/worker_threads.ts | 12 ++- ext/web/13_message_port.js | 44 +++++++--- ext/web/message_port.rs | 1 - tests/unit_node/worker_threads_test.ts | 113 +++++++++++++++++++++++++ 4 files changed, 152 insertions(+), 18 deletions(-) diff --git a/ext/node/polyfills/worker_threads.ts b/ext/node/polyfills/worker_threads.ts index 5ff4446f732fa7..d4b75fb30c2c3b 100644 --- a/ext/node/polyfills/worker_threads.ts +++ b/ext/node/polyfills/worker_threads.ts @@ -302,8 +302,8 @@ class NodeWorker extends EventEmitter { if (this.#status !== "TERMINATED") { this.#status = "TERMINATED"; op_host_terminate_worker(this.#id); + this.emit("exit", 0); } - this.emit("exit", 0); return PromiseResolve(0); } @@ -422,7 +422,11 @@ internals.__initWorkerThreads = ( parentPort.once = function (this: ParentPort, name, listener) { // deno-lint-ignore no-explicit-any - const _listener = (ev: any) => listener(ev.data); + const _listener = (ev: any) => { + const message = ev.data; + patchMessagePortIfFound(message); + return listener(message); + }; listeners.set(listener, _listener); this.addEventListener(name, _listener); return this; @@ -494,7 +498,9 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined { port[MessagePortReceiveMessageOnPortSymbol] = true; const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]); if (data === null) return undefined; - return { message: deserializeJsMessageData(data)[0] }; + const message = deserializeJsMessageData(data)[0]; + patchMessagePortIfFound(message); + return { message }; } class NodeMessageChannel { diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index 04697d6aa83614..c33fe562944486 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -22,6 +22,7 @@ const { Symbol, SymbolFor, SymbolIterator, + PromiseResolve, SafeArrayIterator, TypeError, } = primordials; @@ -122,12 +123,16 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) { } } +const _dataPromise = Symbol("dataPromise"); + class MessagePort extends EventTarget { /** @type {number | null} */ [_id] = null; /** @type {boolean} */ [_enabled] = false; [_refed] = false; + /** @type {Promise | undefined} */ + [_dataPromise] = undefined; constructor() { super(); @@ -193,24 +198,21 @@ class MessagePort extends EventTarget { this[_enabled] = true; while (true) { if (this[_id] === null) break; - // Exit if no message event listeners are present in Node compat mode. - if ( - typeof this[nodeWorkerThreadCloseCb] == "function" && - messageEventListenerCount === 0 - ) break; let data; try { - data = await op_message_port_recv_message( + this[_dataPromise] = op_message_port_recv_message( this[_id], ); + if ( + typeof this[nodeWorkerThreadCloseCb] === "function" && + !this[_refed] + ) { + core.unrefOpPromise(this[_dataPromise]); + } + data = await this[_dataPromise]; + this[_dataPromise] = undefined; } catch (err) { if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) { - // If we were interrupted, check if the interruption is coming - // from `receiveMessageOnPort` API from Node compat, if so, continue. - if (this[MessagePortReceiveMessageOnPortSymbol]) { - this[MessagePortReceiveMessageOnPortSymbol] = false; - continue; - } break; } nodeWorkerThreadMaybeInvokeCloseCb(this); @@ -251,7 +253,11 @@ class MessagePort extends EventTarget { messageEventListenerCount++; } else if (!ref && this[_refed]) { this[_refed] = false; - messageEventListenerCount = 0; + if ( + typeof this[nodeWorkerThreadCloseCb] == "function" && this[_dataPromise] + ) { + core.unrefOpPromise(this[_dataPromise]); + } } } @@ -295,7 +301,17 @@ class MessagePort extends EventTarget { } defineEventHandler(MessagePort.prototype, "message", function (self) { - self.start(); + if (self[nodeWorkerThreadCloseCb]) { + (async () => { + // delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort` + // a chance to receive a message first. this is primarily to resolve an issue with + // a pattern used in `npm:piscina` that results in an indefinite hang + await PromiseResolve(); + self.start(); + })(); + } else { + self.start(); + } }); defineEventHandler(MessagePort.prototype, "messageerror"); diff --git a/ext/web/message_port.rs b/ext/web/message_port.rs index c069037f81f9f9..fa299475d90107 100644 --- a/ext/web/message_port.rs +++ b/ext/web/message_port.rs @@ -239,7 +239,6 @@ pub fn op_message_port_recv_message_sync( #[smi] rid: ResourceId, ) -> Result, AnyError> { let resource = state.resource_table.get::(rid)?; - resource.cancel.cancel(); let mut rx = resource.port.rx.borrow_mut(); match rx.try_recv() { diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index ac797601f5981e..fb10c01d416a49 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -621,3 +621,116 @@ Deno.test({ worker.terminate(); }, }); + +Deno.test({ + name: "[node/worker_threads] receiveMessageOnPort doesn't exit receive loop", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort, receiveMessageOnPort } from "node:worker_threads"; + parentPort.on("message", (msg) => { + const port = msg.port; + port.on("message", (msg2) => { + if (msg2 === "c") { + port.postMessage("done"); + port.unref(); + parentPort.unref(); + } + }); + parentPort.postMessage("ready"); + const msg2 = receiveMessageOnPort(port); + }); + `, + { eval: true }, + ); + + const { port1, port2 } = new workerThreads.MessageChannel(); + + worker.postMessage({ port: port2 }, [port2]); + + const done = Promise.withResolvers(); + + port1.on("message", (msg) => { + assertEquals(msg, "done"); + worker.unref(); + port1.close(); + done.resolve(true); + }); + worker.on("message", (msg) => { + assertEquals(msg, "ready"); + port1.postMessage("a"); + port1.postMessage("b"); + port1.postMessage("c"); + }); + + const timeout = setTimeout(() => { + fail("Test timed out"); + }, 20_000); + try { + const result = await done.promise; + assertEquals(result, true); + } finally { + clearTimeout(timeout); + } + }, +}); + +Deno.test({ + name: "[node/worker_threads] MessagePort.unref doesn't exit receive loop", + async fn() { + const worker = new workerThreads.Worker( + ` + import { parentPort } from "node:worker_threads"; + const assertEquals = (a, b) => { + if (a !== b) { + throw new Error(); + } + }; + let state = 0; + parentPort.on("message", (msg) => { + const port = msg.port; + const expect = ["a", "b", "c"]; + port.on("message", (msg2) => { + assertEquals(msg2, expect[state++]); + if (msg2 === "c") { + port.postMessage({ type: "done", got: msg2 }); + parentPort.unref(); + } + }); + port.unref(); + parentPort.postMessage("ready"); + }); + `, + { eval: true }, + ); + + const { port1, port2 } = new workerThreads.MessageChannel(); + + const done = Promise.withResolvers(); + + port1.on("message", (msg) => { + assertEquals(msg.type, "done"); + assertEquals(msg.got, "c"); + worker.unref(); + port1.close(); + done.resolve(true); + }); + worker.on("message", (msg) => { + assertEquals(msg, "ready"); + port1.postMessage("a"); + port1.postMessage("b"); + port1.postMessage("c"); + }); + worker.postMessage({ port: port2 }, [port2]); + + const timeout = setTimeout(() => { + fail("Test timed out"); + }, 20_000); + try { + const result = await done.promise; + assertEquals(result, true); + } finally { + clearTimeout(timeout); + } + }, +}); From d76813ea4cce870caf01b4bc10e90c97b317e6e7 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Thu, 3 Oct 2024 14:46:59 -0700 Subject: [PATCH 2/4] Add test for piscina regression --- tests/unit_node/worker_threads_test.ts | 88 ++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/tests/unit_node/worker_threads_test.ts b/tests/unit_node/worker_threads_test.ts index fb10c01d416a49..24a910789894c1 100644 --- a/tests/unit_node/worker_threads_test.ts +++ b/tests/unit_node/worker_threads_test.ts @@ -734,3 +734,91 @@ Deno.test({ } }, }); + +Deno.test({ + name: "[node/worker_threads] npm:piscina wait loop hang regression", + async fn() { + const worker = new workerThreads.Worker( + ` + import { assert, assertEquals } from "@std/assert"; + import { parentPort, receiveMessageOnPort } from "node:worker_threads"; + + assert(parentPort !== null); + + let currentTasks = 0; + let lastSeen = 0; + + parentPort.on("message", (msg) => { + (async () => { + assert(typeof msg === "object" && msg !== null); + assert(msg.buf !== undefined); + assert(msg.port !== undefined); + const { buf, port } = msg; + port.postMessage("ready"); + port.on("message", (msg) => onMessage(msg, buf, port)); + atomicsWaitLoop(buf, port); + })(); + }); + + function onMessage(msg, buf, port) { + currentTasks++; + (async () => { + assert(msg.taskName !== undefined); + port.postMessage({ type: "response", taskName: msg.taskName }); + currentTasks--; + atomicsWaitLoop(buf, port); + })(); + } + + function atomicsWaitLoop(buf, port) { + while (currentTasks === 0) { + Atomics.wait(buf, 0, lastSeen); + lastSeen = Atomics.load(buf, 0); + let task; + while ((task = receiveMessageOnPort(port)) !== undefined) { + onMessage(task.message, buf, port); + } + } + } + `, + { eval: true }, + ); + + const sab = new SharedArrayBuffer(4); + const buf = new Int32Array(sab); + const { port1, port2 } = new workerThreads.MessageChannel(); + + const done = Promise.withResolvers(); + + port1.unref(); + + worker.postMessage({ + type: "init", + buf, + port: port2, + }, [port2]); + + let count = 0; + port1.on("message", (msg) => { + if (count++ === 0) { + assertEquals(msg, "ready"); + } else { + assertEquals(msg.type, "response"); + port1.close(); + done.resolve(true); + } + }); + + port1.postMessage({ + taskName: "doThing", + }); + + Atomics.add(buf, 0, 1); + Atomics.notify(buf, 0, 1); + + worker.unref(); + + const result = await done.promise; + assertEquals(result, true); + }, +}); From 3dd77196b0bb8042edd7a5731bacf23dd154d1a2 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Thu, 3 Oct 2024 15:00:12 -0700 Subject: [PATCH 3/4] Also unref if no listeners --- ext/web/13_message_port.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index c33fe562944486..bf2f46b9e138ed 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -205,7 +205,7 @@ class MessagePort extends EventTarget { ); if ( typeof this[nodeWorkerThreadCloseCb] === "function" && - !this[_refed] + (!this[_refed] || messageEventListenerCount === 0) ) { core.unrefOpPromise(this[_dataPromise]); } From 1f776fd0ea5b07ab18e750b3870964933a6f9ab1 Mon Sep 17 00:00:00 2001 From: Nathan Whitaker Date: Thu, 3 Oct 2024 17:54:35 -0700 Subject: [PATCH 4/4] Rework messageport ref/unref logic --- ext/web/13_message_port.js | 54 +++++++++++++++++++++++++++----------- runtime/js/99_main.js | 5 +++- 2 files changed, 43 insertions(+), 16 deletions(-) diff --git a/ext/web/13_message_port.js b/ext/web/13_message_port.js index bf2f46b9e138ed..cf72c43e6ff291 100644 --- a/ext/web/13_message_port.js +++ b/ext/web/13_message_port.js @@ -42,7 +42,10 @@ import { import { isDetachedBuffer } from "./06_streams.js"; import { DOMException } from "./01_dom_exception.js"; -let messageEventListenerCount = 0; +// counter of how many message ports are actively refed +// either due to the existence of "message" event listeners or +// explicit calls to ref/unref (in the case of node message ports) +let refedMessagePortsCount = 0; class MessageChannel { /** @type {MessagePort} */ @@ -94,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol( ); const _enabled = Symbol("enabled"); const _refed = Symbol("refed"); +const _messageEventListenerCount = Symbol("messageEventListenerCount"); const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb"); const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked"); export const refMessagePort = Symbol("refMessagePort"); @@ -110,6 +114,9 @@ function createMessagePort(id) { port[core.hostObjectBrand] = core.hostObjectBrand; setEventTargetData(port); port[_id] = id; + port[_enabled] = false; + port[_messageEventListenerCount] = 0; + port[_refed] = false; return port; } @@ -123,6 +130,7 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) { } } +const _isRefed = Symbol("isRefed"); const _dataPromise = Symbol("dataPromise"); class MessagePort extends EventTarget { @@ -133,6 +141,7 @@ class MessagePort extends EventTarget { [_refed] = false; /** @type {Promise | undefined} */ [_dataPromise] = undefined; + [_messageEventListenerCount] = 0; constructor() { super(); @@ -205,7 +214,7 @@ class MessagePort extends EventTarget { ); if ( typeof this[nodeWorkerThreadCloseCb] === "function" && - (!this[_refed] || messageEventListenerCount === 0) + !this[_refed] ) { core.unrefOpPromise(this[_dataPromise]); } @@ -248,15 +257,25 @@ class MessagePort extends EventTarget { } [refMessagePort](ref) { - if (ref && !this[_refed]) { - this[_refed] = true; - messageEventListenerCount++; - } else if (!ref && this[_refed]) { - this[_refed] = false; - if ( - typeof this[nodeWorkerThreadCloseCb] == "function" && this[_dataPromise] - ) { - core.unrefOpPromise(this[_dataPromise]); + if (ref) { + if (!this[_refed]) { + refedMessagePortsCount++; + if ( + this[_dataPromise] + ) { + core.refOpPromise(this[_dataPromise]); + } + this[_refed] = true; + } + } else if (!ref) { + if (this[_refed]) { + refedMessagePortsCount--; + if ( + this[_dataPromise] + ) { + core.unrefOpPromise(this[_dataPromise]); + } + this[_refed] = false; } } } @@ -272,15 +291,20 @@ class MessagePort extends EventTarget { removeEventListener(...args) { if (args[0] == "message") { - messageEventListenerCount--; + if (--this[_messageEventListenerCount] === 0 && this[_refed]) { + refedMessagePortsCount--; + this[_refed] = false; + } } super.removeEventListener(...new SafeArrayIterator(args)); } addEventListener(...args) { if (args[0] == "message") { - messageEventListenerCount++; - if (!this[_refed]) this[_refed] = true; + if (++this[_messageEventListenerCount] === 1 && !this[_refed]) { + refedMessagePortsCount++; + this[_refed] = true; + } } super.addEventListener(...new SafeArrayIterator(args)); } @@ -479,12 +503,12 @@ function structuredClone(value, options) { export { deserializeJsMessageData, MessageChannel, - messageEventListenerCount, MessagePort, MessagePortIdSymbol, MessagePortPrototype, MessagePortReceiveMessageOnPortSymbol, nodeWorkerThreadCloseCb, + refedMessagePortsCount, serializeJsMessageData, structuredClone, }; diff --git a/runtime/js/99_main.js b/runtime/js/99_main.js index 0da2072b848d87..56a5b411bb7ae6 100644 --- a/runtime/js/99_main.js +++ b/runtime/js/99_main.js @@ -169,8 +169,11 @@ let isClosing = false; let globalDispatchEvent; function hasMessageEventListener() { + // the function name is kind of a misnomer, but we want to behave + // as if we have message event listeners if a node message port is explicitly + // refed (and the inverse as well) return event.listenerCount(globalThis, "message") > 0 || - messagePort.messageEventListenerCount > 0; + messagePort.refedMessagePortsCount > 0; } async function pollForMessages() {