[issue tracker]: pipe dead lock in WebSocketProxy callback #42

Open
WSH032 opened this issue Jul 22, 2024 · 1 comment
Labels
help wanted Extra attention is needed

Comments

Owner

WSH032 commented Jul 22, 2024

I just found a bug. The current implementation only supports a strict one-receive-one-send pattern per loop iteration. Any callback that breaks this pattern will deadlock: multiple receives followed by one send, one receive followed by multiple sends, or a send before the first receive.

async def callback(ctx: CallbackPipeContextType[str]) -> None:
    with ctx as (sender, receiver):
        # multiple receives and one send: deadlock!
        await receiver.receive()
        await receiver.receive()
        await sender.send("foo")

async def callback(ctx: CallbackPipeContextType[str]) -> None:
    with ctx as (sender, receiver):
        # one receive and multiple sends: deadlock!
        async for message in receiver:
            await sender.send("foo")
            await sender.send("bar")

async def callback(ctx: CallbackPipeContextType[str]) -> None:
    with ctx as (sender, receiver):
        # sending before receiving: deadlock!
        await sender.send("foo")
        async for message in receiver:
            await sender.send(message)

Unfortunately, we can't resolve this issue until the underlying logic is rewritten using anyio and its memory object streams.

There is already a PR for anyio: #34. However, this PR still has many issues, and I currently don't have time to merge it.

Originally posted by @WSH032 in #41 (comment)

WSH032 added the "help wanted" (Extra attention is needed) label Jul 22, 2024
Owner Author

WSH032 commented Jul 22, 2024

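The current implementation receives and sends each message serially within a single task: it passes one message to the callback, then waits for exactly one message back before forwarding it. This is why a callback cannot perform multiple receives or multiple sends per message.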

async def _wait_client_then_send_to_server(
    *,
    client_ws: starlette_ws.WebSocket,
    server_ws: httpx_ws.AsyncWebSocketSession,
    pipe_context: Optional[CallbackPipeContextType[_WsMsgTypeVar]] = None,
) -> starlette_ws.WebSocketDisconnect:
    """Receive data from client, then send to target server.

    Args:
        client_ws: The websocket which receive data of client.
        server_ws: The websocket which send data to target server.
        pipe_context: The callback pipe for processing data.
            will send the received data(from client) to the sender,
            and receive the data from the receiver(then send to the server).

    Returns:
        If the client_ws sends a shutdown message normally, will return starlette_ws.WebSocketDisconnect.

    Raises:
        error for receiving: refer to `_starlette_ws_receive_bytes_or_str`
        error for sending: refer to `_httpx_ws_send_bytes_or_str`
        error for callback: refer to `MemoryObjectReceiveStream.receive` and `MemoryObjectSendStream.send`
    """
    with pipe_context or nullcontext() as pipe:
        while True:
            try:
                receive = await _starlette_ws_receive_bytes_or_str(client_ws)
            except starlette_ws.WebSocketDisconnect as e:
                return e
            # TODO: do not use `if` statement in loop
            if pipe is not None:
                sender, receiver = pipe
                # XXX, HACK, TODO: We can't identify the msg type from websocket,
                # so we have to ignore the type check here.
                await sender.send(receive)  # pyright: ignore [reportArgumentType]
                receive = await receiver.receive()
            await _httpx_ws_send_bytes_or_str(server_ws, receive)

async def _wait_server_then_send_to_client(
    *,
    client_ws: starlette_ws.WebSocket,
    server_ws: httpx_ws.AsyncWebSocketSession,
    pipe_context: Optional[CallbackPipeContextType[_WsMsgTypeVar]] = None,
) -> httpx_ws.WebSocketDisconnect:
    """Receive data from target server, then send to client.

    Args:
        client_ws: The websocket which send data to client.
        server_ws: The websocket which receive data of target server.
        pipe_context: The callback pipe for processing data.
            will send the received data(from server) to the sender,
            and receive the data from the receiver(then send to the client).

    Returns:
        If the server_ws sends a shutdown message normally, will return httpx_ws.WebSocketDisconnect.

    Raises:
        error for receiving: refer to `_httpx_ws_receive_bytes_or_str`
        error for sending: refer to `_starlette_ws_send_bytes_or_str`
        error for callback: refer to `MemoryObjectReceiveStream.receive` and `MemoryObjectSendStream.send`
    """
    with pipe_context or nullcontext() as pipe:
        while True:
            try:
                receive = await _httpx_ws_receive_bytes_or_str(server_ws)
            except httpx_ws.WebSocketDisconnect as e:
                return e
            # TODO: do not use `if` statement in loop
            if pipe is not None:
                sender, receiver = pipe
                # XXX, HACK, TODO: We can't identify the msg type from websocket,
                # so we have to ignore the type check here.
                await sender.send(receive)  # pyright: ignore [reportArgumentType]
                receive = await receiver.receive()
            await _starlette_ws_send_bytes_or_str(client_ws, receive)
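
For anyone who wants to see the lock-step behavior in isolation, here is a minimal sketch that models the send-then-receive loop above using only the standard library. It is not the library's code: `asyncio.Queue` stands in for the callback pipe, and `proxy_loop`, `echo`, `two_receives_one_send`, and `reproduce` are hypothetical names made up for this illustration. A timeout is used to detect the hang.

```python
import asyncio


async def proxy_loop(incoming, to_callback, from_callback, outgoing):
    """Model of the serial loop: hand one message to the callback,
    then wait for exactly one message back before continuing."""
    for message in incoming:
        await to_callback.put(message)
        outgoing.append(await from_callback.get())


async def echo(to_callback, from_callback):
    """Well-behaved callback: one receive, one send, repeated."""
    while True:
        message = await to_callback.get()
        await from_callback.put(message)


async def two_receives_one_send(to_callback, from_callback):
    """Callback that breaks the pattern: two receives before one send."""
    first = await to_callback.get()
    # The second message never arrives, because the proxy loop is still
    # waiting for a reply to the first one.
    second = await to_callback.get()
    await from_callback.put(first + second)


async def reproduce(callback, timeout=0.2):
    """Run the model with the given callback and report whether it hangs."""
    to_callback, from_callback = asyncio.Queue(), asyncio.Queue()
    outgoing = []
    callback_task = asyncio.create_task(callback(to_callback, from_callback))
    try:
        await asyncio.wait_for(
            proxy_loop(["a", "b"], to_callback, from_callback, outgoing),
            timeout,
        )
        return "completed"
    except asyncio.TimeoutError:
        return "deadlock"
    finally:
        callback_task.cancel()
```

Under these assumptions, `asyncio.run(reproduce(echo))` finishes normally, while `asyncio.run(reproduce(two_receives_one_send))` times out: the loop and the callback each wait for the other to send first.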

To resolve this issue, we need to modify the underlying implementation by creating four asynchronous tasks specifically for client receive, client send, server receive, and server send, while using pipelines to pass messages between them.

However, managing so many asynchronous tasks robustly is a challenge. We must switch to the anyio backend and rely on its structured concurrency to manage the tasks.
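
To make the proposed design more concrete, here is a rough sketch of one direction (for example client to server), assuming the receive side and the send side run as separate tasks connected through the callback. It is only an illustration under stated assumptions, not a patch: `asyncio.Queue` stands in for anyio memory object streams, `asyncio.gather` stands in for an anyio task group (it does not give the same cancellation guarantees), and `pump_in`, `pump_out`, `run_direction`, `_CLOSED`, and the callback are hypothetical names. A full proxy would run two such pairs, which gives the four tasks mentioned above.

```python
import asyncio

_CLOSED = object()  # hypothetical sentinel that marks the end of a stream


async def pump_in(source, to_callback):
    """Receive task: forward every incoming message to the callback."""
    for message in source:
        await to_callback.put(message)
    await to_callback.put(_CLOSED)


async def pump_out(from_callback, sink):
    """Send task: forward whatever the callback emits, whenever it emits it."""
    while True:
        message = await from_callback.get()
        if message is _CLOSED:
            return
        sink.append(message)


async def one_receive_two_sends(to_callback, from_callback):
    """Callback that deadlocks in the serial design: two sends per receive."""
    while True:
        message = await to_callback.get()
        if message is _CLOSED:
            break
        await from_callback.put(message)
        await from_callback.put(message.upper())
    await from_callback.put(_CLOSED)


async def run_direction(source, callback):
    """Run the receive task, the callback, and the send task concurrently."""
    to_callback, from_callback = asyncio.Queue(), asyncio.Queue()
    sink = []
    await asyncio.gather(
        pump_in(source, to_callback),
        callback(to_callback, from_callback),
        pump_out(from_callback, sink),
    )
    return sink
```

Because receiving and sending no longer share one loop, the callback is free to emit any number of messages per received message. The sketch leaves out error handling and shutdown on disconnect, which is where structured concurrency would matter most.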


I’m currently not motivated to resolve this issue because I’m unsure if there’s demand from the community.

If there is interest, PRs are welcome. Alternatively, you can give a thumbs up on the issue to indicate community feedback.
