Skip to content

Commit

Permalink
🐛 Source Delighted: output only records in which cursor field is grea…
Browse files Browse the repository at this point in the history
…ter than the value in state for incremental streams (#9550)

* 8906 Output only records in which cursor field is greater than the value in state for incremental streams

* 8906 Fix full refresh read for SurveyResponses stream

* 8906 Add tests + update docs

* 8906 Update docs

* 8906 Bump connector's version
  • Loading branch information
Zirochkaa authored Jan 31, 2022
1 parent 8f22595 commit 0b1b75b
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@
- name: Delighted
sourceDefinitionId: cc88c43f-6f53-4e8a-8c4d-b284baaf9635
dockerRepository: airbyte/source-delighted
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/delighted
icon: delighted.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1307,7 +1307,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-delighted:0.1.2"
- dockerImage: "airbyte/source-delighted:0.1.3"
spec:
documentationUrl: "https://docsurl.com"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-delighted
1 change: 1 addition & 0 deletions airbyte-integrations/connectors/source-delighted/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
TEST_REQUIREMENTS = [
"pytest~=6.1",
"source-acceptance-test",
"responses~=0.13.3",
]

setup(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

# Basic full refresh stream
class DelightedStream(HttpStream, ABC):

url_base = "https://api.delighted.com/v1/"

# Page size
Expand All @@ -41,18 +40,17 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
params = {"per_page": self.limit, "since": self.since}
if next_page_token:
params = {"per_page": self.limit, **next_page_token}
else:
params = {"per_page": self.limit, "since": self.since}
params.update(**next_page_token)
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
yield from response.json()


class IncrementalDelightedStream(DelightedStream, ABC):
# Getting page size as 'limit' from parrent class
# Getting page size as 'limit' from parent class
@property
def limit(self):
return super().limit
Expand All @@ -73,8 +71,17 @@ def request_params(self, stream_state=None, **kwargs):
params["since"] = stream_state.get(self.cursor_field)
return params

def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
for record in super().parse_response(response=response, stream_state=stream_state, **kwargs):
if self.cursor_field not in stream_state or record[self.cursor_field] > stream_state[self.cursor_field]:
yield record


class People(IncrementalDelightedStream):
"""
API docs: https://app.delighted.com/docs/api/listing-people
"""

def path(self, **kwargs) -> str:
return "people.json"

Expand All @@ -86,6 +93,10 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,


class Unsubscribes(IncrementalDelightedStream):
"""
API docs: https://app.delighted.com/docs/api/listing-unsubscribed-people
"""

cursor_field = "unsubscribed_at"
primary_key = "person_id"

Expand All @@ -94,6 +105,10 @@ def path(self, **kwargs) -> str:


class Bounces(IncrementalDelightedStream):
"""
API docs: https://app.delighted.com/docs/api/listing-bounced-people
"""

cursor_field = "bounced_at"
primary_key = "person_id"

Expand All @@ -102,6 +117,10 @@ def path(self, **kwargs) -> str:


class SurveyResponses(IncrementalDelightedStream):
"""
API docs: https://app.delighted.com/docs/api/listing-survey-responses
"""

cursor_field = "updated_at"

def path(self, **kwargs) -> str:
Expand All @@ -110,8 +129,13 @@ def path(self, **kwargs) -> str:
def request_params(self, stream_state=None, **kwargs):
stream_state = stream_state or {}
params = super().request_params(stream_state=stream_state, **kwargs)

if "since" in params:
params["updated_since"] = params.pop("since")

if stream_state:
params["updated_since"] = stream_state.get(self.cursor_field)

return params


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,82 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import pytest
import responses
from airbyte_cdk.models import SyncMode
from source_delighted.source import Bounces, People, SourceDelighted, SurveyResponses, Unsubscribes


@pytest.fixture(scope="module")
def test_config():
return {
"api_key": "test_api_key",
"since": "1641289584",
}


@pytest.fixture(scope="module")
def state():
return {
"bounces": {"bounced_at": 1641455286},
"people": {"created_at": 1641455285},
"survey_responses": {"updated_at": 1641289816},
"unsubscribes": {"unsubscribed_at": 1641289584},
}


BOUNCES_RESPONSE = """
[
{"person_id": "1046789984", "email": "foo_test204@airbyte.io", "name": "Foo Test204", "bounced_at": 1641455286},
{"person_id": "1046789989", "email": "foo_test205@airbyte.io", "name": "Foo Test205", "bounced_at": 1641455286}
]
"""


PEOPLE_RESPONSE = """
[
{"id": "1046789989", "name": "Foo Test205", "email": "foo_test205@airbyte.io", "created_at": 1641455285, "last_sent_at": 1641455285, "last_responded_at": null, "next_survey_scheduled_at": null}
]
"""


SURVEY_RESPONSES_RESPONSE = """
[
{"id": "210554887", "person": "1042205953", "survey_type": "nps", "score": 0, "comment": "Test Comment202", "permalink": "https://app.delighted.com/r/0q7QEdWzosv5G5c3w9gakivDwEIM5Hq0", "created_at": 1641289816, "updated_at": 1641289816, "person_properties": null, "notes": [], "tags": [], "additional_answers": []},
{"id": "210554885", "person": "1042205947", "survey_type": "nps", "score": 5, "comment": "Test Comment201", "permalink": "https://app.delighted.com/r/GhWWrBT2wayswOc0AfT7fxpM3UwSpitN", "created_at": 1641289816, "updated_at": 1641289816, "person_properties": null, "notes": [], "tags": [], "additional_answers": []}
]
"""


UNSUBSCRIBES_RESPONSE = """
[
{"person_id": "1040826319", "email": "foo_test64@airbyte.io", "name": "Foo Test64", "unsubscribed_at": 1641289584}
]
"""


@pytest.mark.parametrize(
("stream_class", "url", "response_body"),
[
(Bounces, "https://api.delighted.com/v1/bounces.json", BOUNCES_RESPONSE),
(People, "https://api.delighted.com/v1/people.json", PEOPLE_RESPONSE),
(SurveyResponses, "https://api.delighted.com/v1/survey_responses.json", SURVEY_RESPONSES_RESPONSE),
(Unsubscribes, "https://api.delighted.com/v1/unsubscribes.json", UNSUBSCRIBES_RESPONSE),
],
)
@responses.activate
def test_not_output_records_where_cursor_field_equals_state(state, test_config, stream_class, url, response_body):
responses.add(
responses.GET,
url,
body=response_body,
status=200,
)

stream = stream_class(test_config["since"], authenticator=SourceDelighted()._get_authenticator(config=test_config))
records = [r for r in stream.read_records(SyncMode.incremental, stream_state=state[stream.name])]
assert not records


def test_example_method():
assert True
1 change: 1 addition & 0 deletions docs/integrations/sources/delighted.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ This connector supports `API PASSWORD` as the authentication method.

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.3 | 2022-01-31 | [9550](https://github.com/airbytehq/airbyte/pull/9550) | Output only records in which cursor field is greater than the value in state for incremental streams |
| 0.1.2 | 2022-01-06 | [9333](https://github.com/airbytehq/airbyte/pull/9333) | Add incremental sync mode to streams in `integration_tests/configured_catalog.json` |
| 0.1.1 | 2022-01-04 | [9275](https://github.com/airbytehq/airbyte/pull/9275) | Fix pagination handling for `survey_responses`, `bounces` and `unsubscribes` streams |
| 0.1.0 | 2021-10-27 | [4551](https://github.com/airbytehq/airbyte/pull/4551) | Add Delighted source connector |

0 comments on commit 0b1b75b

Please sign in to comment.