Skip to content

Commit

Permalink
[browser][ws] fix ConnectAsync_CancellationRequestedInflightConnect_T…
Browse files Browse the repository at this point in the history
…hrowsOperationCanceledException (#97868)

Co-authored-by: campersau <buchholz.bastian@googlemail.com>
  • Loading branch information
pavelsavara and campersau authored Feb 5, 2024
1 parent f8629e6 commit 9e48225
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -571,35 +571,22 @@ private async Task CancellationHelper(Task promise, CancellationToken cancellati
lock (_lockObject)
{
var state = State;
ForceReadCloseStatusLocked();
if (state == WebSocketState.Aborted)
{
ForceReadCloseStatusLocked();
throw new OperationCanceledException(nameof(WebSocketState.Aborted), ex);
}
if (ex is OperationCanceledException)
if (ex is OperationCanceledException || cancellationToken.IsCancellationRequested || ex.Message == "Error: OperationCanceledException")
{
if(state != WebSocketState.Closed)
{
FastState = WebSocketState.Aborted;
}
_cancelled = true;
throw;
}
if (state != WebSocketState.Closed && cancellationToken.IsCancellationRequested)
{
FastState = WebSocketState.Aborted;
_cancelled = true;
throw new OperationCanceledException(cancellationToken);
}
if (state != WebSocketState.Closed && ex.Message == "Error: OperationCanceledException")
{
FastState = WebSocketState.Aborted;
_cancelled = true;
throw new OperationCanceledException("The operation was cancelled.", ex, cancellationToken);
}
if (previousState == WebSocketState.Connecting)
{
ForceReadCloseStatusLocked();
throw new WebSocketException(WebSocketError.Faulted, SR.net_webstatus_ConnectFailure, ex);
}
throw new WebSocketException(WebSocketError.NativeError, ex);
Expand Down
8 changes: 4 additions & 4 deletions src/mono/browser/runtime/loader/exit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -274,18 +274,18 @@ function logOnExit(exit_code: number, reason: any) {
}
}
function unhandledrejection_handler(event: any) {
fatal_handler(event, event.reason);
fatal_handler(event, event.reason, "rejection");
}

function error_handler(event: any) {
fatal_handler(event, event.error);
fatal_handler(event, event.error, "error");
}

function fatal_handler(event: any, reason: any) {
function fatal_handler(event: any, reason: any, type: string) {
event.preventDefault();
try {
if (!reason) {
reason = new Error("Unhandled");
reason = new Error("Unhandled " + type);
}
if (reason.stack === undefined) {
reason.stack = new Error().stack;
Expand Down
173 changes: 107 additions & 66 deletions src/mono/browser/runtime/web-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const wasm_ws_pending_receive_event_queue = Symbol.for("wasm ws_pending_receive_
const wasm_ws_pending_receive_promise_queue = Symbol.for("wasm ws_pending_receive_promise_queue");
const wasm_ws_pending_open_promise = Symbol.for("wasm ws_pending_open_promise");
const wasm_ws_pending_open_promise_used = Symbol.for("wasm wasm_ws_pending_open_promise_used");
const wasm_ws_pending_error = Symbol.for("wasm wasm_ws_pending_error");
const wasm_ws_pending_close_promises = Symbol.for("wasm ws_pending_close_promises");
const wasm_ws_pending_send_promises = Symbol.for("wasm ws_pending_send_promises");
const wasm_ws_is_aborted = Symbol.for("wasm ws_is_aborted");
Expand Down Expand Up @@ -51,9 +52,9 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece
try {
ws = new globalThis.WebSocket(uri, sub_protocols || undefined) as WebSocketExtension;
}
catch (e) {
mono_log_warn("WebSocket error", e);
throw e;
catch (error: any) {
mono_log_warn("WebSocket error in ws_wasm_create: " + error.toString());
throw error;
}
const { promise_control: open_promise_control } = createPromiseController<WebSocketExtension>();

Expand All @@ -65,49 +66,69 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece
ws[wasm_ws_receive_status_ptr] = receive_status_ptr;
ws.binaryType = "arraybuffer";
const local_on_open = () => {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
open_promise_control.resolve(ws);
prevent_timer_throttling();
try {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
open_promise_control.resolve(ws);
prevent_timer_throttling();
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket open event: " + error.toString());
}
};
const local_on_message = (ev: MessageEvent) => {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
_mono_wasm_web_socket_on_message(ws, ev);
prevent_timer_throttling();
try {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
web_socket_on_message(ws, ev);
prevent_timer_throttling();
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket message event: " + error.toString());
}
};
const local_on_close = (ev: CloseEvent) => {
ws.removeEventListener("message", local_on_message);
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
try {
ws.removeEventListener("message", local_on_message);
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;

ws[wasm_ws_close_received] = true;
ws["close_status"] = ev.code;
ws["close_status_description"] = ev.reason;
ws[wasm_ws_close_received] = true;
ws["close_status"] = ev.code;
ws["close_status_description"] = ev.reason;

// this reject would not do anything if there was already "open" before it.
open_promise_control.reject(new Error(ev.reason));
if (ws[wasm_ws_pending_open_promise_used]) {
open_promise_control.reject(new Error(ev.reason));
}

for (const close_promise_control of ws[wasm_ws_pending_close_promises]) {
close_promise_control.resolve();
}
for (const close_promise_control of ws[wasm_ws_pending_close_promises]) {
close_promise_control.resolve();
}

// send close to any pending receivers, to wake them
const receive_promise_queue = ws[wasm_ws_pending_receive_promise_queue];
receive_promise_queue.drain((receive_promise_control) => {
setI32(receive_status_ptr, 0); // count
setI32(<any>receive_status_ptr + 4, 2); // type:close
setI32(<any>receive_status_ptr + 8, 1);// end_of_message: true
receive_promise_control.resolve();
});
// send close to any pending receivers, to wake them
const receive_promise_queue = ws[wasm_ws_pending_receive_promise_queue];
receive_promise_queue.drain((receive_promise_control) => {
setI32(receive_status_ptr, 0); // count
setI32(<any>receive_status_ptr + 4, 2); // type:close
setI32(<any>receive_status_ptr + 8, 1);// end_of_message: true
receive_promise_control.resolve();
});
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket close event: " + error.toString());
}
};
const local_on_error = (ev: any) => {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
ws.removeEventListener("message", local_on_message);
const error = new Error(ev.message || "WebSocket error");
mono_log_warn("WebSocket error", error);
reject_promises(ws, error);
try {
if (ws[wasm_ws_is_aborted]) return;
if (!loaderHelpers.is_runtime_running()) return;
ws.removeEventListener("message", local_on_message);
const message = ev.message
? "WebSocket error: " + ev.message
: "WebSocket error";
mono_log_warn(message);
ws[wasm_ws_pending_error] = message;
reject_promises(ws, new Error(message));
} catch (error: any) {
mono_log_warn("failed to propagate WebSocket error event: " + error.toString());
}
};
ws.addEventListener("message", local_on_message);
ws.addEventListener("open", local_on_open, { once: true });
Expand All @@ -126,6 +147,9 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece

export function ws_wasm_open(ws: WebSocketExtension): Promise<WebSocketExtension> | null {
mono_assert(!!ws, "ERR17: expected ws instance");
if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}
const open_promise_control = ws[wasm_ws_pending_open_promise];
ws[wasm_ws_pending_open_promise_used] = true;
return open_promise_control.promise;
Expand All @@ -134,6 +158,9 @@ export function ws_wasm_open(ws: WebSocketExtension): Promise<WebSocketExtension
export function ws_wasm_send(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer_length: number, message_type: number, end_of_message: boolean): Promise<void> | null {
mono_assert(!!ws, "ERR17: expected ws instance");

if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}
if (ws[wasm_ws_is_aborted] || ws[wasm_ws_close_sent]) {
return rejectedPromise("InvalidState: The WebSocket is not connected.");
}
Expand All @@ -144,18 +171,22 @@ export function ws_wasm_send(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer
}

const buffer_view = new Uint8Array(localHeapViewU8().buffer, <any>buffer_ptr, buffer_length);
const whole_buffer = _mono_wasm_web_socket_send_buffering(ws, buffer_view, message_type, end_of_message);
const whole_buffer = web_socket_send_buffering(ws, buffer_view, message_type, end_of_message);

if (!end_of_message || !whole_buffer) {
return resolvedPromise();
}

return _mono_wasm_web_socket_send_and_wait(ws, whole_buffer);
return web_socket_send_and_wait(ws, whole_buffer);
}

export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer_length: number): Promise<void> | null {
mono_assert(!!ws, "ERR18: expected ws instance");

if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}

// we can't quickly return if wasm_ws_close_received==true, because there could be pending messages
if (ws[wasm_ws_is_aborted]) {
const receive_status_ptr = ws[wasm_ws_receive_status_ptr];
Expand All @@ -171,7 +202,7 @@ export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buf
if (receive_event_queue.getLength()) {
mono_assert(receive_promise_queue.getLength() == 0, "ERR20: Invalid WS state");

_mono_wasm_web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length);
web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length);

return resolvedPromise();
}
Expand Down Expand Up @@ -200,6 +231,9 @@ export function ws_wasm_close(ws: WebSocketExtension, code: number, reason: stri
if (ws[wasm_ws_is_aborted] || ws[wasm_ws_close_sent] || ws.readyState == WebSocket.CLOSED) {
return resolvedPromise();
}
if (ws[wasm_ws_pending_error]) {
return rejectedPromise(ws[wasm_ws_pending_error]);
}
ws[wasm_ws_close_sent] = true;
if (wait_for_close_received) {
const { promise, promise_control } = createPromiseController<void>();
Expand Down Expand Up @@ -235,8 +269,8 @@ export function ws_wasm_abort(ws: WebSocketExtension): void {
try {
// this is different from Managed implementation
ws.close(1000, "Connection was aborted.");
} catch (error) {
mono_log_warn("WebSocket error while aborting", error);
} catch (error: any) {
mono_log_warn("WebSocket error in ws_wasm_abort: " + error.toString());
}
}

Expand All @@ -263,7 +297,7 @@ function reject_promises(ws: WebSocketExtension, error: Error) {
}

// send and return promise
function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view: Uint8Array | string): Promise<void> | null {
function web_socket_send_and_wait(ws: WebSocketExtension, buffer_view: Uint8Array | string): Promise<void> | null {
ws.send(buffer_view);
ws[wasm_ws_pending_send_buffer] = null;

Expand All @@ -281,28 +315,34 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view

let nextDelay = 1;
const polling_check = () => {
// was it all sent yet ?
if (ws.bufferedAmount === 0) {
promise_control.resolve();
}
else {
const readyState = ws.readyState;
if (readyState != WebSocket.OPEN && readyState != WebSocket.CLOSING) {
// only reject if the data were not sent
// bufferedAmount does not reset to zero once the connection closes
promise_control.reject(new Error(`InvalidState: ${readyState} The WebSocket is not connected.`));
try {
// was it all sent yet ?
if (ws.bufferedAmount === 0) {
promise_control.resolve();
}
else {
const readyState = ws.readyState;
if (readyState != WebSocket.OPEN && readyState != WebSocket.CLOSING) {
// only reject if the data were not sent
// bufferedAmount does not reset to zero once the connection closes
promise_control.reject(new Error(`InvalidState: ${readyState} The WebSocket is not connected.`));
}
else if (!promise_control.isDone) {
globalThis.setTimeout(polling_check, nextDelay);
// exponentially longer delays, up to 1000ms
nextDelay = Math.min(nextDelay * 1.5, 1000);
return;
}
}
else if (!promise_control.isDone) {
globalThis.setTimeout(polling_check, nextDelay);
// exponentially longer delays, up to 1000ms
nextDelay = Math.min(nextDelay * 1.5, 1000);
return;
// remove from pending
const index = pending.indexOf(promise_control);
if (index > -1) {
pending.splice(index, 1);
}
}
// remove from pending
const index = pending.indexOf(promise_control);
if (index > -1) {
pending.splice(index, 1);
catch (error: any) {
mono_log_warn("WebSocket error in web_socket_send_and_wait: " + error.toString());
promise_control.reject(error);
}
};

Expand All @@ -311,7 +351,7 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view
return promise;
}

function _mono_wasm_web_socket_on_message(ws: WebSocketExtension, event: MessageEvent) {
function web_socket_on_message(ws: WebSocketExtension, event: MessageEvent) {
const event_queue = ws[wasm_ws_pending_receive_event_queue];
const promise_queue = ws[wasm_ws_pending_receive_promise_queue];

Expand Down Expand Up @@ -340,14 +380,14 @@ function _mono_wasm_web_socket_on_message(ws: WebSocketExtension, event: Message
}
while (promise_queue.getLength() && event_queue.getLength()) {
const promise_control = promise_queue.dequeue()!;
_mono_wasm_web_socket_receive_buffering(ws, event_queue,
web_socket_receive_buffering(ws, event_queue,
promise_control.buffer_ptr, promise_control.buffer_length);
promise_control.resolve();
}
prevent_timer_throttling();
}

function _mono_wasm_web_socket_receive_buffering(ws: WebSocketExtension, event_queue: Queue<any>, buffer_ptr: VoidPtr, buffer_length: number) {
function web_socket_receive_buffering(ws: WebSocketExtension, event_queue: Queue<any>, buffer_ptr: VoidPtr, buffer_length: number) {
const event = event_queue.peek();

const count = Math.min(buffer_length, event.data.length - event.offset);
Expand All @@ -367,7 +407,7 @@ function _mono_wasm_web_socket_receive_buffering(ws: WebSocketExtension, event_q
setI32(<any>response_ptr + 8, end_of_message);
}

function _mono_wasm_web_socket_send_buffering(ws: WebSocketExtension, buffer_view: Uint8Array, message_type: number, end_of_message: boolean): Uint8Array | string | null {
function web_socket_send_buffering(ws: WebSocketExtension, buffer_view: Uint8Array, message_type: number, end_of_message: boolean): Uint8Array | string | null {
let buffer = ws[wasm_ws_pending_send_buffer];
let offset = 0;
const length = buffer_view.byteLength;
Expand Down Expand Up @@ -438,6 +478,7 @@ type WebSocketExtension = WebSocket & {
[wasm_ws_pending_open_promise_used]: boolean
[wasm_ws_pending_send_promises]: PromiseController<void>[]
[wasm_ws_pending_close_promises]: PromiseController<void>[]
[wasm_ws_pending_error]: string | undefined
[wasm_ws_is_aborted]: boolean
[wasm_ws_close_received]: boolean
[wasm_ws_close_sent]: boolean
Expand Down Expand Up @@ -476,7 +517,7 @@ function resolvedPromise(): Promise<void> | null {
}
}

function rejectedPromise(message: string): Promise<void> | null {
function rejectedPromise(message: string): Promise<any> | null {
const resolved = Promise.reject(new Error(message));
return wrap_as_cancelable<void>(resolved);
}

0 comments on commit 9e48225

Please sign in to comment.