-
-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Slow request body copy #2126
Slow request body copy #2126
Changes from all commits
6f6b466
33d74e0
691caf4
7860d10
2c5d10a
20bf8fa
7e1768f
a42d2cf
0c6c30e
a4c52fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Speed up the `PayloadWriter.write` method for large request bodies. |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -125,6 +125,7 @@ class PayloadWriter(AbstractPayloadWriter): | |
def __init__(self, stream, loop, acquire=True): | ||
self._stream = stream | ||
self._transport = None | ||
self._buffer = [] | ||
|
||
self.loop = loop | ||
self.length = None | ||
|
@@ -133,28 +134,35 @@ def __init__(self, stream, loop, acquire=True): | |
self.output_size = 0 | ||
|
||
self._eof = False | ||
self._buffer = [] | ||
self._compress = None | ||
self._drain_waiter = None | ||
|
||
if self._stream.available: | ||
self._transport = self._stream.transport | ||
self._buffer = None | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I think it's not a good idea to juggle variable types. How empty buffer There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm using There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Hm...I see. Well, then such a crash should be controlled, imho. Otherwise it would be hard to distinguish the transport re-use case from other errors. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It's purely internal to PayloadWriter, so everything is controlled where it should be. This None means that at any given time, exactly one of |
||
self._stream.available = False | ||
elif acquire: | ||
self._stream.acquire(self) | ||
|
||
def set_transport(self, transport): | ||
self._transport = transport | ||
|
||
chunk = b''.join(self._buffer) | ||
if chunk: | ||
transport.write(chunk) | ||
self._buffer.clear() | ||
if self._buffer is not None: | ||
for chunk in self._buffer: | ||
transport.write(chunk) | ||
self._buffer = None | ||
|
||
if self._drain_waiter is not None: | ||
waiter, self._drain_waiter = self._drain_waiter, None | ||
if not waiter.done(): | ||
waiter.set_result(None) | ||
waiter.set_result(None) | ||
|
||
async def get_transport(self): | ||
if self._transport is None: | ||
if self._drain_waiter is None: | ||
self._drain_waiter = self.loop.create_future() | ||
await self._drain_waiter | ||
|
||
return self._transport | ||
|
||
@property | ||
def tcp_nodelay(self): | ||
|
@@ -178,25 +186,14 @@ def enable_compression(self, encoding='deflate'): | |
if encoding == 'gzip' else -zlib.MAX_WBITS) | ||
self._compress = zlib.compressobj(wbits=zlib_mode) | ||
|
||
def buffer_data(self, chunk): | ||
if chunk: | ||
size = len(chunk) | ||
self.buffer_size += size | ||
self.output_size += size | ||
self._buffer.append(chunk) | ||
|
||
def _write(self, chunk): | ||
size = len(chunk) | ||
self.buffer_size += size | ||
self.output_size += size | ||
|
||
# see set_transport: exactly one of _buffer or _transport is None | ||
if self._transport is not None: | ||
if self._buffer: | ||
self._buffer.append(chunk) | ||
self._transport.write(b''.join(self._buffer)) | ||
self._buffer.clear() | ||
else: | ||
self._transport.write(chunk) | ||
self._transport.write(chunk) | ||
else: | ||
self._buffer.append(chunk) | ||
|
||
|
@@ -241,11 +238,7 @@ def write_headers(self, status_line, headers, SEP=': ', END='\r\n'): | |
headers = status_line + ''.join( | ||
[k + SEP + v + END for k, v in headers.items()]) | ||
headers = headers.encode('utf-8') + b'\r\n' | ||
|
||
size = len(headers) | ||
self.buffer_size += size | ||
self.output_size += size | ||
self._buffer.append(headers) | ||
self._write(headers) | ||
|
||
async def write_eof(self, chunk=b''): | ||
if self._eof: | ||
|
@@ -268,24 +261,17 @@ async def write_eof(self, chunk=b''): | |
chunk = b'0\r\n\r\n' | ||
|
||
if chunk: | ||
self.buffer_data(chunk) | ||
self._write(chunk) | ||
|
||
await self.drain(True) | ||
await self.drain() | ||
|
||
self._eof = True | ||
self._transport = None | ||
self._stream.release() | ||
|
||
async def drain(self, last=False): | ||
async def drain(self): | ||
if self._transport is not None: | ||
if self._buffer: | ||
self._transport.write(b''.join(self._buffer)) | ||
if not last: | ||
self._buffer.clear() | ||
await self._stream.drain() | ||
else: | ||
# wait for transport | ||
if self._drain_waiter is None: | ||
self._drain_waiter = self.loop.create_future() | ||
|
||
await self._drain_waiter | ||
await self.get_transport() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Side note: asyncio streams switched from
`list`
to `bytearray`
for the sake of speed; maybe we need it too. Feel free to create a PR for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I remember correctly, aiohttp switched from bytearrays to lists for performance reasons too, so this probably needs to be benchmarked…
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We just follow the asyncio streams design; it switched from bytearrays to lists and back to bytearrays :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to benchmark bytearray vs join in #2179, but the results were inconclusive at best (or leaned in favor of
b''.join
). I'll try to dig up why asyncio changed again to bytearray