Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source Iterable: Add permission check for stream #17602

Merged
merged 3 commits into from
Oct 6, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.18
LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.name=airbyte/source-iterable
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ python -m pytest unit_tests
#### Build
First, make sure you build the latest Docker image:
```
docker build . -t airbyte/iterable:dev
docker build . -t airbyte/source-iterable:dev
```

You can also build the connector image via Gradle:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-iterable/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from setuptools import find_packages, setup

MAIN_REQUIREMENTS = [
"airbyte-cdk~=0.1",
"airbyte-cdk",
"pendulum~=2.1.2",
"requests~=2.25",
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
from pendulum.datetime import DateTime
from requests.exceptions import ChunkedEncodingError
from requests import codes
from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice

EVENT_ROWS_LIMIT = 200
CAMPAIGNS_PER_REQUEST = 20


class IterableStream(HttpStream, ABC):
raise_on_http_errors = True

# Hardcode the value because it is not returned from the API
BACKOFF_TIME_CONSTANT = 10.0
Expand All @@ -43,6 +45,13 @@ def data_field(self) -> str:
:return: Default field name to get data from response
"""

def check_unauthorized_key(self, response: requests.Response) -> bool:
if response.status_code == codes.UNAUTHORIZED:
self.logger.warn(f'Provided API Key has not sufficient permissions to read from stream: {self.data_field}')
setattr(self, "raise_on_http_errors", False)
return False
return True

def backoff_time(self, response: requests.Response) -> Optional[float]:
return self.BACKOFF_TIME_CONSTANT

Expand All @@ -53,12 +62,20 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return None

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
response_json = response.json()
records = response_json.get(self.data_field, [])

for record in records:
yield record

def should_retry(self, response: requests.Response) -> bool:
if not self.check_unauthorized_key(response):
return False
else:
return super().should_retry(response)


class IterableExportStream(IterableStream, ABC):
"""
Expand Down Expand Up @@ -151,6 +168,8 @@ def request_params(
return params

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
return None
for obj in response.iter_lines():
record = json.loads(obj)
record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field])
Expand Down Expand Up @@ -301,6 +320,8 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
yield {"list_id": list_record["id"]}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
list_id = self._get_list_id(response.url)
for user in response.iter_lines():
yield {"email": user.decode(), "listId": list_id}
Expand Down Expand Up @@ -359,6 +380,8 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]:
yield {"campaign_ids": campaign_ids}

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
content = response.content.decode()
records = self._parse_csv_string_to_dict(content)

Expand Down Expand Up @@ -456,7 +479,8 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp
Put common event fields at the top level.
Put the rest of the fields in the `data` subobject.
"""

if not self.check_unauthorized_key(response):
yield from []
jsonl_records = StringIO(response.text)
for record in jsonl_records:
record_dict = json.loads(record)
Expand Down Expand Up @@ -618,6 +642,8 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg
yield from super().read_records(stream_slice=stream_slice, **kwargs)

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
if not self.check_unauthorized_key(response):
yield from []
response_json = response.json()
records = response_json.get(self.data_field, [])

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/iterable.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------|
| 0.1.19 | 2022-10-05 | [17602](https://github.com/airbytehq/airbyte/pull/17602) | Add check for stream permissions |
| 0.1.18 | 2022-10-04 | [17573](https://github.com/airbytehq/airbyte/pull/17573) | Limit time range for SATs |
| 0.1.17 | 2022-09-02 | [16067](https://github.com/airbytehq/airbyte/pull/16067) | added new events streams |
| 0.1.16 | 2022-08-15 | [15670](https://github.com/airbytehq/airbyte/pull/15670) | Api key is passed via header |
Expand Down