From 493526c9afe58518e35adf58d71e22c23eae7ec5 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Wed, 5 Apr 2017 15:38:53 +0300 Subject: [PATCH 01/10] Add aiohttp2 support and fix ssl error handling --- .gitignore | 2 ++ aioelasticsearch/compat.py | 4 +++ aioelasticsearch/connection.py | 61 ++++++++++++++++++++++++---------- requirements.txt | 2 +- 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/.gitignore b/.gitignore index 8a3cc4db..8202029e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ # python specific env* +.cache/ *.pyc *.so *.pyd @@ -9,6 +10,7 @@ MANIFEST __pycache__/ *.egg-info/ .coverage +.python-version htmlcov # generic files to ignore diff --git a/aioelasticsearch/compat.py b/aioelasticsearch/compat.py index ab49718b..cf3302ed 100644 --- a/aioelasticsearch/compat.py +++ b/aioelasticsearch/compat.py @@ -2,10 +2,14 @@ import sys from functools import partial +import aiohttp + PY_344 = sys.version_info >= (3, 4, 4) PY_350 = sys.version_info >= (3, 5, 0) PY_352 = sys.version_info >= (3, 5, 2) +AIOHTTP_2 = aiohttp.__version__ >= '2.0.0' + def create_task(*, loop=None): if loop is None: diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index b1764545..4106633b 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -1,11 +1,19 @@ import asyncio +import ssl import aiohttp -from aiohttp.errors import ClientError, FingerprintMismatch -from elasticsearch.connection import Connection -from elasticsearch.exceptions import (ConnectionError, ConnectionTimeout, + +from .compat import AIOHTTP_2 # isort:skip + +if AIOHTTP_2: + from aiohttp import ClientError +else: + from aiohttp.errors import ClientError + +from elasticsearch.connection import Connection # noqa # isort:skip +from elasticsearch.exceptions import (ConnectionError, ConnectionTimeout, # noqa # isort:skip SSLError) -from yarl import URL +from yarl import URL # noqa # isort:skip class AIOHttpConnection(Connection): @@ -49,16 +57,28 @@ def __init__( self.session = kwargs.get('session') if self.session is None: - self.session = aiohttp.ClientSession( - auth=self.http_auth, - connector=aiohttp.TCPConnector( - limit=maxsize, - use_dns_cache=kwargs.get('use_dns_cache', False), - verify_ssl=self.verify_certs, + if AIOHTTP_2: + self.session = aiohttp.ClientSession( + auth=self.http_auth, conn_timeout=None, - loop=self.loop, - ), - ) + connector=aiohttp.TCPConnector( + limit=maxsize, + use_dns_cache=kwargs.get('use_dns_cache', False), + verify_ssl=self.verify_certs, + loop=self.loop, + ), + ) + else: + self.session = aiohttp.ClientSession( + auth=self.http_auth, + connector=aiohttp.TCPConnector( + limit=maxsize, + use_dns_cache=kwargs.get('use_dns_cache', False), + verify_ssl=self.verify_certs, + conn_timeout=None, + loop=self.loop, + ), + ) def close(self): return self.session.close() @@ -78,17 +98,22 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign duration = self.loop.time() - start - except asyncio.TimeoutError as exc: + except ssl.CertificateError as exc: self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=exc) # noqa - raise ConnectionTimeout('TIMEOUT', str(exc), exc) + raise SSLError('N/A', str(exc), exc) - except FingerprintMismatch as exc: + except asyncio.TimeoutError as exc: self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=exc) # noqa - raise SSLError('N/A', str(exc), exc) + raise ConnectionTimeout('TIMEOUT', str(exc), exc) except ClientError as exc: self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=exc) # noqa - raise ConnectionError('N/A', str(exc), exc) + _exc = str(exc) + # aiohttp wraps ssl error + if 'SSL: CERTIFICATE_VERIFY_FAILED' in _exc: + raise SSLError('N/A', _exc, exc) + + raise ConnectionError('N/A', _exc, exc) finally: if response is not None: diff --git a/requirements.txt b/requirements.txt index 88f04fdb..3a4c1dc8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -aiohttp==1.3.5 +aiohttp==2.0.5 appdirs==1.4.3 appnope==0.1.0 async-timeout==1.2.0 From 2c08fb6153c05a69f9125cb734a1d1eeef28fc53 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Wed, 5 Apr 2017 16:56:59 +0300 Subject: [PATCH 02/10] Fix ci to support aiohttp2 + ssl_context --- .travis.yml | 1 + aioelasticsearch/connection.py | 37 ++++++++++++++++------------------ setup.py | 2 +- tox.ini | 4 +++- 4 files changed, 22 insertions(+), 22 deletions(-) diff --git a/.travis.yml b/.travis.yml index 482655c5..9c133152 100644 --- a/.travis.yml +++ b/.travis.yml @@ -13,6 +13,7 @@ env: - ES="5.0.2" - ES="5.1.2" - ES="5.2.2" + - ES="5.3.0" install: - curl -O https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-$ES.deb && sudo dpkg -i --force-confnew elasticsearch-$ES.deb && sudo service elasticsearch start - pip install -U setuptools diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index 4106633b..285a551a 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -24,6 +24,7 @@ def __init__( port=9200, http_auth=None, use_ssl=False, + ssl_context=None, verify_certs=False, maxsize=10, headers=None, @@ -57,28 +58,24 @@ def __init__( self.session = kwargs.get('session') if self.session is None: + connector_kwargs = { + 'limit': maxsize, + 'use_dns_cache': kwargs.get('use_dns_cache', False), + 'ssl_context': ssl_context, + 'verify_ssl': self.verify_certs, + 'loop': self.loop, + } + session_kwargs = {'auth': self.http_auth} + if AIOHTTP_2: - self.session = aiohttp.ClientSession( - auth=self.http_auth, - conn_timeout=None, - connector=aiohttp.TCPConnector( - limit=maxsize, - use_dns_cache=kwargs.get('use_dns_cache', False), - verify_ssl=self.verify_certs, - loop=self.loop, - ), - ) + session_kwargs['conn_timeout'] = None else: - self.session = aiohttp.ClientSession( - auth=self.http_auth, - connector=aiohttp.TCPConnector( - limit=maxsize, - use_dns_cache=kwargs.get('use_dns_cache', False), - verify_ssl=self.verify_certs, - conn_timeout=None, - loop=self.loop, - ), - ) + connector_kwargs['conn_timeout'] = None + + self.session = aiohttp.ClientSession( + connector=aiohttp.TCPConnector(**connector_kwargs), + **session_kwargs, + ) def close(self): return self.session.close() diff --git a/setup.py b/setup.py index 73992ca5..9cdefdce 100644 --- a/setup.py +++ b/setup.py @@ -30,7 +30,7 @@ def read(*parts): long_description=read('README.rst'), install_requires=[ 'elasticsearch>=5.0.0,<6.0.0', - 'aiohttp>=1.3.0,<2.0.0', + 'aiohttp>=1.3.0', ], packages=['aioelasticsearch'], include_package_data=True, diff --git a/tox.ini b/tox.ini index dde9b8f9..24012e34 100644 --- a/tox.ini +++ b/tox.ini @@ -1,10 +1,12 @@ [tox] envlist = - py3{4,5,6} + py3{4,5,6}-aiohttp{1,2} skip_missing_interpreters = True [testenv] deps = + aiohttp1: aiohttp<2.0.0 + aiohttp2: aiohttp>=2.0.0 pytest pytest-cov flake8 From 1511bf9261fb920a5e8c45a8fc1dfc0fef3edbab Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Wed, 5 Apr 2017 17:28:24 +0300 Subject: [PATCH 03/10] fix flake --- aioelasticsearch/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index 285a551a..893b97de 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -74,7 +74,7 @@ def __init__( self.session = aiohttp.ClientSession( connector=aiohttp.TCPConnector(**connector_kwargs), - **session_kwargs, + **session_kwargs ) def close(self): From 151822d624ef9a2099c0132e6a678f4e55d9b367 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Wed, 5 Apr 2017 19:12:05 +0300 Subject: [PATCH 04/10] Remove conn_timeout for aiohttp since its none by default --- aioelasticsearch/connection.py | 24 ++++++++---------------- 1 file changed, 8 insertions(+), 16 deletions(-) diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index 893b97de..ee0516ca 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -58,23 +58,15 @@ def __init__( self.session = kwargs.get('session') if self.session is None: - connector_kwargs = { - 'limit': maxsize, - 'use_dns_cache': kwargs.get('use_dns_cache', False), - 'ssl_context': ssl_context, - 'verify_ssl': self.verify_certs, - 'loop': self.loop, - } - session_kwargs = {'auth': self.http_auth} - - if AIOHTTP_2: - session_kwargs['conn_timeout'] = None - else: - connector_kwargs['conn_timeout'] = None - self.session = aiohttp.ClientSession( - connector=aiohttp.TCPConnector(**connector_kwargs), - **session_kwargs + auth=self.http_auth, + connector=aiohttp.TCPConnector( + limit=maxsize, + use_dns_cache=kwargs.get('use_dns_cache', False), + ssl_context=ssl_context, + verify_ssl=self.verify_certs, + loop=self.loop, + ), ) def close(self): From 966386b9f572a11d5fe3a207722a67971e2c0a01 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Thu, 6 Apr 2017 11:22:25 +0300 Subject: [PATCH 05/10] Fix style and revome noqa --- aioelasticsearch/connection.py | 76 +++++++++++++++++++++++++++++----- 1 file changed, 66 insertions(+), 10 deletions(-) diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index ee0516ca..200e538c 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -73,7 +73,15 @@ def close(self): return self.session.close() @asyncio.coroutine - def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()): # noqa + def perform_request( + self, + method, + url, + params=None, + body=None, + timeout=None, + ignore=() + ): url_path = url url = (self.base_url / url.lstrip('/')).with_query(params) @@ -81,22 +89,50 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign start = self.loop.time() response = None try: - with aiohttp.Timeout(timeout or self.timeout, loop=self.loop): # noqa - response = yield from self.session.request(method, url, data=body, headers=self.headers, timeout=None) # noqa + with aiohttp.Timeout(timeout or self.timeout, loop=self.loop): + response = yield from self.session.request( + method, + url, + data=body, + headers=self.headers, + timeout=None, + ) raw_data = yield from response.text() duration = self.loop.time() - start except ssl.CertificateError as exc: - self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=exc) # noqa + self.log_request_fail( + method, + url, + url_path, + body, + self.loop.time() - start, + exception=exc, + ) raise SSLError('N/A', str(exc), exc) except asyncio.TimeoutError as exc: - self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=exc) # noqa + self.log_request_fail( + method, + url, + url_path, + body, + self.loop.time() - start, + exception=exc, + ) raise ConnectionTimeout('TIMEOUT', str(exc), exc) except ClientError as exc: - self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=exc) # noqa + self.log_request_fail( + method, + url, + url_path, + body, + self.loop.time() - start, + exception=exc, + ) + _exc = str(exc) # aiohttp wraps ssl error if 'SSL: CERTIFICATE_VERIFY_FAILED' in _exc: @@ -108,11 +144,31 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign if response is not None: yield from response.release() - # raise errors based on http status codes, let the client handle those if needed # noqa - if not (200 <= response.status < 300) and response.status not in ignore: # noqa - self.log_request_fail(method, url, url_path, body, duration, response.status, raw_data) # noqa + # raise errors based on http status codes + # let the client handle those if needed + if ( + not (200 <= response.status < 300) and + response.status not in ignore + ): + self.log_request_fail( + method, + url, + url_path, + body, + duration, + response.status, + raw_data, + ) self._raise_error(response.status, raw_data) - self.log_request_success(method, url, url_path, body, response.status, raw_data, duration) # noqa + self.log_request_success( + method, + url, + url_path, + body, + response.status, + raw_data, + duration, + ) return response.status, response.headers, raw_data From 12d60cafd8eb2a99424e2ddf1d9ae7f35971fad9 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Mon, 10 Apr 2017 18:11:44 +0300 Subject: [PATCH 06/10] Consistent close --- aioelasticsearch/__init__.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/aioelasticsearch/__init__.py b/aioelasticsearch/__init__.py index 1ae39dac..c8dc2091 100644 --- a/aioelasticsearch/__init__.py +++ b/aioelasticsearch/__init__.py @@ -43,6 +43,5 @@ def close(self): def __aenter__(self): # noqa return self - @asyncio.coroutine def __aexit__(self, *exc_info): # noqa - yield from self.close() + return self.close() From 5559e0ffa8f2823a9702214619b1c1cbbe038b06 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Tue, 11 Apr 2017 18:03:48 +0300 Subject: [PATCH 07/10] Fix connection pool close --- aioelasticsearch/pool.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/aioelasticsearch/pool.py b/aioelasticsearch/pool.py index 917d4f2d..5e4dbf43 100644 --- a/aioelasticsearch/pool.py +++ b/aioelasticsearch/pool.py @@ -6,6 +6,8 @@ from elasticsearch import RoundRobinSelector from elasticsearch.exceptions import ImproperlyConfigured +from .compat import AIOHTTP_2 + logger = logging.getLogger('elasticsearch') @@ -125,6 +127,9 @@ def close(self): _, connection = self.dead.get_nowait() coros.append(connection.close()) + if AIOHTTP_2: + return coros + return asyncio.gather(*coros, return_exceptions=True, loop=self.loop) From 6ce95ce67a281b8609b1b5d9ee80b78eb4e2b137 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Tue, 11 Apr 2017 19:03:38 +0300 Subject: [PATCH 08/10] Fix sync connection close for pool with aiohttp2 --- aioelasticsearch/pool.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/aioelasticsearch/pool.py b/aioelasticsearch/pool.py index 5e4dbf43..dd46143c 100644 --- a/aioelasticsearch/pool.py +++ b/aioelasticsearch/pool.py @@ -6,7 +6,7 @@ from elasticsearch import RoundRobinSelector from elasticsearch.exceptions import ImproperlyConfigured -from .compat import AIOHTTP_2 +from .compat import AIOHTTP_2, create_future logger = logging.getLogger('elasticsearch') @@ -128,7 +128,9 @@ def close(self): coros.append(connection.close()) if AIOHTTP_2: - return coros + future = create_future(loop=self.loop) + future.set_result(None) + return future return asyncio.gather(*coros, return_exceptions=True, loop=self.loop) From 6948ee8361c7073d2d687e3d6002f732a2536137 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Wed, 12 Apr 2017 11:06:36 +0300 Subject: [PATCH 09/10] Move fixed close to connection instead pool --- aioelasticsearch/connection.py | 10 ++++++++-- aioelasticsearch/pool.py | 7 ------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index 200e538c..6b276657 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -3,7 +3,7 @@ import aiohttp -from .compat import AIOHTTP_2 # isort:skip +from .compat import AIOHTTP_2, create_future # isort:skip if AIOHTTP_2: from aiohttp import ClientError @@ -70,7 +70,13 @@ def __init__( ) def close(self): - return self.session.close() + coro = self.session.close() + if not AIOHTTP_2: + return coro + + future = create_future(loop=self.loop) + future.set_result(None) + return future @asyncio.coroutine def perform_request( diff --git a/aioelasticsearch/pool.py b/aioelasticsearch/pool.py index dd46143c..917d4f2d 100644 --- a/aioelasticsearch/pool.py +++ b/aioelasticsearch/pool.py @@ -6,8 +6,6 @@ from elasticsearch import RoundRobinSelector from elasticsearch.exceptions import ImproperlyConfigured -from .compat import AIOHTTP_2, create_future - logger = logging.getLogger('elasticsearch') @@ -127,11 +125,6 @@ def close(self): _, connection = self.dead.get_nowait() coros.append(connection.close()) - if AIOHTTP_2: - future = create_future(loop=self.loop) - future.set_result(None) - return future - return asyncio.gather(*coros, return_exceptions=True, loop=self.loop) From b3ef896dc175429ebaef1f2ebc90fab8c4c70e78 Mon Sep 17 00:00:00 2001 From: "a.ursulenko" Date: Thu, 27 Apr 2017 11:29:27 +0300 Subject: [PATCH 10/10] Fix import --- aioelasticsearch/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aioelasticsearch/connection.py b/aioelasticsearch/connection.py index 6b276657..0678d8b8 100644 --- a/aioelasticsearch/connection.py +++ b/aioelasticsearch/connection.py @@ -3,7 +3,7 @@ import aiohttp -from .compat import AIOHTTP_2, create_future # isort:skip +from aioelasticsearch.compat import AIOHTTP_2, create_future if AIOHTTP_2: from aiohttp import ClientError