🐛 Source Amplitude: add error descriptions and fix events stream fail on 404 #12430

Merged (5 commits) on Apr 28, 2022
@@ -35,7 +35,7 @@
- name: Amplitude
sourceDefinitionId: fa9f58c6-2d03-4237-aaa4-07d75e0c1396
dockerRepository: airbyte/source-amplitude
dockerImageTag: 0.1.4
dockerImageTag: 0.1.5
documentationUrl: https://docs.airbyte.io/integrations/sources/amplitude
icon: amplitude.svg
sourceType: api
@@ -475,7 +475,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-amplitude:0.1.4"
- dockerImage: "airbyte/source-amplitude:0.1.5"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/amplitude"
connectionSpecification:
@@ -12,5 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]

LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.name=airbyte/source-amplitude
@@ -17,6 +17,8 @@
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.streams.http import HttpStream

from .errors import HTTP_ERROR_CODES, error_msg_from_status


class AmplitudeStream(HttpStream, ABC):

@@ -27,8 +29,12 @@ def next_page_token(self, response: requests.Response) -> Optional[Mapping[str,
return None

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json()
yield from response_data.get(self.name, [])
status = response.status_code
if status in HTTP_ERROR_CODES.keys():
error_msg_from_status(status)
yield from []
else:
yield from response.json().get(self.data_field, [])

def path(self, **kwargs) -> str:
return f"{self.api_version}/{self.name}"
@@ -37,14 +43,12 @@ def path(self, **kwargs) -> str:
class Cohorts(AmplitudeStream):
primary_key = "id"
api_version = 3
data_field = "cohorts"


class Annotations(AmplitudeStream):
primary_key = "id"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json()
yield from response_data.get("data", [])
data_field = "data"


class IncrementalAmplitudeStream(AmplitudeStream, ABC):
@@ -124,6 +128,22 @@ def _parse_zip_file(self, zip_file: IO[bytes]) -> Iterable[Mapping]:
for record in file:
yield json.loads(record)

def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
slices = []
start = self._start_date
if stream_state:
start = pendulum.parse(stream_state.get(self.cursor_field))
end = pendulum.now()
while start <= end:
slices.append(
{
"start": start.strftime(self.date_template),
"end": self._get_end_date(start).strftime(self.date_template),
}
)
start = start.add(**self.time_interval)
return slices

def read_records(
self,
sync_mode: SyncMode,
@@ -132,34 +152,35 @@ def read_records(
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
stream_state = stream_state or {}
params = self.request_params(stream_state=stream_state, stream_slice=stream_slice, next_page_token=None)

# API returns data only when requested with a difference between 'start' and 'end' of 6 or more hours.
if pendulum.parse(params["start"]).add(hours=6) > pendulum.parse(params["end"]):
return []
start = pendulum.parse(stream_slice["start"]).add(hours=6)
end = pendulum.parse(stream_slice["end"])
if start > end:
yield from []

# Sometimes the API throws a 404 error for non-obvious reasons; we have to handle it and log it.
# For example, if there is no data for the specified time period, a 404 exception is thrown:
# https://developers.amplitude.com/docs/export-api#status-codes

try:
self.logger.info(f"Fetching {self.name} time range: {start.strftime('%Y-%m-%d')} - {end.strftime('%Y-%m-%d')}")
yield from super().read_records(sync_mode, cursor_field, stream_slice, stream_state)
except requests.exceptions.HTTPError as error:
if error.response.status_code == 404:
self.logger.warn(f"Error during syncing {self.name} stream - {error}")
return []
status = error.response.status_code
if status in HTTP_ERROR_CODES.keys():
error_msg_from_status(status)
yield from []
else:
self.logger.error(f"Error during syncing {self.name} stream - {error}")
raise

def request_params(
self, stream_state: Mapping[str, Any], next_page_token: Mapping[str, Any] = None, **kwargs
) -> MutableMapping[str, Any]:
params = super().request_params(stream_state=stream_state, next_page_token=next_page_token, **kwargs)
if stream_state or next_page_token:
params["start"] = pendulum.parse(params["start"]).add(hours=1).strftime(self.date_template)
def request_params(self, stream_slice: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]:
params = self.base_params
params["start"] = pendulum.parse(stream_slice["start"]).strftime(self.date_template)
params["end"] = pendulum.parse(stream_slice["end"]).strftime(self.date_template)
return params

def path(
self, stream_state: Mapping[str, Any] = None, stream_slice: Mapping[str, Any] = None, next_page_token: Mapping[str, Any] = None
) -> str:
def path(self, **kwargs) -> str:
return f"{self.api_version}/export"


@@ -168,9 +189,10 @@ class ActiveUsers(IncrementalAmplitudeStream):
name = "active_users"
primary_key = "date"
time_interval = {"months": 1}
data_field = "data"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json().get("data", [])
response_data = response.json().get(self.data_field, [])
if response_data:
series = list(map(list, zip(*response_data["series"])))
for i, date in enumerate(response_data["xValues"]):
@@ -184,9 +206,10 @@ class AverageSessionLength(IncrementalAmplitudeStream):
name = "average_session_length"
primary_key = "date"
time_interval = {"days": 15}
data_field = "data"

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
response_data = response.json().get("data", [])
response_data = response.json().get(self.data_field, [])
if response_data:
# From the Amplitude documentation it follows that "series" is an array with one element which is itself
# an array that contains the average session length for each day.
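The most significant change in the stream module above is the new `stream_slices` implementation: instead of shifting a single `start` parameter forward inside `request_params`, the stream now pre-computes fixed time windows from the start date (or the saved state) up to the current time and requests each window separately, which is what lets `read_records` skip or log a failing window without aborting the whole sync. Below is a rough standalone sketch of that slicing walk; the date format and the window-end calculation are placeholders for illustration (the real code uses each stream's own `date_template` and `self._get_end_date(start)`).

```python
# Illustrative sketch only: mirrors the shape of the new stream_slices(),
# not the connector's exact implementation. The date format and the window
# end calculation are placeholder assumptions.
import pendulum

date_template = "%Y-%m-%d"       # placeholder; each stream defines its own template
time_interval = {"months": 1}    # e.g. ActiveUsers slices by month

start = pendulum.parse("2022-01-01T00:00:00Z")
end = pendulum.now()

slices = []
while start <= end:
    slices.append(
        {
            "start": start.strftime(date_template),
            "end": start.add(**time_interval).strftime(date_template),
        }
    )
    start = start.add(**time_interval)

print(slices[:2])
# [{'start': '2022-01-01', 'end': '2022-02-01'},
#  {'start': '2022-02-01', 'end': '2022-03-01'}]
```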
@@ -0,0 +1,34 @@
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import logging

LOGGER = logging.getLogger("airbyte")

HTTP_ERROR_CODES = {
400: {
"msg": "The file size of the exported data is too large. Shorten the time ranges and try again. The limit size is 4GB.",
"lvl": "ERROR",
},
404: {
"msg": "No data collected",
"lvl": "WARN",
},
504: {
"msg": "The amount of data is large causing a timeout. For large amounts of data, the Amazon S3 destination is recommended.",
"lvl": "ERROR",
},
}


def error_msg_from_status(status: int = None):
if status:
level = HTTP_ERROR_CODES[status]["lvl"]
message = HTTP_ERROR_CODES[status]["msg"]
if level == "ERROR":
LOGGER.error(message)
elif level == "WARN":
LOGGER.warn(message)
else:
LOGGER.error(f"Unknown error occurred: code {status}")
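The import added at the top of the streams module (`from .errors import HTTP_ERROR_CODES, error_msg_from_status`) suggests this new helper lives in the connector package, presumably as `source_amplitude/errors.py`. A minimal usage sketch, assuming that layout:

```python
# Minimal usage sketch (not part of the PR): check the lookup table first,
# then let the helper log the human-readable description.
from source_amplitude.errors import HTTP_ERROR_CODES, error_msg_from_status

status = 404
if status in HTTP_ERROR_CODES:
    error_msg_from_status(status)  # logs "No data collected" at WARN level
```

Both `parse_response` and `read_records` follow this pattern: for the known status codes the stream yields nothing and only logs, while unknown codes are still re-raised.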
@@ -2,9 +2,9 @@
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#

import airbyte_cdk.models
import pytest
import requests
from airbyte_cdk.models import SyncMode
from source_amplitude.api import Events


@@ -13,16 +13,29 @@ def __init__(self, status_code):
self.status_code = status_code


def test_http_error_handler(mocker):
def test_incremental_http_error_handler(mocker):
stream = Events(start_date="2021-01-01T00:00:00Z")
stream_slice = stream.stream_slices()[0]

mock_response = MockRequest(404)
send_request_mocker = mocker.patch.object(stream, "_send_request", side_effect=requests.HTTPError(**{"response": mock_response}))
with pytest.raises(StopIteration):
result = next(stream.read_records(sync_mode=airbyte_cdk.models.SyncMode.full_refresh))
result = next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
assert result == []

mock_response = MockRequest(403)
send_request_mocker.side_effect = requests.HTTPError(**{"response": mock_response})
with pytest.raises(requests.exceptions.HTTPError):
next(stream.read_records(sync_mode=airbyte_cdk.models.SyncMode.full_refresh))
next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))

mock_response = MockRequest(400)
send_request_mocker.side_effect = requests.HTTPError(**{"response": mock_response})
with pytest.raises(StopIteration):
result = next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
assert result == []

mock_response = MockRequest(504)
send_request_mocker.side_effect = requests.HTTPError(**{"response": mock_response})
with pytest.raises(StopIteration):
result = next(stream.read_records(sync_mode=SyncMode.full_refresh, stream_slice=stream_slice))
assert result == []
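A side note on the mocking pattern used in the test above: `requests` exceptions accept a `response` keyword argument, which is how the test attaches a fake status code to the raised error without issuing any real HTTP calls. A standalone illustration:

```python
# Standalone illustration of the trick used above: requests' RequestException
# (and therefore HTTPError) accepts a `response` keyword, so a bare object
# with a status_code attribute is enough for the error handler to inspect.
import requests


class FakeResponse:
    status_code = 404


error = requests.exceptions.HTTPError(response=FakeResponse())
assert error.response.status_code == 404
```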
docs/integrations/sources/amplitude.md (1 addition, 0 deletions)
@@ -45,6 +45,7 @@ Please read [How to get your API key and Secret key](https://help.amplitude.com/

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :----------------------------------------------------- | :------ |
| 0.1.5 | 2022-04-28 | [12430](https://github.com/airbytehq/airbyte/pull/12430) | Added HTTP error descriptions and fixed `Events` stream fail caused by `404` HTTP Error |
| 0.1.4 | 2021-12-23 | [8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.3 | 2021-10-12 | [6375](https://github.com/airbytehq/airbyte/pull/6375) | Log Transient 404 Error in Events stream |
| 0.1.2 | 2021-09-21 | [6353](https://github.com/airbytehq/airbyte/pull/6353) | Correct output schemas on cohorts, events, active\_users, and average\_session\_lengths streams |