From 0b1b75bd0f11a1c7bc5fb802be2629e851edb0fc Mon Sep 17 00:00:00 2001 From: "oleh.zorenko" <19872253+Zirochkaa@users.noreply.github.com> Date: Mon, 31 Jan 2022 10:45:50 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9B=20Source=20Delighted:=20output=20o?= =?UTF-8?q?nly=20records=20in=20which=20cursor=20field=20is=20greater=20th?= =?UTF-8?q?an=20the=20value=20in=20state=20for=20incremental=20streams=20(?= =?UTF-8?q?#9550)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 8906 Output only records in which cursor field is greater than the value in state for incremental streams * 8906 Fix full refresh read for SurveyResponses stream * 8906 Add tests + update docs * 8906 Update docs * 8906 Bump connector's version --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-delighted/Dockerfile | 2 +- .../connectors/source-delighted/setup.py | 1 + .../source_delighted/source.py | 34 +++++++-- .../source-delighted/unit_tests/unit_test.py | 76 +++++++++++++++++++ docs/integrations/sources/delighted.md | 1 + 7 files changed, 110 insertions(+), 8 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index eec7acfbc7fe1..54acb2ba7f465 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -154,7 +154,7 @@ - name: Delighted sourceDefinitionId: cc88c43f-6f53-4e8a-8c4d-b284baaf9635 dockerRepository: airbyte/source-delighted - dockerImageTag: 0.1.2 + dockerImageTag: 0.1.3 documentationUrl: https://docs.airbyte.io/integrations/sources/delighted icon: delighted.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 40a16d8462613..03ab053f60b81 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1307,7 +1307,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-delighted:0.1.2" +- dockerImage: "airbyte/source-delighted:0.1.3" spec: documentationUrl: "https://docsurl.com" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-delighted/Dockerfile b/airbyte-integrations/connectors/source-delighted/Dockerfile index 41c613ad0ff75..c098a13136594 100644 --- a/airbyte-integrations/connectors/source-delighted/Dockerfile +++ b/airbyte-integrations/connectors/source-delighted/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-delighted diff --git a/airbyte-integrations/connectors/source-delighted/setup.py b/airbyte-integrations/connectors/source-delighted/setup.py index e5f41ecef635a..096214a03e443 100644 --- a/airbyte-integrations/connectors/source-delighted/setup.py +++ b/airbyte-integrations/connectors/source-delighted/setup.py @@ -12,6 +12,7 @@ TEST_REQUIREMENTS = [ "pytest~=6.1", "source-acceptance-test", + "responses~=0.13.3", ] setup( diff --git a/airbyte-integrations/connectors/source-delighted/source_delighted/source.py b/airbyte-integrations/connectors/source-delighted/source_delighted/source.py index 80e738728afaa..796c1362410a8 100644 --- a/airbyte-integrations/connectors/source-delighted/source_delighted/source.py +++ b/airbyte-integrations/connectors/source-delighted/source_delighted/source.py @@ -18,7 +18,6 @@ # Basic full refresh stream class DelightedStream(HttpStream, ABC): - url_base = "https://api.delighted.com/v1/" # Page size @@ -41,10 +40,9 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: + params = {"per_page": self.limit, "since": self.since} if next_page_token: - params = {"per_page": self.limit, **next_page_token} - else: - params = {"per_page": self.limit, "since": self.since} + params.update(**next_page_token) return params def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: @@ -52,7 +50,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp class IncrementalDelightedStream(DelightedStream, ABC): - # Getting page size as 'limit' from parrent class + # Getting page size as 'limit' from parent class @property def limit(self): return super().limit @@ -73,8 +71,17 @@ def request_params(self, stream_state=None, **kwargs): params["since"] = stream_state.get(self.cursor_field) return params + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: + for record in super().parse_response(response=response, stream_state=stream_state, **kwargs): + if self.cursor_field not in stream_state or record[self.cursor_field] > stream_state[self.cursor_field]: + yield record + class People(IncrementalDelightedStream): + """ + API docs: https://app.delighted.com/docs/api/listing-people + """ + def path(self, **kwargs) -> str: return "people.json" @@ -86,6 +93,10 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, class Unsubscribes(IncrementalDelightedStream): + """ + API docs: https://app.delighted.com/docs/api/listing-unsubscribed-people + """ + cursor_field = "unsubscribed_at" primary_key = "person_id" @@ -94,6 +105,10 @@ def path(self, **kwargs) -> str: class Bounces(IncrementalDelightedStream): + """ + API docs: https://app.delighted.com/docs/api/listing-bounced-people + """ + cursor_field = "bounced_at" primary_key = "person_id" @@ -102,6 +117,10 @@ def path(self, **kwargs) -> str: class SurveyResponses(IncrementalDelightedStream): + """ + API docs: https://app.delighted.com/docs/api/listing-survey-responses + """ + cursor_field = "updated_at" def path(self, **kwargs) -> str: @@ -110,8 +129,13 @@ def path(self, **kwargs) -> str: def request_params(self, stream_state=None, **kwargs): stream_state = stream_state or {} params = super().request_params(stream_state=stream_state, **kwargs) + + if "since" in params: + params["updated_since"] = params.pop("since") + if stream_state: params["updated_since"] = stream_state.get(self.cursor_field) + return params diff --git a/airbyte-integrations/connectors/source-delighted/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-delighted/unit_tests/unit_test.py index e1814314fc3b0..b6eddc0ecae5a 100644 --- a/airbyte-integrations/connectors/source-delighted/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-delighted/unit_tests/unit_test.py @@ -2,6 +2,82 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # +import pytest +import responses +from airbyte_cdk.models import SyncMode +from source_delighted.source import Bounces, People, SourceDelighted, SurveyResponses, Unsubscribes + + +@pytest.fixture(scope="module") +def test_config(): + return { + "api_key": "test_api_key", + "since": "1641289584", + } + + +@pytest.fixture(scope="module") +def state(): + return { + "bounces": {"bounced_at": 1641455286}, + "people": {"created_at": 1641455285}, + "survey_responses": {"updated_at": 1641289816}, + "unsubscribes": {"unsubscribed_at": 1641289584}, + } + + +BOUNCES_RESPONSE = """ +[ + {"person_id": "1046789984", "email": "foo_test204@airbyte.io", "name": "Foo Test204", "bounced_at": 1641455286}, + {"person_id": "1046789989", "email": "foo_test205@airbyte.io", "name": "Foo Test205", "bounced_at": 1641455286} +] +""" + + +PEOPLE_RESPONSE = """ +[ + {"id": "1046789989", "name": "Foo Test205", "email": "foo_test205@airbyte.io", "created_at": 1641455285, "last_sent_at": 1641455285, "last_responded_at": null, "next_survey_scheduled_at": null} +] +""" + + +SURVEY_RESPONSES_RESPONSE = """ +[ + {"id": "210554887", "person": "1042205953", "survey_type": "nps", "score": 0, "comment": "Test Comment202", "permalink": "https://app.delighted.com/r/0q7QEdWzosv5G5c3w9gakivDwEIM5Hq0", "created_at": 1641289816, "updated_at": 1641289816, "person_properties": null, "notes": [], "tags": [], "additional_answers": []}, + {"id": "210554885", "person": "1042205947", "survey_type": "nps", "score": 5, "comment": "Test Comment201", "permalink": "https://app.delighted.com/r/GhWWrBT2wayswOc0AfT7fxpM3UwSpitN", "created_at": 1641289816, "updated_at": 1641289816, "person_properties": null, "notes": [], "tags": [], "additional_answers": []} +] +""" + + +UNSUBSCRIBES_RESPONSE = """ +[ + {"person_id": "1040826319", "email": "foo_test64@airbyte.io", "name": "Foo Test64", "unsubscribed_at": 1641289584} +] +""" + + +@pytest.mark.parametrize( + ("stream_class", "url", "response_body"), + [ + (Bounces, "https://api.delighted.com/v1/bounces.json", BOUNCES_RESPONSE), + (People, "https://api.delighted.com/v1/people.json", PEOPLE_RESPONSE), + (SurveyResponses, "https://api.delighted.com/v1/survey_responses.json", SURVEY_RESPONSES_RESPONSE), + (Unsubscribes, "https://api.delighted.com/v1/unsubscribes.json", UNSUBSCRIBES_RESPONSE), + ], +) +@responses.activate +def test_not_output_records_where_cursor_field_equals_state(state, test_config, stream_class, url, response_body): + responses.add( + responses.GET, + url, + body=response_body, + status=200, + ) + + stream = stream_class(test_config["since"], authenticator=SourceDelighted()._get_authenticator(config=test_config)) + records = [r for r in stream.read_records(SyncMode.incremental, stream_state=state[stream.name])] + assert not records + def test_example_method(): assert True diff --git a/docs/integrations/sources/delighted.md b/docs/integrations/sources/delighted.md index 89a6db716ed5c..430ba77c977fa 100644 --- a/docs/integrations/sources/delighted.md +++ b/docs/integrations/sources/delighted.md @@ -37,6 +37,7 @@ This connector supports `API PASSWORD` as the authentication method. | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.3 | 2022-01-31 | [9550](https://github.com/airbytehq/airbyte/pull/9550) | Output only records in which cursor field is greater than the value in state for incremental streams | | 0.1.2 | 2022-01-06 | [9333](https://github.com/airbytehq/airbyte/pull/9333) | Add incremental sync mode to streams in `integration_tests/configured_catalog.json` | | 0.1.1 | 2022-01-04 | [9275](https://github.com/airbytehq/airbyte/pull/9275) | Fix pagination handling for `survey_responses`, `bounces` and `unsubscribes` streams | | 0.1.0 | 2021-10-27 | [4551](https://github.com/airbytehq/airbyte/pull/4551) | Add Delighted source connector |