Skip to content

Commit e43c53d

Browse files
antixarsherifnada
andauthored
🐛 Source Intercom: switching from scroll to standard endpoints (#8637)
* backoff for companies scroll * remove a unused companies stream property * fix tests * bump version * update source_specs * update scroll logic * update tests * Update airbyte-integrations/connectors/source-intercom/source_intercom/source.py Co-authored-by: Sherif A. Nada <snadalive@gmail.com> * update change log * update spec files Co-authored-by: Sherif A. Nada <snadalive@gmail.com>
1 parent e997612 commit e43c53d

File tree

6 files changed

+157
-47
lines changed

6 files changed

+157
-47
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/d8313939-3782-41b0-be29-b3ca20d8dd3a.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "d8313939-3782-41b0-be29-b3ca20d8dd3a",
33
"name": "Intercom",
44
"dockerRepository": "airbyte/source-intercom",
5-
"dockerImageTag": "0.1.9",
5+
"dockerImageTag": "0.1.10",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/intercom",
77
"icon": "intercom.svg"
88
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@
302302
- name: Intercom
303303
sourceDefinitionId: d8313939-3782-41b0-be29-b3ca20d8dd3a
304304
dockerRepository: airbyte/source-intercom
305-
dockerImageTag: 0.1.9
305+
dockerImageTag: 0.1.10
306306
documentationUrl: https://docs.airbyte.io/integrations/sources/intercom
307307
icon: intercom.svg
308308
sourceType: api

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -2841,7 +2841,7 @@
28412841
oauthFlowInitParameters: []
28422842
oauthFlowOutputParameters:
28432843
- - "access_token"
2844-
- dockerImage: "airbyte/source-intercom:0.1.9"
2844+
- dockerImage: "airbyte/source-intercom:0.1.10"
28452845
spec:
28462846
documentationUrl: "https://docs.airbyte.io/integrations/sources/intercom"
28472847
connectionSpecification:

airbyte-integrations/connectors/source-intercom/integration_tests/integration_test.py

+68-27
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,17 @@
33
#
44

55
import json
6+
import pytest
67
import time
8+
from airbyte_cdk import AirbyteLogger
9+
from airbyte_cdk.models import SyncMode
710
from copy import deepcopy
811
from pathlib import Path
12+
from requests.exceptions import HTTPError
913
from typing import Mapping
14+
from unittest.mock import patch
1015

11-
import pytest
12-
from airbyte_cdk import AirbyteLogger
13-
from airbyte_cdk.models import SyncMode
14-
from requests.exceptions import HTTPError
15-
from source_intercom.source import Companies, SourceIntercom, VersionApiAuthenticator
16+
from source_intercom.source import Companies, ConversationParts, SourceIntercom, VersionApiAuthenticator
1617

1718
LOGGER = AirbyteLogger()
1819
# from unittest.mock import Mock
@@ -27,18 +28,19 @@ def stream_attributes() -> Mapping[str, str]:
2728
return json.load(json_file)
2829

2930

31+
@pytest.mark.skip(reason="need to refresh this test, it is very slow")
3032
@pytest.mark.parametrize(
3133
"version,not_supported_streams,custom_companies_data_field",
3234
(
33-
(1.0, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
34-
(1.1, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
35-
(1.2, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
36-
(1.3, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
37-
(1.4, ["company_segments"], "companies"),
38-
(2.0, [], "data"),
39-
(2.1, [], "data"),
40-
(2.2, [], "data"),
41-
(2.3, [], "data"),
35+
(1.0, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
36+
(1.1, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
37+
(1.2, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
38+
(1.3, ["company_segments", "company_attributes", "contact_attributes"], "companies"),
39+
(1.4, ["company_segments"], "companies"),
40+
(2.0, [], "data"),
41+
(2.1, [], "data"),
42+
(2.2, [], "data"),
43+
(2.3, [], "data"),
4244
),
4345
)
4446
def test_supported_versions(stream_attributes, version, not_supported_streams, custom_companies_data_field):
@@ -48,17 +50,17 @@ class CustomVersionApiAuthenticator(VersionApiAuthenticator):
4850
authenticator = CustomVersionApiAuthenticator(token=stream_attributes["access_token"])
4951
for stream in SourceIntercom().streams(deepcopy(stream_attributes)):
5052
stream._authenticator = authenticator
51-
5253
if stream.name == "companies":
5354
stream.data_fields = [custom_companies_data_field]
5455
elif hasattr(stream, "parent_stream_class") and stream.parent_stream_class == Companies:
5556
stream.parent_stream_class.data_fields = [custom_companies_data_field]
5657

57-
slices = list(stream.stream_slices(sync_mode=SyncMode.full_refresh))
5858
if stream.name in not_supported_streams:
5959
LOGGER.info(f"version {version} shouldn't be supported the stream '{stream.name}'")
6060
with pytest.raises(HTTPError) as err:
61-
next(stream.read_records(sync_mode=None, stream_slice=slices[0]), None)
61+
for slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
62+
next(stream.read_records(sync_mode=None, stream_slice=slice), None)
63+
break
6264
# example of response errors:
6365
# {"type": "error.list", "request_id": "000hjqhpf95ef3b8f8v0",
6466
# "errors": [{"code": "intercom_version_invalid", "message": "The requested version could not be found"}]}
@@ -67,12 +69,13 @@ class CustomVersionApiAuthenticator(VersionApiAuthenticator):
6769
LOGGER.info(f"version {version} doesn't support the stream '{stream.name}', error: {err_data}")
6870
else:
6971
LOGGER.info(f"version {version} should be supported the stream '{stream.name}'")
70-
records = stream.read_records(sync_mode=None, stream_slice=slices[0])
71-
if stream.name == "companies":
72-
# need to read all records for scroll resetting
73-
list(records)
74-
else:
75-
next(records, None)
72+
for slice in stream.stream_slices(sync_mode=SyncMode.full_refresh):
73+
records = stream.read_records(sync_mode=None, stream_slice=slice)
74+
if stream.name == "companies":
75+
# need to read all records for scroll resetting
76+
list(records)
77+
else:
78+
next(records, None)
7679

7780

7881
def test_companies_scroll(stream_attributes):
@@ -82,16 +85,54 @@ def test_companies_scroll(stream_attributes):
8285
stream3 = Companies(authenticator=authenticator)
8386

8487
# read the first stream and stop
85-
next(stream1.read_records(sync_mode=SyncMode.full_refresh))
88+
for slice in stream1.stream_slices(sync_mode=SyncMode.full_refresh):
89+
next(stream1.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice), None)
90+
break
8691

8792
start_time = time.time()
8893
# read all records
89-
records = list(stream2.read_records(sync_mode=SyncMode.full_refresh))
94+
records = []
95+
for slice in stream2.stream_slices(sync_mode=SyncMode.full_refresh):
96+
records += list(stream2.read_records(sync_mode=SyncMode, stream_slice=slice))
9097
assert len(records) == 3
9198
assert (time.time() - start_time) > 60.0
9299

93100
start_time = time.time()
94-
# read all records again
95-
records = list(stream3.read_records(sync_mode=SyncMode.full_refresh))
101+
# read all records
102+
records = []
103+
for slice in stream3.stream_slices(sync_mode=SyncMode.full_refresh):
104+
records += list(stream3.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice))
96105
assert len(records) == 3
97106
assert (time.time() - start_time) < 5.0
107+
108+
109+
@patch("source_intercom.source.Companies.can_use_scroll", lambda *args: False)
110+
def test_switch_to_standard_endpoint(stream_attributes):
111+
authenticator = VersionApiAuthenticator(token=stream_attributes["access_token"])
112+
stream1 = Companies(authenticator=authenticator)
113+
stream2 = Companies(authenticator=authenticator)
114+
stream3 = ConversationParts(authenticator=authenticator)
115+
116+
# read the first stream and stop
117+
for slice in stream1.stream_slices(sync_mode=SyncMode.full_refresh):
118+
next(stream1.read_records(sync_mode=SyncMode.full_refresh, stream_slice=slice), None)
119+
break
120+
121+
start_time = time.time()
122+
# read all records
123+
records = []
124+
assert stream2._endpoint_type == Companies.EndpointType.scroll
125+
for slice in stream2.stream_slices(sync_mode=SyncMode.full_refresh):
126+
records += list(stream2.read_records(sync_mode=SyncMode, stream_slice=slice))
127+
assert stream2._endpoint_type == Companies.EndpointType.standard
128+
assert stream2._total_count == 3
129+
assert len(records) == 3
130+
assert (time.time() - start_time) < 5.0
131+
132+
start_time = time.time()
133+
# read all children records
134+
records = []
135+
for slice in stream3.stream_slices(sync_mode=SyncMode.full_refresh):
136+
records += list(stream3.read_records(sync_mode=SyncMode, stream_slice=slice))
137+
assert len(records) == 12
138+
assert (time.time() - start_time) < 5.0

airbyte-integrations/connectors/source-intercom/source_intercom/source.py

+85-16
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import time
66
from abc import ABC
77
from datetime import datetime
8+
from enum import Enum
89
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple
910
from urllib.parse import parse_qsl, urlparse
1011

@@ -13,7 +14,8 @@
1314
from airbyte_cdk.sources import AbstractSource
1415
from airbyte_cdk.sources.streams import Stream
1516
from airbyte_cdk.sources.streams.http import HttpStream
16-
from airbyte_cdk.sources.streams.http.auth import HttpAuthenticator, TokenAuthenticator
17+
from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator
18+
from requests.auth import AuthBase
1719

1820

1921
class IntercomStream(HttpStream, ABC):
@@ -27,14 +29,24 @@ class IntercomStream(HttpStream, ABC):
2729

2830
def __init__(
2931
self,
30-
authenticator: HttpAuthenticator,
32+
authenticator: AuthBase,
3133
start_date: str = None,
3234
**kwargs,
3335
):
3436
self.start_date = start_date
3537

3638
super().__init__(authenticator=authenticator)
3739

40+
@property
41+
def authenticator(self):
42+
"""
43+
Fix of the bug when isinstance(authenticator, AuthBase) and
44+
default logic returns incorrect authenticator values
45+
"""
46+
if self._session.auth:
47+
return self._session.auth
48+
return super().authenticator
49+
3850
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
3951
"""
4052
Abstract method of HttpStream - should be overwritten.
@@ -95,7 +107,7 @@ def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mappin
95107
during the slicing.
96108
"""
97109

98-
if not stream_state or record[self.cursor_field] >= stream_state.get(self.cursor_field):
110+
if not stream_state or record[self.cursor_field] > stream_state.get(self.cursor_field):
99111
yield record
100112

101113
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
@@ -124,10 +136,12 @@ class ChildStreamMixin:
124136
parent_stream_class: Optional[IntercomStream] = None
125137

126138
def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
127-
for item in self.parent_stream_class(authenticator=self.authenticator, start_date=self.start_date).read_records(
128-
sync_mode=sync_mode
129-
):
130-
yield {"id": item["id"]}
139+
parent_stream = self.parent_stream_class(authenticator=self.authenticator, start_date=self.start_date)
140+
for slice in parent_stream.stream_slices(sync_mode=sync_mode):
141+
for item in self.parent_stream_class(
142+
authenticator=self.authenticator, start_date=self.start_date, stream_slice=slice
143+
).read_records(sync_mode=sync_mode):
144+
yield {"id": item["id"]}
131145

132146

133147
class Admins(IntercomStream):
@@ -144,24 +158,55 @@ def path(self, **kwargs) -> str:
144158

145159
class Companies(IncrementalIntercomStream):
146160
"""Return list of all companies.
147-
API Docs: https://developers.intercom.com/intercom-api-reference/reference#iterating-over-all-companies
148-
Endpoint: https://api.intercom.io/companies/scroll
161+
The Intercom API provides 2 similar endpoint for loading of companies:
162+
1) "standard" - https://developers.intercom.com/intercom-api-reference/reference#list-companies.
163+
But this endpoint does not work well for huge datasets and can have performance problems.
164+
2) "scroll" - https://developers.intercom.com/intercom-api-reference/reference#iterating-over-all-companies
165+
It has good performance but at same time only one script/client can use it across the client's entire account.
166+
167+
According to above circumstances no one endpoint can't be used permanently. That's why this stream tries can
168+
apply both endpoints according to the following logic:
169+
1) By default the stream tries to load data by "scroll" endpoint.
170+
2) Try to wait a "scroll" request within a minute (3 attempts with delay 20,5 seconds)
171+
if a "stroll" is busy by another script
172+
3) Switch to using of the "standard" endpoint.
149173
"""
150174

175+
class EndpointType(Enum):
176+
scroll = "companies/scroll"
177+
standard = "companies"
178+
179+
def __init__(self, *args, **kwargs):
180+
super().__init__(*args, **kwargs)
181+
self._backoff_count = 0
182+
self._endpoint_type = self.EndpointType.scroll
183+
self._total_count = None # uses for saving of a total_count value once
184+
151185
def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
152186
"""For reset scroll needs to iterate pages untill the last.
153187
Another way need wait 1 min for the scroll to expire to get a new list for companies segments."""
154-
155188
data = response.json()
156-
scroll_param = data.get("scroll_param")
189+
if self._total_count is None and data.get("total_count"):
190+
self._total_count = data["total_count"]
191+
self.logger.info(f"found {self._total_count} companies")
192+
if self.can_use_scroll():
193+
194+
scroll_param = data.get("scroll_param")
195+
196+
# this stream always has only one data field
197+
data_field = self.data_fields[0]
198+
if scroll_param and data.get(data_field):
199+
return {"scroll_param": scroll_param}
200+
elif not data.get("errors"):
201+
return super().next_page_token(response)
202+
return None
157203

158-
# this stream always has only one data field
159-
data_field = self.data_fields[0]
160-
if scroll_param and data.get(data_field):
161-
return {"scroll_param": scroll_param}
204+
def can_use_scroll(self):
205+
"""Check backoff count"""
206+
return self._backoff_count <= 3
162207

163208
def path(self, **kwargs) -> str:
164-
return "companies/scroll"
209+
return self._endpoint_type.value
165210

166211
@classmethod
167212
def check_exists_scroll(cls, response: requests.Response) -> bool:
@@ -174,8 +219,25 @@ def check_exists_scroll(cls, response: requests.Response) -> bool:
174219

175220
return False
176221

222+
@property
223+
def raise_on_http_errors(self) -> bool:
224+
if not self.can_use_scroll() and self._endpoint_type == self.EndpointType.scroll:
225+
return False
226+
return True
227+
228+
def stream_slices(self, sync_mode, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
229+
yield None
230+
if not self.can_use_scroll():
231+
self._endpoint_type = self.EndpointType.standard
232+
yield None
233+
177234
def should_retry(self, response: requests.Response) -> bool:
178235
if self.check_exists_scroll(response):
236+
self._backoff_count += 1
237+
if not self.can_use_scroll():
238+
self.logger.error("Can't create a new scroll request within an minute. " "Let's try to use a standard non-scroll endpoint.")
239+
return False
240+
179241
return True
180242
return super().should_retry(response)
181243

@@ -186,6 +248,13 @@ def backoff_time(self, response: requests.Response) -> Optional[float]:
186248
return 20.5
187249
return super().backoff_time(response)
188250

251+
def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]:
252+
if not self.raise_on_http_errors:
253+
data = response.json()
254+
if data.get("errors"):
255+
return
256+
yield from super().parse_response(response, stream_state=stream_state, **kwargs)
257+
189258

190259
class CompanySegments(ChildStreamMixin, IncrementalIntercomStream):
191260
"""Return list of all company segments.

docs/integrations/sources/intercom.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ Please read [How to get your Access Token](https://developers.intercom.com/build
5555

5656
| Version | Date | Pull Request | Subject |
5757
| :--- | :--- | :--- | :--- |
58-
| 0.1.10 | 2021-12-07 | [8579](https://github.com/airbytehq/airbyte/pull/8579) | Fix 'conversations' order and sorting |
58+
| 0.1.10 | 2021-12-10 | [8637](https://github.com/airbytehq/airbyte/pull/8637) | Fix 'conversations' order and sorting. Correction of the companies stream|
5959
| 0.1.9 | 2021-12-03 | [8395](https://github.com/airbytehq/airbyte/pull/8395) | Fix backoff of 'companies' stream |
6060
| 0.1.8 | 2021-11-09 | [7060](https://github.com/airbytehq/airbyte/pull/7060) | Added oauth support |
6161
| 0.1.7 | 2021-11-08 | [7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |

0 commit comments

Comments
 (0)