Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

low-code connectors: fix parse and format methods #15326

Merged
merged 15 commits into from
Aug 9, 2022
3 changes: 3 additions & 0 deletions airbyte-cdk/python/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Changelog

## 0.1.72
- Bugfix: Fix bug in DatetimeStreamSlicer's parsing method

## 0.1.71
- Refactor declarative package to dataclasses
- Bugfix: Requester header always converted to string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class MinMaxDatetime(JsonSchemaMixin):
min_date, then min_date is returned. If date is greater than max_date, then max_date is returned.
If neither, the input date is returned.

The timestamp format accepts the same format codes as datetime.strfptime, which are
all the format codes required by the 1989 C standard.
Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html

Attributes:
datetime (Union[InterpolatedString, str]): InterpolatedString or string representing the datetime in the format specified by `datetime_format`
datetime_format (str): Format of the datetime passed as argument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,8 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional
from typing import Any, Iterable, Mapping, Optional, Union

import dateutil
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
Expand Down Expand Up @@ -35,6 +34,10 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):

For example, "1d" will produce windows of 1 day, and 2weeks windows of 2 weeks.

The timestamp format accepts the same format codes as datetime.strfptime, which are
all the format codes required by the 1989 C standard.
Full list of accepted format codes: https://man7.org/linux/man-pages/man3/strftime.3.html

Attributes:
start_datetime (MinMaxDatetime): the datetime that determines the earliest record that should be synced
end_datetime (MinMaxDatetime): the datetime that determines the last record that should be synced
Expand Down Expand Up @@ -128,7 +131,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
"""
stream_state = stream_state or {}
kwargs = {"stream_state": stream_state}
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=datetime.timezone.utc))
end_datetime = min(self.end_datetime.get_datetime(self.config, **kwargs), datetime.datetime.now(tz=self._timezone))
lookback_delta = self._parse_timedelta(self.lookback_window.eval(self.config, **kwargs) if self.lookback_window else "0d")
start_datetime = self.start_datetime.get_datetime(self.config, **kwargs) - lookback_delta
start_datetime = min(start_datetime, end_datetime)
Expand All @@ -148,8 +151,11 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
return dates

def _format_datetime(self, dt: datetime.datetime):
if self.datetime_format == "timestamp":
return dt.timestamp()
# strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
# It's safer to use the timestamp() method than the %s directive
# See https://stackoverflow.com/a/4974930
if self.datetime_format == "%s":
return str(int(dt.timestamp()))
else:
return dt.strftime(self.datetime_format)

Expand All @@ -167,22 +173,11 @@ def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -
cursor_date = self.parse_date(cursor_value or default_date)
return comparator(cursor_date, default_date)

def parse_date(self, date: Any) -> datetime:
if date and isinstance(date, str):
if self.is_int(date):
return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone)
else:
return dateutil.parser.parse(date).replace(tzinfo=self._timezone)
elif isinstance(date, int):
return datetime.datetime.fromtimestamp(int(date)).replace(tzinfo=self._timezone)
return date

def is_int(self, s) -> bool:
try:
int(s)
return True
except ValueError:
return False
def parse_date(self, date: Union[str, datetime.datetime]) -> datetime.datetime:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't place the comment in the right spot so adding it here. Can you clarify in the doc string for datetime_format what the reference for formats is? eg add a link to it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if isinstance(date, str):
return datetime.datetime.strptime(str(date), self.datetime_format).replace(tzinfo=self._timezone)
else:
return date

@classmethod
def _parse_timedelta(cls, time_str):
Expand Down
2 changes: 1 addition & 1 deletion airbyte-cdk/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

setup(
name="airbyte-cdk",
version="0.1.71",
version="0.1.72",
description="A framework for writing Airbyte Connectors.",
long_description=README,
long_description_content_type="text/markdown",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,5 +445,61 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params,
assert expected_body_data == slicer.get_request_body_data(stream_slice=stream_slice)


@pytest.mark.parametrize(
"test_name, input_date, date_format, expected_output_date",
[
(
"test_parse_date_iso",
"2021-01-01T00:00:00.000000+0000",
"%Y-%m-%dT%H:%M:%S.%f%z",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
(
"test_parse_date_datetime",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
"%Y%m%d",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
],
)
def test_parse_date(test_name, input_date, date_format, expected_output_date):
slicer = DatetimeStreamSlicer(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", options={}),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", options={}),
step="1d",
cursor_field=InterpolatedString(cursor_field, options={}),
datetime_format=date_format,
lookback_window=InterpolatedString("0d", options={}),
config=config,
options={},
)
output_date = slicer.parse_date(input_date)
assert expected_output_date == output_date


@pytest.mark.parametrize(
"test_name, input_dt, datetimeformat, expected_output",
[
("test_format_timestamp", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%s", "1609459200"),
("test_format_string", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y-%m-%d", "2021-01-01"),
],
)
def test_format_datetime(test_name, input_dt, datetimeformat, expected_output):
slicer = DatetimeStreamSlicer(
start_datetime=MinMaxDatetime("2021-01-01T00:00:00.000000+0000", options={}),
end_datetime=MinMaxDatetime("2021-01-10T00:00:00.000000+0000", options={}),
step="1d",
cursor_field=InterpolatedString(cursor_field, options={}),
datetime_format=datetimeformat,
lookback_window=InterpolatedString("0d", options={}),
config=config,
options={},
)

output_date = slicer._format_datetime(input_dt)
assert expected_output == output_date


if __name__ == "__main__":
unittest.main()