diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json index af8ad0f23e640..3e1baf8dd0463 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json @@ -2,7 +2,7 @@ "sourceDefinitionId": "d8313939-3782-41b0-be29-b3ca20d8dd3a", "name": "Intercom", "dockerRepository": "airbyte/source-intercom", - "dockerImageTag": "0.1.11", + "dockerImageTag": "0.1.13", "documentationUrl": "https://docs.airbyte.io/integrations/sources/intercom", "icon": "intercom.svg" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 30b31d94d8219..1958801c835d0 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -321,7 +321,7 @@ - name: Intercom sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a dockerRepository: airbyte/source-intercom - dockerImageTag: 0.1.12 + dockerImageTag: 0.1.13 documentationUrl: https://docs.airbyte.io/integrations/sources/intercom icon: intercom.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index a413b67fed7d6..cf7cca7be7980 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3134,7 +3134,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-intercom:0.1.12" +- dockerImage: "airbyte/source-intercom:0.1.13" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-intercom/Dockerfile b/airbyte-integrations/connectors/source-intercom/Dockerfile index 7cea41697a667..2ae375afc8c42 100644 --- a/airbyte-integrations/connectors/source-intercom/Dockerfile +++ b/airbyte-integrations/connectors/source-intercom/Dockerfile @@ -35,5 +35,5 @@ COPY source_intercom ./source_intercom ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.12 +LABEL io.airbyte.version=0.1.13 LABEL io.airbyte.name=airbyte/source-intercom diff --git a/airbyte-integrations/connectors/source-intercom/source_intercom/source.py b/airbyte-integrations/connectors/source-intercom/source_intercom/source.py index 862b06985530a..8f99e9f473a51 100755 --- a/airbyte-integrations/connectors/source-intercom/source_intercom/source.py +++ b/airbyte-integrations/connectors/source-intercom/source_intercom/source.py @@ -171,6 +171,7 @@ class EndpointType(Enum): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._backoff_count = 0 + self._use_standard = False self._endpoint_type = self.EndpointType.scroll self._total_count = None # uses for saving of a total_count value once @@ -193,6 +194,9 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, return super().next_page_token(response) return None + def need_use_standard(self): + return not self.can_use_scroll() or self._use_standard + def can_use_scroll(self): """Check backoff count""" return self._backoff_count <= 3 @@ -202,38 +206,46 @@ def path(self, **kwargs) -> str: @classmethod def check_exists_scroll(cls, response: requests.Response) -> bool: - if response.status_code == 400: + if response.status_code in [400, 404]: # example response: # {..., "errors": [{'code': 'scroll_exists', 'message': 'scroll already exists for this workspace'}]} + # {..., "errors": [{'code': 'not_found', 'message':'scroll parameter not found'}]} err_body = response.json()["errors"][0] - if err_body["code"] == "scroll_exists": + if err_body["code"] in ["scroll_exists", "not_found"]: return True return False @property def raise_on_http_errors(self) -> bool: - if not self.can_use_scroll() and self._endpoint_type == self.EndpointType.scroll: + if self.need_use_standard() and self._endpoint_type == self.EndpointType.scroll: return False return True def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: yield None - if not self.can_use_scroll(): + if self.need_use_standard(): self._endpoint_type = self.EndpointType.standard yield None def should_retry(self, response: requests.Response) -> bool: if self.check_exists_scroll(response): self._backoff_count += 1 - if not self.can_use_scroll(): - self.logger.error("Can't create a new scroll request within an minute. " "Let's try to use a standard non-scroll endpoint.") + if self.need_use_standard(): + self.logger.error( + "Can't create a new scroll request within an minute or scroll param was expired. " + "Let's try to use a standard non-scroll endpoint." + ) return False return True return super().should_retry(response) def backoff_time(self, response: requests.Response) -> Optional[float]: + if response.status_code == 404: + self._use_standard = True + # Need return value greater than zero to use UserDefinedBackoffException class + return 0.01 if self.check_exists_scroll(response): self.logger.warning("A previous scroll request is exists. " "It must be deleted within an minute automatically") # try to check 3 times diff --git a/airbyte-integrations/connectors/source-intercom/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-intercom/unit_tests/unit_test.py index 3690bce61b080..40b0cfc2eb911 100644 --- a/airbyte-integrations/connectors/source-intercom/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-intercom/unit_tests/unit_test.py @@ -5,6 +5,7 @@ import pytest import requests +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http.auth import NoAuth from source_intercom.source import Companies, Contacts, IntercomStream @@ -46,3 +47,32 @@ def test_get_next_page_token(intercom_class, response_json, expected_output_toke test = intercom_class(authenticator=NoAuth).next_page_token(response) assert test == expected_output_token + + +def test_switch_to_standard_endpoint_if_scroll_expired(requests_mock): + """ + Test shows that if scroll param expired we try sync with standard API. + """ + + url = "https://api.intercom.io/companies/scroll" + requests_mock.get( + url, + json={"type": "company.list", "data": [{"type": "company", "id": "530370b477ad7120001d"}], "scroll_param": "expired_scroll_param"}, + ) + + url = "https://api.intercom.io/companies/scroll?scroll_param=expired_scroll_param" + requests_mock.get(url, json={"errors": [{"code": "not_found", "message": "scroll parameter not found"}]}, status_code=404) + + url = "https://api.intercom.io/companies" + requests_mock.get(url, json={"type": "company.list", "data": [{"type": "company", "id": "530370b477ad7120001d"}]}) + + stream1 = Companies(authenticator=NoAuth()) + + records = [] + + assert stream1._endpoint_type == Companies.EndpointType.scroll + + for slice in stream1.stream_slices(sync_mode=SyncMode.full_refresh): + records += list(stream1.read_records(sync_mode=SyncMode, stream_slice=slice)) + + assert stream1._endpoint_type == Companies.EndpointType.standard diff --git a/docs/integrations/sources/intercom.md b/docs/integrations/sources/intercom.md index 0d01b412efddc..537dba6b0a3f5 100644 --- a/docs/integrations/sources/intercom.md +++ b/docs/integrations/sources/intercom.md @@ -55,9 +55,10 @@ Please read [How to get your Access Token](https://developers.intercom.com/build | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.13 | 2022-01-14 | [9513](https://github.com/airbytehq/airbyte/pull/9513) | Added handling of scroll param when it expired | | 0.1.12 | 2021-12-14 | [8429](https://github.com/airbytehq/airbyte/pull/8429) | Updated fields and descriptions | | 0.1.11 | 2021-12-13 | [8685](https://github.com/airbytehq/airbyte/pull/8685) | Remove time.sleep for rate limit | -| 0.1.10 | 2021-12-10 | [8637](https://github.com/airbytehq/airbyte/pull/8637) | Fix 'conversations' order and sorting. Correction of the companies stream| +| 0.1.10 | 2021-12-10 | [8637](https://github.com/airbytehq/airbyte/pull/8637) | Fix 'conversations' order and sorting. Correction of the companies stream | | 0.1.9 | 2021-12-03 | [8395](https://github.com/airbytehq/airbyte/pull/8395) | Fix backoff of 'companies' stream | | 0.1.8 | 2021-11-09 | [7060](https://github.com/airbytehq/airbyte/pull/7060) | Added oauth support | | 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies | @@ -67,4 +68,4 @@ Please read [How to get your Access Token](https://developers.intercom.com/build | 0.1.3 | 2021-09-08 | [5908](https://github.com/airbytehq/airbyte/pull/5908) | Corrected timestamp and arrays in schemas | | 0.1.2 | 2021-08-19 | [5531](https://github.com/airbytehq/airbyte/pull/5531) | Corrected pagination | | 0.1.1 | 2021-07-31 | [5123](https://github.com/airbytehq/airbyte/pull/5123) | Corrected rate limit | -| 0.1.0 | 2021-07-19 | [4676](https://github.com/airbytehq/airbyte/pull/4676) | Release Slack CDK Connector | +| 0.1.0 | 2021-07-19 | [4676](https://github.com/airbytehq/airbyte/pull/4676) | Release Intercom CDK Connector |