Skip to content

Commit 05890bd

Browse files
committed
Fix #987: implement client timeouts
1 parent 1bfc02a commit 05890bd

7 files changed

+161
-36
lines changed

aiohttp/client.py

+10-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from .client_reqrep import ClientRequest, ClientResponse
1919
from .client_ws import ClientWebSocketResponse
2020
from .errors import WSServerHandshakeError
21-
from .helpers import CookieJar
21+
from .helpers import CookieJar, Timeout
2222

2323
__all__ = ('ClientSession', 'request', 'get', 'options', 'head',
2424
'delete', 'post', 'put', 'patch', 'ws_connect')
@@ -106,7 +106,8 @@ def request(self, method, url, *,
106106
expect100=False,
107107
read_until_eof=True,
108108
proxy=None,
109-
proxy_auth=None):
109+
proxy_auth=None,
110+
timeout=5*60):
110111
"""Perform HTTP request."""
111112

112113
return _RequestContextManager(
@@ -127,7 +128,8 @@ def request(self, method, url, *,
127128
expect100=expect100,
128129
read_until_eof=read_until_eof,
129130
proxy=proxy,
130-
proxy_auth=proxy_auth,))
131+
proxy_auth=proxy_auth,
132+
timeout=timeout))
131133

132134
@asyncio.coroutine
133135
def _request(self, method, url, *,
@@ -145,7 +147,8 @@ def _request(self, method, url, *,
145147
expect100=False,
146148
read_until_eof=True,
147149
proxy=None,
148-
proxy_auth=None):
150+
proxy_auth=None,
151+
timeout=5*60):
149152

150153
if version is not None:
151154
warnings.warn("HTTP version should be specified "
@@ -187,9 +190,10 @@ def _request(self, method, url, *,
187190
auth=auth, version=version, compress=compress, chunked=chunked,
188191
expect100=expect100,
189192
loop=self._loop, response_class=self._response_class,
190-
proxy=proxy, proxy_auth=proxy_auth,)
193+
proxy=proxy, proxy_auth=proxy_auth, timeout=timeout)
191194

192-
conn = yield from self._connector.connect(req)
195+
with Timeout(timeout, loop=self._loop):
196+
conn = yield from self._connector.connect(req)
193197
try:
194198
resp = req.send(conn.writer, conn.reader)
195199
try:

aiohttp/client_reqrep.py

+14-7
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import aiohttp
1616

1717
from . import hdrs, helpers, streams
18+
from .helpers import Timeout
1819
from .log import client_logger
1920
from .multipart import MultipartWriter
2021
from .protocol import HttpMessage
@@ -68,7 +69,8 @@ def __init__(self, method, url, *,
6869
version=aiohttp.HttpVersion11, compress=None,
6970
chunked=None, expect100=False,
7071
loop=None, response_class=None,
71-
proxy=None, proxy_auth=None):
72+
proxy=None, proxy_auth=None,
73+
timeout=5*60):
7274

7375
if loop is None:
7476
loop = asyncio.get_event_loop()
@@ -80,6 +82,7 @@ def __init__(self, method, url, *,
8082
self.compress = compress
8183
self.loop = loop
8284
self.response_class = response_class or ClientResponse
85+
self._timeout = timeout
8386

8487
if loop.get_debug():
8588
self._source_traceback = traceback.extract_stack(sys._getframe(1))
@@ -502,7 +505,8 @@ def send(self, writer, reader):
502505

503506
self.response = self.response_class(
504507
self.method, self.url, self.host,
505-
writer=self._writer, continue100=self._continue)
508+
writer=self._writer, continue100=self._continue,
509+
timeout=self._timeout)
506510
self.response._post_init(self.loop)
507511
return self.response
508512

@@ -546,7 +550,8 @@ class ClientResponse:
546550
_loop = None
547551
_closed = True # to allow __del__ for non-initialized properly response
548552

549-
def __init__(self, method, url, host='', *, writer=None, continue100=None):
553+
def __init__(self, method, url, host='', *, writer=None, continue100=None,
554+
timeout=5*60):
550555
super().__init__()
551556

552557
self.method = method
@@ -558,6 +563,7 @@ def __init__(self, method, url, host='', *, writer=None, continue100=None):
558563
self._closed = False
559564
self._should_close = True # override by message.should_close later
560565
self._history = ()
566+
self._timeout = timeout
561567

562568
def _post_init(self, loop):
563569
self._loop = loop
@@ -609,7 +615,7 @@ def _setup_connection(self, connection):
609615
self._reader = connection.reader
610616
self._connection = connection
611617
self.content = self.flow_control_class(
612-
connection.reader, loop=connection.loop)
618+
connection.reader, loop=connection.loop, timeout=self._timeout)
613619

614620
def _need_parse_response_body(self):
615621
return (self.method.lower() != 'head' and
@@ -624,7 +630,8 @@ def start(self, connection, read_until_eof=False):
624630
httpstream = self._reader.set_parser(self._response_parser)
625631

626632
# read response
627-
message = yield from httpstream.read()
633+
with Timeout(self._timeout, loop=self._loop):
634+
message = yield from httpstream.read()
628635
if message.code != 100:
629636
break
630637

@@ -643,11 +650,11 @@ def start(self, connection, read_until_eof=False):
643650
self.raw_headers = tuple(message.raw_headers)
644651

645652
# payload
646-
response_with_body = self._need_parse_response_body()
653+
rwb = self._need_parse_response_body()
647654
self._reader.set_parser(
648655
aiohttp.HttpPayloadParser(message,
649656
readall=read_until_eof,
650-
response_with_body=response_with_body),
657+
response_with_body=rwb),
651658
self.content)
652659

653660
# cookies

aiohttp/streams.py

+35-18
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class StreamReader(asyncio.StreamReader, AsyncStreamReaderMixin):
8383

8484
total_bytes = 0
8585

86-
def __init__(self, limit=DEFAULT_LIMIT, loop=None):
86+
def __init__(self, limit=DEFAULT_LIMIT, timeout=None, loop=None):
8787
self._limit = limit
8888
if loop is None:
8989
loop = asyncio.get_event_loop()
@@ -93,8 +93,10 @@ def __init__(self, limit=DEFAULT_LIMIT, loop=None):
9393
self._buffer_offset = 0
9494
self._eof = False
9595
self._waiter = None
96+
self._canceller = None
9697
self._eof_waiter = None
9798
self._exception = None
99+
self._timeout = timeout
98100

99101
def __repr__(self):
100102
info = ['StreamReader']
@@ -122,6 +124,11 @@ def set_exception(self, exc):
122124
if not waiter.cancelled():
123125
waiter.set_exception(exc)
124126

127+
canceller = self._canceller
128+
if canceller is not None:
129+
self._canceller = None
130+
canceller.cancel()
131+
125132
def feed_eof(self):
126133
self._eof = True
127134

@@ -131,6 +138,11 @@ def feed_eof(self):
131138
if not waiter.cancelled():
132139
waiter.set_result(True)
133140

141+
canceller = self._canceller
142+
if canceller is not None:
143+
self._canceller = None
144+
canceller.cancel()
145+
134146
waiter = self._eof_waiter
135147
if waiter is not None:
136148
self._eof_waiter = None
@@ -185,15 +197,32 @@ def feed_data(self, data):
185197
if not waiter.cancelled():
186198
waiter.set_result(False)
187199

188-
def _create_waiter(self, func_name):
200+
canceller = self._canceller
201+
if canceller is not None:
202+
self._canceller = None
203+
canceller.cancel()
204+
205+
@asyncio.coroutine
206+
def _wait(self, func_name):
189207
# StreamReader uses a future to link the protocol feed_data() method
190208
# to a read coroutine. Running two read coroutines at the same time
191209
# would have an unexpected behaviour. It would not possible to know
192210
# which coroutine would get the next data.
193211
if self._waiter is not None:
194212
raise RuntimeError('%s() called while another coroutine is '
195213
'already waiting for incoming data' % func_name)
196-
return helpers.create_future(self._loop)
214+
waiter = self._waiter = helpers.create_future(self._loop)
215+
if self._timeout:
216+
self._canceller = self._loop.call_later(self._timeout,
217+
self.set_exception,
218+
asyncio.TimeoutError())
219+
try:
220+
yield from waiter
221+
finally:
222+
self._waiter = None
223+
if self._canceller is not None:
224+
self._canceller.cancel()
225+
self._canceller = None
197226

198227
@asyncio.coroutine
199228
def readline(self):
@@ -222,11 +251,7 @@ def readline(self):
222251
break
223252

224253
if not_enough:
225-
self._waiter = self._create_waiter('readline')
226-
try:
227-
yield from self._waiter
228-
finally:
229-
self._waiter = None
254+
yield from self._wait('readline')
230255

231256
return b''.join(line)
232257

@@ -265,11 +290,7 @@ def read(self, n=-1):
265290
return b''.join(blocks)
266291

267292
if not self._buffer and not self._eof:
268-
self._waiter = self._create_waiter('read')
269-
try:
270-
yield from self._waiter
271-
finally:
272-
self._waiter = None
293+
yield from self._wait('read')
273294

274295
return self._read_nowait(n)
275296

@@ -279,11 +300,7 @@ def readany(self):
279300
raise self._exception
280301

281302
if not self._buffer and not self._eof:
282-
self._waiter = self._create_waiter('readany')
283-
try:
284-
yield from self._waiter
285-
finally:
286-
self._waiter = None
303+
yield from self._wait('readany')
287304

288305
return self._read_nowait()
289306

tests/test_client_functional.py

+34
Original file line numberDiff line numberDiff line change
@@ -474,3 +474,37 @@ def handler(request):
474474
resp = yield from client.delete('/')
475475
assert resp.status == 204
476476
yield from resp.release()
477+
478+
479+
@pytest.mark.run_loop
480+
def test_timeout_on_reading_headers(create_app_and_client, loop):
481+
482+
@asyncio.coroutine
483+
def handler(request):
484+
resp = web.StreamResponse()
485+
yield from asyncio.sleep(0.1, loop=loop)
486+
yield from resp.prepare(request)
487+
return resp
488+
489+
app, client = yield from create_app_and_client()
490+
app.router.add_route('GET', '/', handler)
491+
with pytest.raises(asyncio.TimeoutError):
492+
yield from client.get('/', timeout=0.01)
493+
494+
495+
@pytest.mark.run_loop
496+
def test_timeout_on_reading_data(create_app_and_client, loop):
497+
498+
@asyncio.coroutine
499+
def handler(request):
500+
resp = web.StreamResponse()
501+
yield from resp.prepare(request)
502+
yield from asyncio.sleep(0.1, loop=loop)
503+
return resp
504+
505+
app, client = yield from create_app_and_client()
506+
app.router.add_route('GET', '/', handler)
507+
resp = yield from client.get('/', timeout=0.05)
508+
509+
with pytest.raises(asyncio.TimeoutError):
510+
yield from resp.read()

tests/test_client_response.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -244,11 +244,11 @@ def side_effect(*args, **kwargs):
244244

245245
def test_override_flow_control(self):
246246
class MyResponse(ClientResponse):
247-
flow_control_class = aiohttp.FlowControlDataQueue
247+
flow_control_class = aiohttp.StreamReader
248248
response = MyResponse('get', 'http://my-cl-resp.org')
249249
response._post_init(self.loop)
250250
response._setup_connection(self.connection)
251-
self.assertIsInstance(response.content, aiohttp.FlowControlDataQueue)
251+
self.assertIsInstance(response.content, aiohttp.StreamReader)
252252
response.close()
253253

254254
@mock.patch('aiohttp.client_reqrep.chardet')

tests/test_client_session.py

+2
Original file line numberDiff line numberDiff line change
@@ -363,7 +363,9 @@ def create_connection(req):
363363
assert e.strerror == err.strerror
364364

365365

366+
@pytest.mark.run_loop
366367
def test_request_ctx_manager_props(loop):
368+
yield from asyncio.sleep(0, loop=loop) # to make it a task
367369
with aiohttp.ClientSession(loop=loop) as client:
368370
ctx_mgr = client.get('http://example.com')
369371

0 commit comments

Comments
 (0)