From df0a608d9612252ca2fb660735780dbf4ed988b1 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 16 May 2016 16:02:47 -0400 Subject: [PATCH 1/4] Add support for controlled paging via Gax. --- gcloud/pubsub/_gax.py | 37 +++++++++++++--- gcloud/pubsub/test__gax.py | 87 +++++++++++++++++++++++++++++++++++--- 2 files changed, 111 insertions(+), 13 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index fb6e5d9a19d4..f26da28bdf84 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -16,6 +16,7 @@ # pylint: disable=import-error from google.gax import CallOptions +from google.gax import INITIAL_PAGE from google.gax.errors import GaxError from google.gax.grpc import exc_to_code from google.pubsub.v1.pubsub_pb2 import PubsubMessage @@ -37,7 +38,7 @@ class _PublisherAPI(object): def __init__(self, gax_api): self._gax_api = gax_api - def list_topics(self, project): + def list_topics(self, project, page_token=None): """List topics for the project associated with this API. See: @@ -46,13 +47,21 @@ def list_topics(self, project): :type project: string :param project: project ID + :type page_token: string + :param page_token: opaque marker for the next "page" of topics. If not + passed, the API will return the first page of + topics. + :rtype: tuple, (list, str) :returns: list of ``Topic`` resource dicts, plus a "next page token" string: if not None, indicates that more topics can be retrieved with another call (pass that value as ``page_token``). """ - options = CallOptions(is_page_streaming=False) + if page_token is None: + page_token = INITIAL_PAGE + options = {'page_token': page_token} + options = CallOptions(**options) path = 'projects/%s' % (project,) response = self._gax_api.list_topics(path, options) topics = [{'name': topic_pb.name} for topic_pb in response.topics] @@ -152,7 +161,7 @@ def topic_publish(self, topic_path, messages): raise return response.message_ids - def topic_list_subscriptions(self, topic_path): + def topic_list_subscriptions(self, topic_path, page_token=None): """API call: list subscriptions bound to a topic See: @@ -162,13 +171,21 @@ def topic_list_subscriptions(self, topic_path): :param topic_path: fully-qualified path of the topic, in format ``projects//topics/``. + :type page_token: string + :param page_token: opaque marker for the next "page" of subscriptions. + If not passed, the API will return the first page + of subscriptions. + :rtype: list of strings :returns: fully-qualified names of subscriptions for the supplied topic. :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not exist """ - options = CallOptions(is_page_streaming=False) + if page_token is None: + page_token = INITIAL_PAGE + options = {'page_token': page_token} + options = CallOptions(**options) try: response = self._gax_api.list_topic_subscriptions( topic_path, options) @@ -190,7 +207,7 @@ class _SubscriberAPI(object): def __init__(self, gax_api): self._gax_api = gax_api - def list_subscriptions(self, project): + def list_subscriptions(self, project, page_token=None): """List subscriptions for the project associated with this API. See: @@ -199,13 +216,21 @@ def list_subscriptions(self, project): :type project: string :param project: project ID + :type page_token: string + :param page_token: opaque marker for the next "page" of subscriptions. + If not passed, the API will return the first page + of subscriptions. + :rtype: tuple, (list, str) :returns: list of ``Subscription`` resource dicts, plus a "next page token" string: if not None, indicates that more topics can be retrieved with another call (pass that value as ``page_token``). """ - options = CallOptions(is_page_streaming=False) + if page_token is None: + page_token = INITIAL_PAGE + options = {'page_token': page_token} + options = CallOptions(**options) path = 'projects/%s' % (project,) response = self._gax_api.list_subscriptions(path, options) subscriptions = [_subscription_pb_to_mapping(sub_pb) diff --git a/gcloud/pubsub/test__gax.py b/gcloud/pubsub/test__gax.py index 30194480ecf3..e6188daac38b 100644 --- a/gcloud/pubsub/test__gax.py +++ b/gcloud/pubsub/test__gax.py @@ -52,7 +52,9 @@ def test_ctor(self): self.assertTrue(api._gax_api is gax_api) def test_list_topics_no_paging(self): - response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)]) + from google.gax import INITIAL_PAGE + TOKEN = 'TOKEN' + response = _ListTopicsResponsePB([_TopicPB(self.TOPIC_PATH)], TOKEN) gax_api = _GAXPublisherAPI(_list_topics_response=response) api = self._makeOne(gax_api) @@ -62,11 +64,31 @@ def test_list_topics_no_paging(self): topic = topics[0] self.assertIsInstance(topic, dict) self.assertEqual(topic['name'], self.TOPIC_PATH) - self.assertEqual(next_token, None) + self.assertEqual(next_token, TOKEN) name, options = gax_api._list_topics_called_with self.assertEqual(name, self.PROJECT_PATH) - self.assertFalse(options.is_page_streaming) + self.assertTrue(options.page_token is INITIAL_PAGE) + + def test_list_topics_with_paging(self): + TOKEN = 'TOKEN' + NEW_TOKEN = 'NEW_TOKEN' + response = _ListTopicsResponsePB( + [_TopicPB(self.TOPIC_PATH)], NEW_TOKEN) + gax_api = _GAXPublisherAPI(_list_topics_response=response) + api = self._makeOne(gax_api) + + topics, next_token = api.list_topics(self.PROJECT, page_token=TOKEN) + + self.assertEqual(len(topics), 1) + topic = topics[0] + self.assertIsInstance(topic, dict) + self.assertEqual(topic['name'], self.TOPIC_PATH) + self.assertEqual(next_token, NEW_TOKEN) + + name, options = gax_api._list_topics_called_with + self.assertEqual(name, self.PROJECT_PATH) + self.assertEqual(options.page_token, TOKEN) def test_topic_create(self): topic_pb = _TopicPB(self.TOPIC_PATH) @@ -233,6 +255,7 @@ def test_topic_publish_error(self): self.assertEqual(options, None) def test_topic_list_subscriptions_no_paging(self): + from google.gax import INITIAL_PAGE response = _ListTopicSubscriptionsResponsePB([self.SUB_PATH]) gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) api = self._makeOne(gax_api) @@ -249,9 +272,32 @@ def test_topic_list_subscriptions_no_paging(self): topic_path, options = gax_api._list_topic_subscriptions_called_with self.assertEqual(topic_path, self.TOPIC_PATH) - self.assertFalse(options.is_page_streaming) + self.assertTrue(options.page_token is INITIAL_PAGE) + + def test_topic_list_subscriptions_with_paging(self): + TOKEN = 'TOKEN' + NEW_TOKEN = 'NEW_TOKEN' + response = _ListTopicSubscriptionsResponsePB( + [self.SUB_PATH], NEW_TOKEN) + gax_api = _GAXPublisherAPI(_list_topic_subscriptions_response=response) + api = self._makeOne(gax_api) + + subscriptions, next_token = api.topic_list_subscriptions( + self.TOPIC_PATH, page_token=TOKEN) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, dict) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(next_token, NEW_TOKEN) + + name, options = gax_api._list_topic_subscriptions_called_with + self.assertEqual(name, self.TOPIC_PATH) + self.assertEqual(options.page_token, TOKEN) def test_topic_list_subscriptions_miss(self): + from google.gax import INITIAL_PAGE from gcloud.exceptions import NotFound gax_api = _GAXPublisherAPI() api = self._makeOne(gax_api) @@ -261,9 +307,10 @@ def test_topic_list_subscriptions_miss(self): topic_path, options = gax_api._list_topic_subscriptions_called_with self.assertEqual(topic_path, self.TOPIC_PATH) - self.assertFalse(options.is_page_streaming) + self.assertTrue(options.page_token is INITIAL_PAGE) def test_topic_list_subscriptions_error(self): + from google.gax import INITIAL_PAGE from google.gax.errors import GaxError gax_api = _GAXPublisherAPI(_random_gax_error=True) api = self._makeOne(gax_api) @@ -273,7 +320,7 @@ def test_topic_list_subscriptions_error(self): topic_path, options = gax_api._list_topic_subscriptions_called_with self.assertEqual(topic_path, self.TOPIC_PATH) - self.assertFalse(options.is_page_streaming) + self.assertTrue(options.page_token is INITIAL_PAGE) @unittest2.skipUnless(_HAVE_GAX, 'No gax-python') @@ -291,6 +338,7 @@ def test_ctor(self): self.assertTrue(api._gax_api is gax_api) def test_list_subscriptions_no_paging(self): + from google.gax import INITIAL_PAGE response = _ListSubscriptionsResponsePB([_SubscriptionPB( self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)]) gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) @@ -310,7 +358,32 @@ def test_list_subscriptions_no_paging(self): name, options = gax_api._list_subscriptions_called_with self.assertEqual(name, self.PROJECT_PATH) - self.assertFalse(options.is_page_streaming) + self.assertTrue(options.page_token is INITIAL_PAGE) + + def test_list_subscriptions_with_paging(self): + TOKEN = 'TOKEN' + NEW_TOKEN = 'NEW_TOKEN' + response = _ListSubscriptionsResponsePB([_SubscriptionPB( + self.SUB_PATH, self.TOPIC_PATH, self.PUSH_ENDPOINT, 0)], NEW_TOKEN) + gax_api = _GAXSubscriberAPI(_list_subscriptions_response=response) + api = self._makeOne(gax_api) + + subscriptions, next_token = api.list_subscriptions( + self.PROJECT, page_token=TOKEN) + + self.assertEqual(len(subscriptions), 1) + subscription = subscriptions[0] + self.assertIsInstance(subscription, dict) + self.assertEqual(subscription['name'], self.SUB_PATH) + self.assertEqual(subscription['topic'], self.TOPIC_PATH) + self.assertEqual(subscription['pushConfig'], + {'pushEndpoint': self.PUSH_ENDPOINT}) + self.assertEqual(subscription['ackDeadlineSeconds'], 0) + self.assertEqual(next_token, NEW_TOKEN) + + name, options = gax_api._list_subscriptions_called_with + self.assertEqual(name, self.PROJECT_PATH) + self.assertEqual(options.page_token, TOKEN) def test_subscription_create(self): sub_pb = _SubscriptionPB(self.SUB_PATH, self.TOPIC_PATH, '', 0) From 128980609dc6b856d8fec653795d45516f13e995 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 13 Jun 2016 16:02:04 -0400 Subject: [PATCH 2/4] Loosen pin on 'grpcio'. Add explicit dep on 'google-gax'. --- setup.py | 3 ++- tox.ini | 16 +++++++++++++--- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/setup.py b/setup.py index 56ed96f41f58..15587a94a379 100644 --- a/setup.py +++ b/setup.py @@ -19,7 +19,8 @@ ] GRPC_EXTRAS = [ - 'grpcio == 0.13.1', + 'grpcio >= 0.14.0', + 'google-gax >= 0.11.0', 'gax-google-pubsub-v1', ] diff --git a/tox.ini b/tox.ini index 593929fcebe7..1936e43c354c 100644 --- a/tox.ini +++ b/tox.ini @@ -21,12 +21,20 @@ covercmd = --cover-branches \ --nocapture +[grpc] +deps = + grpcio >= 0.14.0 + google-gax >= 0.11.0 + gax-google-pubsub-v1 + [testenv:py27] basepython = python2.7 commands = - pip --quiet install gcloud[grpc] nosetests +deps = + {[testenv]deps} + {[grpc]deps} setenv = PYTHONPATH = @@ -45,10 +53,10 @@ deps = basepython = python2.7 commands = - pip --quiet install gcloud[grpc] {[testenv]covercmd} --cover-min-percentage=100 deps = {[testenv]deps} + {[grpc]deps} coverage setenv = PYTHONPATH = @@ -112,10 +120,12 @@ passenv = {[testenv:system-tests]passenv} basepython = python2.7 commands = - pip --quiet install gcloud[grpc] python {toxinidir}/system_tests/attempt_system_tests.py setenv = PYTHONPATH = +deps = + {[testenv]deps} + {[grpc]deps} passenv = GOOGLE_* GCLOUD_* TRAVIS* encrypted_* [testenv:system-tests3] From 2cd1ae5eb1e71b954c8d32a87bf44ddbbb07008f Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 13 Jun 2016 16:04:41 -0400 Subject: [PATCH 3/4] Re-enable display of missing lines in coverage report. --- .coveragerc | 1 + 1 file changed, 1 insertion(+) diff --git a/.coveragerc b/.coveragerc index 240882dc7d07..280c5674f5bd 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,6 +1,7 @@ [report] omit = */_generated/*.py +show_missing = True exclude_lines = # Re-enable the standard pragma pragma: NO COVER From eec245d5ac3a458717cb86a805193314f62f87a6 Mon Sep 17 00:00:00 2001 From: Tres Seaver Date: Mon, 13 Jun 2016 16:11:11 -0400 Subject: [PATCH 4/4] Factor out handling of paging options. Addresses: https://github.com/GoogleCloudPlatform/gcloud-python/pull/1855#discussion_r66855168. --- gcloud/pubsub/_gax.py | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/gcloud/pubsub/_gax.py b/gcloud/pubsub/_gax.py index f26da28bdf84..90468f80fff9 100644 --- a/gcloud/pubsub/_gax.py +++ b/gcloud/pubsub/_gax.py @@ -29,6 +29,14 @@ from gcloud._helpers import _to_bytes +def _build_paging_options(page_token=None): + """Helper for :meth:'_PublisherAPI.list_topics' et aliae.""" + if page_token is None: + page_token = INITIAL_PAGE + options = {'page_token': page_token} + return CallOptions(**options) + + class _PublisherAPI(object): """Helper mapping publisher-related APIs. @@ -58,10 +66,7 @@ def list_topics(self, project, page_token=None): more topics can be retrieved with another call (pass that value as ``page_token``). """ - if page_token is None: - page_token = INITIAL_PAGE - options = {'page_token': page_token} - options = CallOptions(**options) + options = _build_paging_options(page_token) path = 'projects/%s' % (project,) response = self._gax_api.list_topics(path, options) topics = [{'name': topic_pb.name} for topic_pb in response.topics] @@ -182,10 +187,7 @@ def topic_list_subscriptions(self, topic_path, page_token=None): :raises: :exc:`gcloud.exceptions.NotFound` if the topic does not exist """ - if page_token is None: - page_token = INITIAL_PAGE - options = {'page_token': page_token} - options = CallOptions(**options) + options = _build_paging_options(page_token) try: response = self._gax_api.list_topic_subscriptions( topic_path, options) @@ -227,10 +229,7 @@ def list_subscriptions(self, project, page_token=None): more topics can be retrieved with another call (pass that value as ``page_token``). """ - if page_token is None: - page_token = INITIAL_PAGE - options = {'page_token': page_token} - options = CallOptions(**options) + options = _build_paging_options(page_token) path = 'projects/%s' % (project,) response = self._gax_api.list_subscriptions(path, options) subscriptions = [_subscription_pb_to_mapping(sub_pb)