From 9e482258d271e4c3baae8904d57afe0e368236e4 Mon Sep 17 00:00:00 2001 From: Pavel Savara Date: Mon, 5 Feb 2024 09:21:40 +0100 Subject: [PATCH] [browser][ws] fix ConnectAsync_CancellationRequestedInflightConnect_ThrowsOperationCanceledException (#97868) Co-authored-by: campersau --- .../BrowserWebSockets/BrowserWebSocket.cs | 17 +- src/mono/browser/runtime/loader/exit.ts | 8 +- src/mono/browser/runtime/web-socket.ts | 173 +++++++++++------- 3 files changed, 113 insertions(+), 85 deletions(-) diff --git a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs index 478de9846cc254..6134d85656cc2c 100644 --- a/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs +++ b/src/libraries/System.Net.WebSockets.Client/src/System/Net/WebSockets/BrowserWebSockets/BrowserWebSocket.cs @@ -571,35 +571,22 @@ private async Task CancellationHelper(Task promise, CancellationToken cancellati lock (_lockObject) { var state = State; + ForceReadCloseStatusLocked(); if (state == WebSocketState.Aborted) { - ForceReadCloseStatusLocked(); throw new OperationCanceledException(nameof(WebSocketState.Aborted), ex); } - if (ex is OperationCanceledException) + if (ex is OperationCanceledException || cancellationToken.IsCancellationRequested || ex.Message == "Error: OperationCanceledException") { if(state != WebSocketState.Closed) { FastState = WebSocketState.Aborted; } _cancelled = true; - throw; - } - if (state != WebSocketState.Closed && cancellationToken.IsCancellationRequested) - { - FastState = WebSocketState.Aborted; - _cancelled = true; - throw new OperationCanceledException(cancellationToken); - } - if (state != WebSocketState.Closed && ex.Message == "Error: OperationCanceledException") - { - FastState = WebSocketState.Aborted; - _cancelled = true; throw new OperationCanceledException("The operation was cancelled.", ex, cancellationToken); } if (previousState == WebSocketState.Connecting) { - ForceReadCloseStatusLocked(); throw new WebSocketException(WebSocketError.Faulted, SR.net_webstatus_ConnectFailure, ex); } throw new WebSocketException(WebSocketError.NativeError, ex); diff --git a/src/mono/browser/runtime/loader/exit.ts b/src/mono/browser/runtime/loader/exit.ts index 9a9c78775cfd90..4e26e0467a2218 100644 --- a/src/mono/browser/runtime/loader/exit.ts +++ b/src/mono/browser/runtime/loader/exit.ts @@ -274,18 +274,18 @@ function logOnExit(exit_code: number, reason: any) { } } function unhandledrejection_handler(event: any) { - fatal_handler(event, event.reason); + fatal_handler(event, event.reason, "rejection"); } function error_handler(event: any) { - fatal_handler(event, event.error); + fatal_handler(event, event.error, "error"); } -function fatal_handler(event: any, reason: any) { +function fatal_handler(event: any, reason: any, type: string) { event.preventDefault(); try { if (!reason) { - reason = new Error("Unhandled"); + reason = new Error("Unhandled " + type); } if (reason.stack === undefined) { reason.stack = new Error().stack; diff --git a/src/mono/browser/runtime/web-socket.ts b/src/mono/browser/runtime/web-socket.ts index a5f783e0aeab80..b40e9e5daf4862 100644 --- a/src/mono/browser/runtime/web-socket.ts +++ b/src/mono/browser/runtime/web-socket.ts @@ -21,6 +21,7 @@ const wasm_ws_pending_receive_event_queue = Symbol.for("wasm ws_pending_receive_ const wasm_ws_pending_receive_promise_queue = Symbol.for("wasm ws_pending_receive_promise_queue"); const wasm_ws_pending_open_promise = Symbol.for("wasm ws_pending_open_promise"); const wasm_ws_pending_open_promise_used = Symbol.for("wasm wasm_ws_pending_open_promise_used"); +const wasm_ws_pending_error = Symbol.for("wasm wasm_ws_pending_error"); const wasm_ws_pending_close_promises = Symbol.for("wasm ws_pending_close_promises"); const wasm_ws_pending_send_promises = Symbol.for("wasm ws_pending_send_promises"); const wasm_ws_is_aborted = Symbol.for("wasm ws_is_aborted"); @@ -51,9 +52,9 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece try { ws = new globalThis.WebSocket(uri, sub_protocols || undefined) as WebSocketExtension; } - catch (e) { - mono_log_warn("WebSocket error", e); - throw e; + catch (error: any) { + mono_log_warn("WebSocket error in ws_wasm_create: " + error.toString()); + throw error; } const { promise_control: open_promise_control } = createPromiseController(); @@ -65,49 +66,69 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece ws[wasm_ws_receive_status_ptr] = receive_status_ptr; ws.binaryType = "arraybuffer"; const local_on_open = () => { - if (ws[wasm_ws_is_aborted]) return; - if (!loaderHelpers.is_runtime_running()) return; - open_promise_control.resolve(ws); - prevent_timer_throttling(); + try { + if (ws[wasm_ws_is_aborted]) return; + if (!loaderHelpers.is_runtime_running()) return; + open_promise_control.resolve(ws); + prevent_timer_throttling(); + } catch (error: any) { + mono_log_warn("failed to propagate WebSocket open event: " + error.toString()); + } }; const local_on_message = (ev: MessageEvent) => { - if (ws[wasm_ws_is_aborted]) return; - if (!loaderHelpers.is_runtime_running()) return; - _mono_wasm_web_socket_on_message(ws, ev); - prevent_timer_throttling(); + try { + if (ws[wasm_ws_is_aborted]) return; + if (!loaderHelpers.is_runtime_running()) return; + web_socket_on_message(ws, ev); + prevent_timer_throttling(); + } catch (error: any) { + mono_log_warn("failed to propagate WebSocket message event: " + error.toString()); + } }; const local_on_close = (ev: CloseEvent) => { - ws.removeEventListener("message", local_on_message); - if (ws[wasm_ws_is_aborted]) return; - if (!loaderHelpers.is_runtime_running()) return; + try { + ws.removeEventListener("message", local_on_message); + if (ws[wasm_ws_is_aborted]) return; + if (!loaderHelpers.is_runtime_running()) return; - ws[wasm_ws_close_received] = true; - ws["close_status"] = ev.code; - ws["close_status_description"] = ev.reason; + ws[wasm_ws_close_received] = true; + ws["close_status"] = ev.code; + ws["close_status_description"] = ev.reason; - // this reject would not do anything if there was already "open" before it. - open_promise_control.reject(new Error(ev.reason)); + if (ws[wasm_ws_pending_open_promise_used]) { + open_promise_control.reject(new Error(ev.reason)); + } - for (const close_promise_control of ws[wasm_ws_pending_close_promises]) { - close_promise_control.resolve(); - } + for (const close_promise_control of ws[wasm_ws_pending_close_promises]) { + close_promise_control.resolve(); + } - // send close to any pending receivers, to wake them - const receive_promise_queue = ws[wasm_ws_pending_receive_promise_queue]; - receive_promise_queue.drain((receive_promise_control) => { - setI32(receive_status_ptr, 0); // count - setI32(receive_status_ptr + 4, 2); // type:close - setI32(receive_status_ptr + 8, 1);// end_of_message: true - receive_promise_control.resolve(); - }); + // send close to any pending receivers, to wake them + const receive_promise_queue = ws[wasm_ws_pending_receive_promise_queue]; + receive_promise_queue.drain((receive_promise_control) => { + setI32(receive_status_ptr, 0); // count + setI32(receive_status_ptr + 4, 2); // type:close + setI32(receive_status_ptr + 8, 1);// end_of_message: true + receive_promise_control.resolve(); + }); + } catch (error: any) { + mono_log_warn("failed to propagate WebSocket close event: " + error.toString()); + } }; const local_on_error = (ev: any) => { - if (ws[wasm_ws_is_aborted]) return; - if (!loaderHelpers.is_runtime_running()) return; - ws.removeEventListener("message", local_on_message); - const error = new Error(ev.message || "WebSocket error"); - mono_log_warn("WebSocket error", error); - reject_promises(ws, error); + try { + if (ws[wasm_ws_is_aborted]) return; + if (!loaderHelpers.is_runtime_running()) return; + ws.removeEventListener("message", local_on_message); + const message = ev.message + ? "WebSocket error: " + ev.message + : "WebSocket error"; + mono_log_warn(message); + ws[wasm_ws_pending_error] = message; + reject_promises(ws, new Error(message)); + } catch (error: any) { + mono_log_warn("failed to propagate WebSocket error event: " + error.toString()); + } }; ws.addEventListener("message", local_on_message); ws.addEventListener("open", local_on_open, { once: true }); @@ -126,6 +147,9 @@ export function ws_wasm_create(uri: string, sub_protocols: string[] | null, rece export function ws_wasm_open(ws: WebSocketExtension): Promise | null { mono_assert(!!ws, "ERR17: expected ws instance"); + if (ws[wasm_ws_pending_error]) { + return rejectedPromise(ws[wasm_ws_pending_error]); + } const open_promise_control = ws[wasm_ws_pending_open_promise]; ws[wasm_ws_pending_open_promise_used] = true; return open_promise_control.promise; @@ -134,6 +158,9 @@ export function ws_wasm_open(ws: WebSocketExtension): Promise | null { mono_assert(!!ws, "ERR17: expected ws instance"); + if (ws[wasm_ws_pending_error]) { + return rejectedPromise(ws[wasm_ws_pending_error]); + } if (ws[wasm_ws_is_aborted] || ws[wasm_ws_close_sent]) { return rejectedPromise("InvalidState: The WebSocket is not connected."); } @@ -144,18 +171,22 @@ export function ws_wasm_send(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer } const buffer_view = new Uint8Array(localHeapViewU8().buffer, buffer_ptr, buffer_length); - const whole_buffer = _mono_wasm_web_socket_send_buffering(ws, buffer_view, message_type, end_of_message); + const whole_buffer = web_socket_send_buffering(ws, buffer_view, message_type, end_of_message); if (!end_of_message || !whole_buffer) { return resolvedPromise(); } - return _mono_wasm_web_socket_send_and_wait(ws, whole_buffer); + return web_socket_send_and_wait(ws, whole_buffer); } export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buffer_length: number): Promise | null { mono_assert(!!ws, "ERR18: expected ws instance"); + if (ws[wasm_ws_pending_error]) { + return rejectedPromise(ws[wasm_ws_pending_error]); + } + // we can't quickly return if wasm_ws_close_received==true, because there could be pending messages if (ws[wasm_ws_is_aborted]) { const receive_status_ptr = ws[wasm_ws_receive_status_ptr]; @@ -171,7 +202,7 @@ export function ws_wasm_receive(ws: WebSocketExtension, buffer_ptr: VoidPtr, buf if (receive_event_queue.getLength()) { mono_assert(receive_promise_queue.getLength() == 0, "ERR20: Invalid WS state"); - _mono_wasm_web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length); + web_socket_receive_buffering(ws, receive_event_queue, buffer_ptr, buffer_length); return resolvedPromise(); } @@ -200,6 +231,9 @@ export function ws_wasm_close(ws: WebSocketExtension, code: number, reason: stri if (ws[wasm_ws_is_aborted] || ws[wasm_ws_close_sent] || ws.readyState == WebSocket.CLOSED) { return resolvedPromise(); } + if (ws[wasm_ws_pending_error]) { + return rejectedPromise(ws[wasm_ws_pending_error]); + } ws[wasm_ws_close_sent] = true; if (wait_for_close_received) { const { promise, promise_control } = createPromiseController(); @@ -235,8 +269,8 @@ export function ws_wasm_abort(ws: WebSocketExtension): void { try { // this is different from Managed implementation ws.close(1000, "Connection was aborted."); - } catch (error) { - mono_log_warn("WebSocket error while aborting", error); + } catch (error: any) { + mono_log_warn("WebSocket error in ws_wasm_abort: " + error.toString()); } } @@ -263,7 +297,7 @@ function reject_promises(ws: WebSocketExtension, error: Error) { } // send and return promise -function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view: Uint8Array | string): Promise | null { +function web_socket_send_and_wait(ws: WebSocketExtension, buffer_view: Uint8Array | string): Promise | null { ws.send(buffer_view); ws[wasm_ws_pending_send_buffer] = null; @@ -281,28 +315,34 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view let nextDelay = 1; const polling_check = () => { - // was it all sent yet ? - if (ws.bufferedAmount === 0) { - promise_control.resolve(); - } - else { - const readyState = ws.readyState; - if (readyState != WebSocket.OPEN && readyState != WebSocket.CLOSING) { - // only reject if the data were not sent - // bufferedAmount does not reset to zero once the connection closes - promise_control.reject(new Error(`InvalidState: ${readyState} The WebSocket is not connected.`)); + try { + // was it all sent yet ? + if (ws.bufferedAmount === 0) { + promise_control.resolve(); + } + else { + const readyState = ws.readyState; + if (readyState != WebSocket.OPEN && readyState != WebSocket.CLOSING) { + // only reject if the data were not sent + // bufferedAmount does not reset to zero once the connection closes + promise_control.reject(new Error(`InvalidState: ${readyState} The WebSocket is not connected.`)); + } + else if (!promise_control.isDone) { + globalThis.setTimeout(polling_check, nextDelay); + // exponentially longer delays, up to 1000ms + nextDelay = Math.min(nextDelay * 1.5, 1000); + return; + } } - else if (!promise_control.isDone) { - globalThis.setTimeout(polling_check, nextDelay); - // exponentially longer delays, up to 1000ms - nextDelay = Math.min(nextDelay * 1.5, 1000); - return; + // remove from pending + const index = pending.indexOf(promise_control); + if (index > -1) { + pending.splice(index, 1); } } - // remove from pending - const index = pending.indexOf(promise_control); - if (index > -1) { - pending.splice(index, 1); + catch (error: any) { + mono_log_warn("WebSocket error in web_socket_send_and_wait: " + error.toString()); + promise_control.reject(error); } }; @@ -311,7 +351,7 @@ function _mono_wasm_web_socket_send_and_wait(ws: WebSocketExtension, buffer_view return promise; } -function _mono_wasm_web_socket_on_message(ws: WebSocketExtension, event: MessageEvent) { +function web_socket_on_message(ws: WebSocketExtension, event: MessageEvent) { const event_queue = ws[wasm_ws_pending_receive_event_queue]; const promise_queue = ws[wasm_ws_pending_receive_promise_queue]; @@ -340,14 +380,14 @@ function _mono_wasm_web_socket_on_message(ws: WebSocketExtension, event: Message } while (promise_queue.getLength() && event_queue.getLength()) { const promise_control = promise_queue.dequeue()!; - _mono_wasm_web_socket_receive_buffering(ws, event_queue, + web_socket_receive_buffering(ws, event_queue, promise_control.buffer_ptr, promise_control.buffer_length); promise_control.resolve(); } prevent_timer_throttling(); } -function _mono_wasm_web_socket_receive_buffering(ws: WebSocketExtension, event_queue: Queue, buffer_ptr: VoidPtr, buffer_length: number) { +function web_socket_receive_buffering(ws: WebSocketExtension, event_queue: Queue, buffer_ptr: VoidPtr, buffer_length: number) { const event = event_queue.peek(); const count = Math.min(buffer_length, event.data.length - event.offset); @@ -367,7 +407,7 @@ function _mono_wasm_web_socket_receive_buffering(ws: WebSocketExtension, event_q setI32(response_ptr + 8, end_of_message); } -function _mono_wasm_web_socket_send_buffering(ws: WebSocketExtension, buffer_view: Uint8Array, message_type: number, end_of_message: boolean): Uint8Array | string | null { +function web_socket_send_buffering(ws: WebSocketExtension, buffer_view: Uint8Array, message_type: number, end_of_message: boolean): Uint8Array | string | null { let buffer = ws[wasm_ws_pending_send_buffer]; let offset = 0; const length = buffer_view.byteLength; @@ -438,6 +478,7 @@ type WebSocketExtension = WebSocket & { [wasm_ws_pending_open_promise_used]: boolean [wasm_ws_pending_send_promises]: PromiseController[] [wasm_ws_pending_close_promises]: PromiseController[] + [wasm_ws_pending_error]: string | undefined [wasm_ws_is_aborted]: boolean [wasm_ws_close_received]: boolean [wasm_ws_close_sent]: boolean @@ -476,7 +517,7 @@ function resolvedPromise(): Promise | null { } } -function rejectedPromise(message: string): Promise | null { +function rejectedPromise(message: string): Promise | null { const resolved = Promise.reject(new Error(message)); return wrap_as_cancelable(resolved); }