Use PipeChannels or FdStreams to make worker processes natively async #1783

Closed
Changes from all commits
64 commits
2fcf89d
Create _worker_processes.py
richardsheridan Oct 28, 2020
2c33843
Create test_worker_process.py and fix revealed bugs
richardsheridan Oct 29, 2020
62a28a5
improve tests, reduce race condition
richardsheridan Oct 29, 2020
041370d
Fixups from test failures
richardsheridan Oct 29, 2020
0740307
Better way to raise BrokenWorkerError
richardsheridan Oct 29, 2020
440153a
Py3.6 compat for kill method
richardsheridan Oct 31, 2020
7117c3e
prevent excessive lock releases
richardsheridan Oct 31, 2020
b9f5076
revamp cache pruning test
richardsheridan Oct 31, 2020
8e39143
don't cover test helpers that happen to be in the global namespace
richardsheridan Oct 31, 2020
52ca536
attempt to fix Alpine CI timeout and get a real traceback
richardsheridan Oct 31, 2020
558b58c
give segfault test a little longer timeout for CI
richardsheridan Nov 1, 2020
7369314
mark tests slow
richardsheridan Nov 1, 2020
46fc931
the cache is now a deque
richardsheridan Nov 1, 2020
3247502
remove unnecessary import time error possibility
richardsheridan Nov 1, 2020
a4a209a
match worker and thread names
richardsheridan Nov 1, 2020
2b41da4
comment for sleeping during exception handler
richardsheridan Nov 1, 2020
a8291a9
check correct worker is pruned
richardsheridan Nov 2, 2020
473bf6f
clean up imports
richardsheridan Nov 2, 2020
3d2d3ed
add alternative segfault method
richardsheridan Nov 2, 2020
8bcbf78
reformat and go back to using CI timeout
richardsheridan Nov 2, 2020
2e57398
avoid multierror, but can't race segfault types
richardsheridan Nov 2, 2020
eea2acb
sometimes stack overflow is a memory error instead of a segfault
richardsheridan Nov 2, 2020
0c597b8
cleanups
richardsheridan Nov 6, 2020
742f412
Don't join on kill
richardsheridan Nov 6, 2020
92a6a80
Don't double-check life of cached worker process
richardsheridan Nov 6, 2020
4c2b770
eliminate worker timeout race
richardsheridan Nov 7, 2020
a72cdd8
fix flakiness due to slow CI
richardsheridan Nov 8, 2020
f074ef8
fix flakiness due to slow CI, continued
richardsheridan Nov 8, 2020
0f59d9d
remove redundant test
richardsheridan Nov 17, 2020
69fe295
rename prune and PROC_CACHE
richardsheridan Nov 21, 2020
33a4c4d
relative imports
richardsheridan Nov 21, 2020
3dc7e06
child monitor task is apparently unnecessary
richardsheridan Nov 21, 2020
6b29e15
optimize barrier use in is_alive
richardsheridan Nov 21, 2020
e560eca
see if any platforms are persistently hanging
richardsheridan Nov 21, 2020
8abac5b
see if the monitor helps on pypy
richardsheridan Nov 21, 2020
d42b9a5
avoid multierror on child death
richardsheridan Nov 21, 2020
e9040d6
xfail on flaky segfault behavior
richardsheridan Nov 22, 2020
076cdf8
don't check _work coverage because it only happens in the subprocess
richardsheridan Nov 22, 2020
e0685f6
boost coverage
richardsheridan Nov 22, 2020
e9b204d
boost coverage and xfail on TooSlowError
richardsheridan Nov 22, 2020
29e6221
more coverage
richardsheridan Nov 22, 2020
9a2f094
ensure death on pop
richardsheridan Nov 22, 2020
1397e9a
expose API
richardsheridan Nov 22, 2020
07c20be
Docs and newsfragment
richardsheridan Nov 22, 2020
bed71fe
Expose new error
richardsheridan Nov 22, 2020
bb464ff
fix docstring warning
richardsheridan Nov 22, 2020
9260def
fix and test rare cancellation cleanup
richardsheridan Nov 28, 2020
4253762
flake-free test to raise BrokenWorkerError
richardsheridan Dec 16, 2020
e9bd34b
try just very long single segfault timeout
richardsheridan Dec 16, 2020
f7f2eb7
prevent another obscure race
richardsheridan Dec 18, 2020
f6b9187
move comment
richardsheridan Dec 18, 2020
91280a3
base synchronization purely on pipe behavior
richardsheridan Dec 18, 2020
1195190
close race condition between timeout and is_alive()==False
richardsheridan Dec 18, 2020
8cf7610
move join to method, only really needed for tests
richardsheridan Dec 18, 2020
e4801ba
Close another race condition
richardsheridan Dec 18, 2020
a139b1a
Use Windows message-oriented pipes to create PipeSendChannels
richardsheridan Oct 28, 2020
f9fc4e1
improve coverage
richardsheridan Oct 31, 2020
41e3965
more coverage
richardsheridan Oct 31, 2020
90ae32f
style and comments
richardsheridan Oct 31, 2020
a839bca
Optimize large messages by avoiding copies
richardsheridan Dec 10, 2020
5e7c3ff
cover small message branch
richardsheridan Dec 10, 2020
fd4cd8c
resolve disagreement between black versions :(
richardsheridan Dec 10, 2020
e303c23
detect conflict earlier, prevent message races
richardsheridan Dec 15, 2020
4ba5c30
Convert workers to use PipeSendChannels or FdStreams
richardsheridan Nov 24, 2020
41 changes: 41 additions & 0 deletions docs/source/reference-core.rst
@@ -1816,6 +1816,45 @@ to spawn a child thread, and then use a :ref:`memory channel

.. literalinclude:: reference-core/from-thread-example.py

.. _worker_processes:

Worker Processes
----------------

Given that Python (and CPython in particular) has ongoing difficulties running
CPU-bound work in parallel threads, Trio provides a way to dispatch synchronous function execution to
special subprocesses known as "Worker Processes". By default, Trio will create as many
workers as the system has CPUs (as reported by :func:`os.cpu_count`), allowing fair
and truly parallel dispatch of CPU-bound work. As with Trio threads, these processes
are cached in a process pool to minimize latency and resource usage. Despite this,
executing a function in a process is at best an order of magnitude slower than in
a thread, and possibly even slower when dealing with large arguments or a cold pool.
Therefore, we recommend avoiding worker process dispatch for functions with a
duration of less than about 10 ms.
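
As a quick sketch of the dispatch pattern this enables (built on :func:`trio.to_process.run_sync`,
documented below; the exact signature should be checked against its autofunction entry):

.. code-block:: python

    import trio

    def fib(n):
        # Deliberately CPU-bound: naive recursion
        return n if n < 2 else fib(n - 1) + fib(n - 2)

    async def main():
        # Runs in a worker process, leaving the Trio event loop responsive
        result = await trio.to_process.run_sync(fib, 32)
        print(result)

    if __name__ == "__main__":
        # The guard matters: workers may be spawned via multiprocessing's
        # "spawn" method, which re-imports this module in the child
        trio.run(main)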

Unlike threads, subprocesses are strongly isolated from the parent process, which
allows two important features that cannot be portably implemented in threads:

- Forceful cancellation: a deadlocked call or infinite loop can be cancelled
by completely terminating the process.
- Protection from errors: if a call segfaults or an extension module has an
unrecoverable error, the worker may die but Trio will raise
:exc:`BrokenWorkerError` and carry on.

In both cases the workers die suddenly and violently, and at an unpredictable point
in the execution of the dispatched function, so avoid using the cancellation feature
if lost intermediate results, partial filesystem writes, or partial shared-memory
writes may leave the larger system in an incoherent state.
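
A sketch of the forceful-cancellation behavior described above (assuming ``run_sync`` accepts a
``cancellable`` flag analogous to :func:`trio.to_thread.run_sync`; the actual signature is in the
autofunction entry below):

.. code-block:: python

    import trio

    def spin_forever():
        while True:  # stands in for a deadlocked or runaway call
            pass

    async def main():
        with trio.move_on_after(1):
            # cancellable=True is an assumption; cancellation is delivered
            # by killing the worker process outright
            await trio.to_process.run_sync(spin_forever, cancellable=True)
        print("worker killed, Trio carried on")

    if __name__ == "__main__":
        trio.run(main)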

.. module:: trio.to_process
.. currentmodule:: trio

Putting CPU-bound functions in worker processes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: trio.to_process.run_sync

.. autofunction:: trio.to_process.current_default_process_limiter
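
For example (assuming, by analogy with :func:`trio.to_thread.current_default_thread_limiter`,
that this returns a :class:`trio.CapacityLimiter` whose ``total_tokens`` may be adjusted):

.. code-block:: python

    import trio

    async def shrink_worker_pool():
        # Cap concurrent worker processes at 2 for the whole program
        trio.to_process.current_default_process_limiter().total_tokens = 2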

Exceptions and warnings
-----------------------
@@ -1834,6 +1873,8 @@ Exceptions and warnings

.. autoexception:: BrokenResourceError

.. autoexception:: BrokenWorkerError

.. autoexception:: RunFinishedError

.. autoexception:: TrioInternalError
4 changes: 4 additions & 0 deletions newsfragments/1781.headline.rst
@@ -0,0 +1,4 @@
Trio now provides `multiprocessing`-based worker processes for the delegation
of CPU-bound work via :func:`trio.to_process.run_sync`. The API is comparable to
:func:`trio.to_thread.run_sync`, but true cancellation is achieved by killing the
worker process.
3 changes: 3 additions & 0 deletions trio/__init__.py
@@ -90,12 +90,15 @@

from ._deprecate import TrioDeprecationWarning

from ._worker_processes import BrokenWorkerError

# Submodules imported by default
from . import lowlevel
from . import socket
from . import abc
from . import from_thread
from . import to_thread
from . import to_process

# Not imported by default, but mentioned here so static analysis tools like
# pylint will know that it exists.
45 changes: 45 additions & 0 deletions trio/_core/_windows_cffi.py
@@ -179,6 +179,25 @@
LPOVERLAPPED lpOverlapped
);

BOOL PeekNamedPipe(
HANDLE hNamedPipe,
LPVOID lpBuffer,
DWORD nBufferSize,
LPDWORD lpBytesRead,
LPDWORD lpTotalBytesAvail,
LPDWORD lpBytesLeftThisMessage
);

BOOL GetNamedPipeHandleStateA(
HANDLE hNamedPipe,
LPDWORD lpState,
LPDWORD lpCurInstances,
LPDWORD lpMaxCollectionCount,
LPDWORD lpCollectDataTimeout,
LPSTR lpUserName,
DWORD nMaxUserNameSize
);

// From https://github.com/piscisaureus/wepoll/blob/master/src/afd.h
typedef struct _AFD_POLL_HANDLE_INFO {
HANDLE Handle;
@@ -245,6 +264,7 @@ class ErrorCodes(enum.IntEnum):
ERROR_INVALID_PARMETER = 87
ERROR_NOT_FOUND = 1168
ERROR_NOT_SOCKET = 10038
ERROR_MORE_DATA = 234


class FileFlags(enum.IntEnum):
@@ -296,6 +316,13 @@ class IoControlCodes(enum.IntEnum):
IOCTL_AFD_POLL = 0x00012024


class PipeModes(enum.IntFlag):
PIPE_WAIT = 0x00000000
PIPE_NOWAIT = 0x00000001
PIPE_READMODE_BYTE = 0x00000000
PIPE_READMODE_MESSAGE = 0x00000002


################################################################
# Generic helpers
################################################################
@@ -321,3 +348,21 @@ def raise_winerror(winerror=None, *, filename=None, filename2=None):
_, msg = ffi.getwinerror(winerror)
# https://docs.python.org/3/library/exceptions.html#OSError
raise OSError(0, msg, filename, winerror, filename2)


def get_pipe_state(handle):
lpState = ffi.new("LPDWORD")
if not kernel32.GetNamedPipeHandleStateA(
_handle(handle), lpState, ffi.NULL, ffi.NULL, ffi.NULL, ffi.NULL, 0
):
raise_winerror() # pragma: no cover
return lpState[0]


def peek_pipe_message_left(handle):
left = ffi.new("LPDWORD")
if not kernel32.PeekNamedPipe(
_handle(handle), ffi.NULL, 0, ffi.NULL, ffi.NULL, left
):
raise_winerror() # pragma: no cover
return left[0]
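
These helpers are consumed by the message-oriented pipe channels added to trio/_windows_pipes.py below. A minimal sketch of how they compose (both wrapper functions here are hypothetical illustrations, not part of this diff):

from trio._core._windows_cffi import PipeModes, get_pipe_state, peek_pipe_message_left

def assert_message_read_mode(handle):
    # Hypothetical check: confirm the handle reads in message mode, which
    # PipeReceiveChannel relies on for message framing.
    if not get_pipe_state(handle) & PipeModes.PIPE_READMODE_MESSAGE:
        raise ValueError("pipe handle is not in message-read mode")

def bytes_left_in_message(handle):
    # After a read fails with ERROR_MORE_DATA, this reports how many bytes
    # of the current message are still waiting in the pipe.
    return peek_pipe_message_left(handle)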
78 changes: 76 additions & 2 deletions trio/_windows_pipes.py
@@ -1,9 +1,16 @@
import sys
from typing import TYPE_CHECKING
from . import _core
from ._abc import SendStream, ReceiveStream
from ._abc import SendStream, ReceiveStream, SendChannel, ReceiveChannel
from ._util import ConflictDetector, Final
from ._core._windows_cffi import _handle, raise_winerror, kernel32, ffi
from ._core._windows_cffi import (
_handle,
raise_winerror,
kernel32,
ffi,
ErrorCodes,
peek_pipe_message_left,
)

assert sys.platform == "win32" or not TYPE_CHECKING

@@ -132,3 +139,70 @@ async def receive_some(self, max_bytes=None) -> bytes:

async def aclose(self):
await self._handle_holder.aclose()


class PipeSendChannel(SendChannel[bytes]):
"""Represents a message stream over a pipe object."""

def __init__(self, handle: int) -> None:
self._pss = PipeSendStream(handle)
# needed for "detach" via _handle_holder.handle = -1
self._handle_holder = self._pss._handle_holder

async def send(self, value: bytes):
# Works just fine if the pipe is message-oriented
await self._pss.send_all(value)

async def aclose(self):
await self._handle_holder.aclose()


class PipeReceiveChannel(ReceiveChannel[bytes]):
"""Represents a message stream over a pipe object."""

def __init__(self, handle: int) -> None:
self._handle_holder = _HandleHolder(handle)
self._conflict_detector = ConflictDetector(
"another task is currently using this pipe"
)

async def receive(self) -> bytes:
with self._conflict_detector:
buffer = bytearray(DEFAULT_RECEIVE_SIZE)
try:
received = await self._receive_some_into(buffer)
except OSError as e:
if e.winerror != ErrorCodes.ERROR_MORE_DATA:
raise # pragma: no cover
left = peek_pipe_message_left(self._handle_holder.handle)
# preallocate memory to avoid an extra copy of very large messages
newbuffer = bytearray(DEFAULT_RECEIVE_SIZE + left)
with memoryview(newbuffer) as view:
view[:DEFAULT_RECEIVE_SIZE] = buffer
await self._receive_some_into(view[DEFAULT_RECEIVE_SIZE:])
return newbuffer
else:
del buffer[received:]
return buffer

async def _receive_some_into(self, buffer) -> int:
if self._handle_holder.closed:
raise _core.ClosedResourceError("this pipe is already closed")
try:
return await _core.readinto_overlapped(self._handle_holder.handle, buffer)
except BrokenPipeError:
if self._handle_holder.closed:
raise _core.ClosedResourceError(
"another task closed this pipe"
) from None

# Windows raises BrokenPipeError on one end of a pipe
# whenever the other end closes, regardless of direction.
# Convert this to EndOfChannel.
#
# Checkpoint manually before raising: EndOfChannel is an expected
# outcome rather than an error, so receive() should still checkpoint.
await _core.checkpoint()
raise _core.EndOfChannel

async def aclose(self):
await self._handle_holder.aclose()
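
A usage sketch tying the two channel classes together (assumptions: Windows only; multiprocessing.Pipe(duplex=True) yields message-oriented, overlapped named-pipe connections whose raw handles can be wrapped via fileno(); handle ownership and cleanup are glossed over):

import multiprocessing

import trio
from trio._windows_pipes import PipeReceiveChannel, PipeSendChannel

async def main():
    conn_a, conn_b = multiprocessing.Pipe(duplex=True)
    send_channel = PipeSendChannel(conn_a.fileno())
    receive_channel = PipeReceiveChannel(conn_b.fileno())
    # One send() arrives as exactly one receive(); if the message exceeds
    # DEFAULT_RECEIVE_SIZE, the ERROR_MORE_DATA path above reassembles it.
    await send_channel.send(b"x" * 100_000)
    message = await receive_channel.receive()
    assert len(message) == 100_000

if __name__ == "__main__":
    trio.run(main)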