Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(node): fix worker_threads issues blocking Angular support #26024

Merged
merged 4 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions ext/node/polyfills/worker_threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ class NodeWorker extends EventEmitter {
if (this.#status !== "TERMINATED") {
this.#status = "TERMINATED";
op_host_terminate_worker(this.#id);
this.emit("exit", 0);
}
this.emit("exit", 0);
return PromiseResolve(0);
}

Expand Down Expand Up @@ -422,7 +422,11 @@ internals.__initWorkerThreads = (

parentPort.once = function (this: ParentPort, name, listener) {
// deno-lint-ignore no-explicit-any
const _listener = (ev: any) => listener(ev.data);
const _listener = (ev: any) => {
const message = ev.data;
patchMessagePortIfFound(message);
return listener(message);
};
listeners.set(listener, _listener);
this.addEventListener(name, _listener);
return this;
Expand Down Expand Up @@ -494,7 +498,9 @@ export function receiveMessageOnPort(port: MessagePort): object | undefined {
port[MessagePortReceiveMessageOnPortSymbol] = true;
const data = op_message_port_recv_message_sync(port[MessagePortIdSymbol]);
if (data === null) return undefined;
return { message: deserializeJsMessageData(data)[0] };
const message = deserializeJsMessageData(data)[0];
patchMessagePortIfFound(message);
return { message };
}

class NodeMessageChannel {
Expand Down
88 changes: 64 additions & 24 deletions ext/web/13_message_port.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ const {
Symbol,
SymbolFor,
SymbolIterator,
PromiseResolve,
SafeArrayIterator,
TypeError,
} = primordials;
Expand All @@ -41,7 +42,10 @@ import {
import { isDetachedBuffer } from "./06_streams.js";
import { DOMException } from "./01_dom_exception.js";

let messageEventListenerCount = 0;
// counter of how many message ports are actively refed
// either due to the existence of "message" event listeners or
// explicit calls to ref/unref (in the case of node message ports)
let refedMessagePortsCount = 0;

class MessageChannel {
/** @type {MessagePort} */
Expand Down Expand Up @@ -93,6 +97,7 @@ const MessagePortReceiveMessageOnPortSymbol = Symbol(
);
const _enabled = Symbol("enabled");
const _refed = Symbol("refed");
const _messageEventListenerCount = Symbol("messageEventListenerCount");
const nodeWorkerThreadCloseCb = Symbol("nodeWorkerThreadCloseCb");
const nodeWorkerThreadCloseCbInvoked = Symbol("nodeWorkerThreadCloseCbInvoked");
export const refMessagePort = Symbol("refMessagePort");
Expand All @@ -109,6 +114,9 @@ function createMessagePort(id) {
port[core.hostObjectBrand] = core.hostObjectBrand;
setEventTargetData(port);
port[_id] = id;
port[_enabled] = false;
port[_messageEventListenerCount] = 0;
port[_refed] = false;
return port;
}

Expand All @@ -122,12 +130,18 @@ function nodeWorkerThreadMaybeInvokeCloseCb(port) {
}
}

const _isRefed = Symbol("isRefed");
const _dataPromise = Symbol("dataPromise");

class MessagePort extends EventTarget {
/** @type {number | null} */
[_id] = null;
/** @type {boolean} */
[_enabled] = false;
[_refed] = false;
/** @type {Promise<any> | undefined} */
[_dataPromise] = undefined;
[_messageEventListenerCount] = 0;

constructor() {
super();
Expand Down Expand Up @@ -193,24 +207,21 @@ class MessagePort extends EventTarget {
this[_enabled] = true;
while (true) {
if (this[_id] === null) break;
// Exit if no message event listeners are present in Node compat mode.
if (
typeof this[nodeWorkerThreadCloseCb] == "function" &&
messageEventListenerCount === 0
) break;
let data;
try {
data = await op_message_port_recv_message(
this[_dataPromise] = op_message_port_recv_message(
this[_id],
);
if (
typeof this[nodeWorkerThreadCloseCb] === "function" &&
!this[_refed]
) {
core.unrefOpPromise(this[_dataPromise]);
}
data = await this[_dataPromise];
this[_dataPromise] = undefined;
} catch (err) {
if (ObjectPrototypeIsPrototypeOf(InterruptedPrototype, err)) {
// If we were interrupted, check if the interruption is coming
// from `receiveMessageOnPort` API from Node compat, if so, continue.
if (this[MessagePortReceiveMessageOnPortSymbol]) {
this[MessagePortReceiveMessageOnPortSymbol] = false;
continue;
}
break;
}
nodeWorkerThreadMaybeInvokeCloseCb(this);
Expand Down Expand Up @@ -246,12 +257,26 @@ class MessagePort extends EventTarget {
}

[refMessagePort](ref) {
if (ref && !this[_refed]) {
this[_refed] = true;
messageEventListenerCount++;
} else if (!ref && this[_refed]) {
this[_refed] = false;
messageEventListenerCount = 0;
if (ref) {
if (!this[_refed]) {
refedMessagePortsCount++;
if (
this[_dataPromise]
) {
core.refOpPromise(this[_dataPromise]);
}
this[_refed] = true;
}
} else if (!ref) {
if (this[_refed]) {
refedMessagePortsCount--;
if (
this[_dataPromise]
) {
core.unrefOpPromise(this[_dataPromise]);
}
this[_refed] = false;
}
}
}

Expand All @@ -266,15 +291,20 @@ class MessagePort extends EventTarget {

removeEventListener(...args) {
if (args[0] == "message") {
messageEventListenerCount--;
if (--this[_messageEventListenerCount] === 0 && this[_refed]) {
refedMessagePortsCount--;
this[_refed] = false;
}
}
super.removeEventListener(...new SafeArrayIterator(args));
}

addEventListener(...args) {
if (args[0] == "message") {
messageEventListenerCount++;
if (!this[_refed]) this[_refed] = true;
if (++this[_messageEventListenerCount] === 1 && !this[_refed]) {
refedMessagePortsCount++;
this[_refed] = true;
}
}
super.addEventListener(...new SafeArrayIterator(args));
}
Expand All @@ -295,7 +325,17 @@ class MessagePort extends EventTarget {
}

defineEventHandler(MessagePort.prototype, "message", function (self) {
self.start();
if (self[nodeWorkerThreadCloseCb]) {
(async () => {
// delay `start()` until he end of this event loop turn, to give `receiveMessageOnPort`
// a chance to receive a message first. this is primarily to resolve an issue with
// a pattern used in `npm:piscina` that results in an indefinite hang
await PromiseResolve();
self.start();
})();
} else {
self.start();
}
});
defineEventHandler(MessagePort.prototype, "messageerror");

Expand Down Expand Up @@ -463,12 +503,12 @@ function structuredClone(value, options) {
export {
deserializeJsMessageData,
MessageChannel,
messageEventListenerCount,
MessagePort,
MessagePortIdSymbol,
MessagePortPrototype,
MessagePortReceiveMessageOnPortSymbol,
nodeWorkerThreadCloseCb,
refedMessagePortsCount,
serializeJsMessageData,
structuredClone,
};
1 change: 0 additions & 1 deletion ext/web/message_port.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,6 @@ pub fn op_message_port_recv_message_sync(
#[smi] rid: ResourceId,
) -> Result<Option<JsMessageData>, AnyError> {
let resource = state.resource_table.get::<MessagePortResource>(rid)?;
resource.cancel.cancel();
let mut rx = resource.port.rx.borrow_mut();

match rx.try_recv() {
Expand Down
5 changes: 4 additions & 1 deletion runtime/js/99_main.js
Original file line number Diff line number Diff line change
Expand Up @@ -169,8 +169,11 @@ let isClosing = false;
let globalDispatchEvent;

function hasMessageEventListener() {
// the function name is kind of a misnomer, but we want to behave
// as if we have message event listeners if a node message port is explicitly
// refed (and the inverse as well)
return event.listenerCount(globalThis, "message") > 0 ||
messagePort.messageEventListenerCount > 0;
messagePort.refedMessagePortsCount > 0;
}

async function pollForMessages() {
Expand Down
Loading