From 6f6b466cc354cb147cda3fa7587b0805cb89ac71 Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Mon, 24 Jul 2017 15:05:35 +0200 Subject: [PATCH 01/10] avoid copying the request payload before writing it to the transport --- aiohttp/http_writer.py | 44 ++++++++++++------------------------------ 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index f8da95074c2..2d08d465b00 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -124,7 +124,7 @@ class PayloadWriter(AbstractPayloadWriter): def __init__(self, stream, loop, acquire=True): self._stream = stream - self._transport = None + self._transport, self._buffer = None, [] self.loop = loop self.length = None @@ -133,23 +133,21 @@ def __init__(self, stream, loop, acquire=True): self.output_size = 0 self._eof = False - self._buffer = [] self._compress = None self._drain_waiter = None if self._stream.available: - self._transport = self._stream.transport + self._transport, self._buffer = self._stream.transport, None self._stream.available = False elif acquire: self._stream.acquire(self) def set_transport(self, transport): - self._transport = transport + assert self._transport is None and self._buffer is not None - chunk = b''.join(self._buffer) - if chunk: - transport.write(chunk) - self._buffer.clear() + for b in self._buffer: + transport.write(b) + self._transport, self._buffer = transport, None if self._drain_waiter is not None: waiter, self._drain_waiter = self._drain_waiter, None @@ -178,25 +176,15 @@ def enable_compression(self, encoding='deflate'): if encoding == 'gzip' else -zlib.MAX_WBITS) self._compress = zlib.compressobj(wbits=zlib_mode) - def buffer_data(self, chunk): - if chunk: - size = len(chunk) - self.buffer_size += size - self.output_size += size - self._buffer.append(chunk) - def _write(self, chunk): size = len(chunk) self.buffer_size += size self.output_size += size + # see set_transport: exactly one of _buffer or _transport 
is None if self._transport is not None: - if self._buffer: - self._buffer.append(chunk) - self._transport.write(b''.join(self._buffer)) - self._buffer.clear() - else: - self._transport.write(chunk) + assert self._buffer is None + self._transport.write(chunk) else: self._buffer.append(chunk) @@ -241,11 +229,7 @@ def write_headers(self, status_line, headers, SEP=': ', END='\r\n'): headers = status_line + ''.join( [k + SEP + v + END for k, v in headers.items()]) headers = headers.encode('utf-8') + b'\r\n' - - size = len(headers) - self.buffer_size += size - self.output_size += size - self._buffer.append(headers) + self._write(headers) async def write_eof(self, chunk=b''): if self._eof: @@ -268,9 +252,9 @@ async def write_eof(self, chunk=b''): chunk = b'0\r\n\r\n' if chunk: - self.buffer_data(chunk) + self._write(chunk) - await self.drain(True) + await self.drain() self._eof = True self._transport = None @@ -278,10 +262,6 @@ async def write_eof(self, chunk=b''): async def drain(self, last=False): if self._transport is not None: - if self._buffer: - self._transport.write(b''.join(self._buffer)) - if not last: - self._buffer.clear() await self._stream.drain() else: # wait for transport From 33d74e004b4067219ab1e67362a8a968909d9337 Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Mon, 24 Jul 2017 15:49:54 +0200 Subject: [PATCH 02/10] when the server closes the connection before writing EOF, the client should see a payload error, not expect to see the disconnection before the body is read --- tests/test_web_server.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_web_server.py b/tests/test_web_server.py index b8284502d3c..3561dae504d 100644 --- a/tests/test_web_server.py +++ b/tests/test_web_server.py @@ -80,8 +80,9 @@ async def handler(request): server = await raw_test_server(handler, logger=logger) cli = await test_client(server) - with pytest.raises(client.ServerDisconnectedError): - await cli.get('/path/to') + resp = await
cli.get('/path/to') + with pytest.raises(client.ClientPayloadError): + await resp.read() logger.debug.assert_called_with('Ignored premature client disconnection ') From 691caf48c99fd51d7df1889ecbf250dab205f209 Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Mon, 24 Jul 2017 17:02:31 +0200 Subject: [PATCH 03/10] do not use private attributes in SendfilePayloadWriter --- aiohttp/http_writer.py | 29 +++++++++++++++++++---------- aiohttp/web_fileresponse.py | 28 +++++++++++----------------- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 2d08d465b00..c3216137b5d 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -124,7 +124,8 @@ class PayloadWriter(AbstractPayloadWriter): def __init__(self, stream, loop, acquire=True): self._stream = stream - self._transport, self._buffer = None, [] + self._transport = None + self._buffer = [] self.loop = loop self.length = None @@ -137,23 +138,34 @@ def __init__(self, stream, loop, acquire=True): self._drain_waiter = None if self._stream.available: - self._transport, self._buffer = self._stream.transport, None + self._transport = self._stream.transport + self._buffer = None self._stream.available = False elif acquire: self._stream.acquire(self) def set_transport(self, transport): - assert self._transport is None and self._buffer is not None + self._transport = transport - for b in self._buffer: - transport.write(b) - self._transport, self._buffer = transport, None + if self._buffer is not None: + for b in self._buffer: + transport.write(b) + self._buffer = None if self._drain_waiter is not None: waiter, self._drain_waiter = self._drain_waiter, None if not waiter.done(): waiter.set_result(None) + async def get_transport(self): + if self._transport is None: + if self._drain_waiter is None: + self._drain_waiter = create_future(self.loop) + await self._drain_waiter + + assert self._transport is not None + return self._transport + @property def 
tcp_nodelay(self): return self._stream.tcp_nodelay @@ -265,7 +277,4 @@ async def drain(self, last=False): await self._stream.drain() else: # wait for transport - if self._drain_waiter is None: - self._drain_waiter = self.loop.create_future() - - await self._drain_waiter + await self.get_transport() diff --git a/aiohttp/web_fileresponse.py b/aiohttp/web_fileresponse.py index 7ef19bb8931..37b0f6c5fff 100644 --- a/aiohttp/web_fileresponse.py +++ b/aiohttp/web_fileresponse.py @@ -18,17 +18,16 @@ class SendfilePayloadWriter(PayloadWriter): - def set_transport(self, transport): - self._transport = transport - - if self._drain_waiter is not None: - waiter, self._drain_maiter = self._drain_maiter, None - if not waiter.done(): - waiter.set_result(None) + def __init__(self, *args, **kwargs): + self.__buffer = [] + super().__init__(*args, **kwargs) def _write(self, chunk): + # we overwrite PayloadWriter._write, so nothing can be appended to + # _buffer, and nothing is written to the transport directly by the + # parent class self.output_size += len(chunk) - self._buffer.append(chunk) + self.__buffer.append(chunk) def _sendfile_cb(self, fut, out_fd, in_fd, offset, count, loop, registered): @@ -54,13 +53,9 @@ def _sendfile_cb(self, fut, out_fd, in_fd, fut.set_result(None) async def sendfile(self, fobj, count): - if self._transport is None: - if self._drain_waiter is None: - self._drain_waiter = self.loop.create_future() - - await self._drain_waiter + transport = await self.get_transport() - out_socket = self._transport.get_extra_info("socket").dup() + out_socket = transport.get_extra_info('socket').dup() out_socket.setblocking(False) out_fd = out_socket.fileno() in_fd = fobj.fileno() @@ -74,13 +69,12 @@ async def sendfile(self, fobj, count): await fut except Exception: server_logger.debug('Socket error') - self._transport.close() + transport.close() finally: out_socket.close() self.output_size += count - self._transport = None - self._stream.release() + await 
super().write_eof() async def write_eof(self, chunk=b''): pass From 7860d10ef147c4fac57b39ce560ddc8bb242575e Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Mon, 24 Jul 2017 18:34:06 +0200 Subject: [PATCH 04/10] on windows, some test is now raising a fully fledged ClientOSError instead of an empty ServerDisconnectedError --- tests/test_client_functional.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index e0377d2d146..2607bc5db64 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -2462,7 +2462,7 @@ def connection_lost(self, exc): await r.read() assert 1 == len(connector._conns) - with pytest.raises(aiohttp.ServerDisconnectedError): + with pytest.raises(aiohttp.ClientConnectionError): await session.request('GET', url) assert 0 == len(connector._conns) From 2c5d10aa8cf5199c315513ce84838ed5777dff1b Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Tue, 8 Aug 2017 15:16:58 +0200 Subject: [PATCH 05/10] code review --- aiohttp/http_writer.py | 10 ++++------ aiohttp/web_fileresponse.py | 7 ++++--- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index c3216137b5d..6d02480c637 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -148,8 +148,8 @@ def set_transport(self, transport): self._transport = transport if self._buffer is not None: - for b in self._buffer: - transport.write(b) + for chunk in self._buffer: + transport.write(chunk) self._buffer = None if self._drain_waiter is not None: @@ -160,10 +160,9 @@ def set_transport(self, transport): async def get_transport(self): if self._transport is None: if self._drain_waiter is None: - self._drain_waiter = create_future(self.loop) + self._drain_waiter = self.loop.create_future() await self._drain_waiter - assert self._transport is not None return self._transport @property @@ -195,7 +194,6 @@ def _write(self, chunk): # see 
set_transport: exactly one of _buffer or _transport is None if self._transport is not None: - assert self._buffer is None self._transport.write(chunk) else: self._buffer.append(chunk) @@ -272,7 +270,7 @@ async def write_eof(self, chunk=b''): self._transport = None self._stream.release() - async def drain(self, last=False): + async def drain(self): if self._transport is not None: await self._stream.drain() else: diff --git a/aiohttp/web_fileresponse.py b/aiohttp/web_fileresponse.py index 37b0f6c5fff..fdb68916ebf 100644 --- a/aiohttp/web_fileresponse.py +++ b/aiohttp/web_fileresponse.py @@ -19,7 +19,7 @@ class SendfilePayloadWriter(PayloadWriter): def __init__(self, *args, **kwargs): - self.__buffer = [] + self._sendfile_buffer = [] super().__init__(*args, **kwargs) def _write(self, chunk): @@ -27,7 +27,7 @@ def _write(self, chunk): # _buffer, and nothing is written to the transport directly by the # parent class self.output_size += len(chunk) - self.__buffer.append(chunk) + self._sendfile_buffer.append(chunk) def _sendfile_cb(self, fut, out_fd, in_fd, offset, count, loop, registered): @@ -62,8 +62,9 @@ async def sendfile(self, fobj, count): offset = fobj.tell() loop = self.loop + data = b''.join(self._sendfile_buffer) try: - await loop.sock_sendall(out_socket, b''.join(self._buffer)) + await loop.sock_sendall(out_socket, data) fut = loop.create_future() self._sendfile_cb(fut, out_fd, in_fd, offset, count, loop, False) await fut From 20bf8fae6efb2832da9d39a0c5c14acc827396fa Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Tue, 21 Nov 2017 17:43:56 +0100 Subject: [PATCH 06/10] add CHANGES file --- CHANGES/2126.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 CHANGES/2126.feature diff --git a/CHANGES/2126.feature b/CHANGES/2126.feature new file mode 100644 index 00000000000..ee2562d7df6 --- /dev/null +++ b/CHANGES/2126.feature @@ -0,0 +1 @@ +Speed up the `PayloadWriter.write` method for large request bodies. 
From 7e1768f353eda413bfb40bb866b6472714247292 Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Fri, 24 Nov 2017 10:19:18 +0100 Subject: [PATCH 07/10] in PayloadWriter, the waiter result is set and the self._drain_waiter is set back to None at the same time. So self._drain_waiter can never be done() --- aiohttp/http_writer.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aiohttp/http_writer.py b/aiohttp/http_writer.py index 6d02480c637..03869c44e5e 100644 --- a/aiohttp/http_writer.py +++ b/aiohttp/http_writer.py @@ -154,8 +154,7 @@ def set_transport(self, transport): if self._drain_waiter is not None: waiter, self._drain_waiter = self._drain_waiter, None - if not waiter.done(): - waiter.set_result(None) + waiter.set_result(None) async def get_transport(self): if self._transport is None: From a42d2cf535737356736b0ed47a5c09d94fb3d83e Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Fri, 24 Nov 2017 10:25:15 +0100 Subject: [PATCH 08/10] test compressing by chunks in the PayloadWriter (cover the branch where a chunk does not yield anything that should be written to the transport) --- tests/test_client_functional.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index 2607bc5db64..5fa0334a538 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -1694,6 +1694,27 @@ async def handler(request): resp.close() +async def test_encoding_gzip_write_by_chunks(loop, test_client): + + async def handler(request): + resp = web.StreamResponse() + resp.enable_compression(web.ContentCoding.gzip) + await resp.prepare(request) + await resp.write(b'0') + await resp.write(b'0') + return resp + + app = web.Application() + app.router.add_get('/', handler) + client = await test_client(app) + + resp = await client.get('/') + assert 200 == resp.status + txt = await resp.text() + assert txt == '00' + resp.close() + + async def 
test_encoding_gzip_nochunk(loop, test_client): async def handler(request): From 0c6c30ec79a110432acf8485bf60cd8832f520fc Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Fri, 24 Nov 2017 10:32:25 +0100 Subject: [PATCH 09/10] Test writing too many chunks to a PayloadWriter that has a Content-Length --- tests/test_client_functional.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index 5fa0334a538..60215de1f56 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -1799,6 +1799,26 @@ async def handler(request): resp.close() +async def test_payload_content_length_by_chunks(loop, test_client): + + async def handler(request): + resp = web.StreamResponse(headers={'content-length': '3'}) + await resp.prepare(request) + await resp.write(b'answer') + await resp.write(b'two') + request.transport.close() + return resp + + app = web.Application() + app.router.add_get('/', handler) + client = await test_client(app) + + resp = await client.get('/') + data = await resp.read() + assert data == b'ans' + resp.close() + + async def test_chunked(loop, test_client): async def handler(request): From a4c52fb9728cb889cefd9fed025fbac87a69dc70 Mon Sep 17 00:00:00 2001 From: Arthur Darcet Date: Fri, 24 Nov 2017 11:06:17 +0100 Subject: [PATCH 10/10] test multiple waiting drains in PayloadWriter --- tests/test_http_writer.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/tests/test_http_writer.py b/tests/test_http_writer.py index ac5280fc391..3530abcc7e8 100644 --- a/tests/test_http_writer.py +++ b/tests/test_http_writer.py @@ -1,5 +1,6 @@ """Tests for aiohttp/http_writer.py""" +import asyncio import zlib from unittest import mock @@ -148,3 +149,19 @@ def test_write_drain(stream, loop): msg.write(b'1', drain=True) assert msg.drain.called assert msg.buffer_size == 0 + + +async def test_multiple_drains(stream, loop): + stream.available = False + msg = 
http.PayloadWriter(stream, loop) + fut1 = loop.create_task(msg.drain()) + fut2 = loop.create_task(msg.drain()) + + assert not fut1.done() + assert not fut2.done() + + msg.set_transport(stream.transport) + + await asyncio.sleep(0) + assert fut1.done() + assert fut2.done()