Skip to content

Commit

Permalink
Merge branch 'alex/datetimeFormatTimestamp' into alex/configbasedsend…
Browse files Browse the repository at this point in the history
…grid
  • Loading branch information
girarda committed Aug 9, 2022
2 parents 028bdfb + 28f0588 commit 99caa58
Show file tree
Hide file tree
Showing 5 changed files with 107 additions and 26 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
from typing import Union


class DatetimeParser:
    """
    Parses and formats datetime objects according to a specified format.

    This class mainly acts as a wrapper to properly handle timestamps expressed through the "%s" directive.
    "%s" (seconds since the Epoch) is not one of the format codes required by the C standard: it is a
    platform-specific strftime extension, it is not accepted by Python's strptime, and when formatting it
    ignores the datetime's time zone information.
    Instead of using the directive directly, we use datetime.fromtimestamp and dt.timestamp().
    """

    def parse(self, date: Union[str, int], format: str, timezone) -> datetime.datetime:
        """
        Parse `date` according to `format`.

        :param date: the value to parse; it is converted with int() for "%s" and with str() otherwise
        :param format: a strptime format string, or "%s" for seconds since the Epoch
        :param timezone: tzinfo of the returned datetime
        :return: the parsed datetime, expressed in `timezone`
        :raises ValueError: if `date` does not match `format`
        """
        # "%s" is a valid (but unreliable) directive for formatting, but not for parsing
        # It is defined as
        # The number of seconds since the Epoch, 1970-01-01 00:00:00+0000 (UTC). https://man7.org/linux/man-pages/man3/strptime.3.html
        #
        # The recommended way to parse a date from its timestamp representation is to use datetime.fromtimestamp
        # See https://stackoverflow.com/a/4974930
        if format == "%s":
            return datetime.datetime.fromtimestamp(int(date), tz=timezone)

        parsed = datetime.datetime.strptime(str(date), format)
        if parsed.tzinfo is None or timezone is None:
            # Naive result (no %z in the format, or no offset to honor): simply label it with the requested timezone
            return parsed.replace(tzinfo=timezone)
        # The input carried its own UTC offset (%z). Overwriting tzinfo would relabel the wall-clock time and
        # silently shift the instant, so convert to the requested timezone instead.
        return parsed.astimezone(timezone)

    def format(self, dt: datetime.datetime, format: str) -> str:
        """
        Format `dt` according to `format`.

        :param dt: the datetime to format
        :param format: a strftime format string, or "%s" for seconds since the Epoch
        :return: the formatted datetime; for "%s", the integer timestamp as a string
        """
        # strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
        # It's safer to use the timestamp() method than the %s directive
        # See https://stackoverflow.com/a/4974930
        if format == "%s":
            return str(int(dt.timestamp()))
        return dt.strftime(format)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dataclasses import InitVar, dataclass, field
from typing import Any, Mapping, Union

from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from dataclasses_jsonschema import JsonSchemaMixin

Expand Down Expand Up @@ -40,6 +41,7 @@ class MinMaxDatetime(JsonSchemaMixin):
def __post_init__(self, options: Mapping[str, Any]):
self.datetime = InterpolatedString.create(self.datetime, options=options or {})
self.timezone = dt.timezone.utc
self._parser = DatetimeParser()
self.min_datetime = InterpolatedString.create(self.min_datetime, options=options) if self.min_datetime else None
self.max_datetime = InterpolatedString.create(self.max_datetime, options=options) if self.max_datetime else None

Expand All @@ -57,17 +59,13 @@ def get_datetime(self, config, **additional_options) -> dt.datetime:
if not datetime_format:
datetime_format = "%Y-%m-%dT%H:%M:%S.%f%z"

time = dt.datetime.strptime(str(self.datetime.eval(config, **additional_options)), datetime_format).replace(tzinfo=self._timezone)
time = self._parser.parse(str(self.datetime.eval(config, **additional_options)), datetime_format, self.timezone)

if self.min_datetime:
min_time = dt.datetime.strptime(str(self.min_datetime.eval(config, **additional_options)), datetime_format).replace(
tzinfo=self._timezone
)
min_time = self._parser.parse(str(self.min_datetime.eval(config, **additional_options)), datetime_format, self.timezone)
time = max(time, min_time)
if self.max_datetime:
max_time = dt.datetime.strptime(str(self.max_datetime.eval(config, **additional_options)), datetime_format).replace(
tzinfo=self._timezone
)
max_time = self._parser.parse(str(self.max_datetime.eval(config, **additional_options)), datetime_format, self.timezone)
time = min(time, max_time)
return time

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,10 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from typing import Any, Iterable, Mapping, Optional, Union
from typing import Any, Iterable, Mapping, Optional

from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser
from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation
Expand Down Expand Up @@ -77,6 +78,7 @@ def __post_init__(self, options: Mapping[str, Any]):
self.cursor_field = InterpolatedString.create(self.cursor_field, options=options)
self.stream_slice_field_start = InterpolatedString.create(self.stream_state_field_start or "start_time", options=options)
self.stream_slice_field_end = InterpolatedString.create(self.stream_state_field_end or "end_time", options=options)
self._parser = DatetimeParser()

# If datetime format is not specified then start/end datetime should inherit it from the stream slicer
if not self.start_datetime.datetime_format:
Expand Down Expand Up @@ -142,7 +144,12 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->

start_datetime = max(cursor_datetime, start_datetime)

state_date = self.parse_date(stream_state.get(self.cursor_field.eval(self.config, stream_state=stream_state)))
state_cursor_value = stream_state.get(self.cursor_field.eval(self.config, stream_state=stream_state))

if state_cursor_value:
state_date = self.parse_date(state_cursor_value)
else:
state_date = None
if state_date:
# If the input_state's date is greater than start_datetime, the start of the time window is the state's next day
next_date = state_date + datetime.timedelta(days=1)
Expand All @@ -151,13 +158,7 @@ def stream_slices(self, sync_mode: SyncMode, stream_state: Mapping[str, Any]) ->
return dates

def _format_datetime(self, dt: datetime.datetime):
# strftime("%s") is unreliable because it ignores the time zone information and assumes the time zone of the system it's running on
# It's safer to use the timestamp() method than the %s directive
# See https://stackoverflow.com/a/4974930
if self.datetime_format == "%s":
return str(int(dt.timestamp()))
else:
return dt.strftime(self.datetime_format)
return self._parser.format(dt, self.datetime_format)

def _partition_daterange(self, start, end, step: datetime.timedelta):
start_field = self.stream_slice_field_start.eval(self.config)
Expand All @@ -170,14 +171,11 @@ def _partition_daterange(self, start, end, step: datetime.timedelta):
return dates

def _get_date(self, cursor_value, default_date: datetime.datetime, comparator) -> datetime.datetime:
cursor_date = self.parse_date(cursor_value or default_date)
cursor_date = cursor_value or default_date
return comparator(cursor_date, default_date)

def parse_date(self, date: Union[str, datetime.datetime]) -> datetime.datetime:
if isinstance(date, str):
return datetime.datetime.strptime(str(date), self.datetime_format).replace(tzinfo=self._timezone)
else:
return date
def parse_date(self, date: str) -> datetime.datetime:
return self._parser.parse(date, self.datetime_format, self._timezone)

@classmethod
def _parse_timedelta(cls, time_str):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime

import pytest
from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser


@pytest.mark.parametrize(
    "test_name, input_date, date_format, expected_output_date",
    [
        (
            "test_parse_date_iso",
            "2021-01-01T00:00:00.000000+0000",
            "%Y-%m-%dT%H:%M:%S.%f%z",
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
        ),
        (
            "test_parse_timestamp",
            "1609459200",
            "%s",
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
        ),
        (
            "test_parse_date_number",
            "20210101",
            "%Y%m%d",
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
        ),
    ],
)
def test_parse_date(test_name, input_date, date_format, expected_output_date):
    """Each input, parsed with its format in UTC, must equal the expected datetime."""
    parsed = DatetimeParser().parse(input_date, date_format, datetime.timezone.utc)
    assert expected_output_date == parsed


@pytest.mark.parametrize(
    "test_name, input_dt, datetimeformat, expected_output",
    [
        (
            "test_format_timestamp",
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
            "%s",
            "1609459200",
        ),
        (
            "test_format_string",
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
            "%Y-%m-%d",
            "2021-01-01",
        ),
        (
            "test_format_to_number",
            datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
            "%Y%m%d",
            "20210101",
        ),
    ],
)
def test_format_datetime(test_name, input_dt, datetimeformat, expected_output):
    """Each datetime, formatted with its format string, must equal the expected text."""
    formatted = DatetimeParser().format(input_dt, datetimeformat)
    assert expected_output == formatted
Original file line number Diff line number Diff line change
Expand Up @@ -454,13 +454,13 @@ def test_request_option(test_name, inject_into, field_name, expected_req_params,
"%Y-%m-%dT%H:%M:%S.%f%z",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
(
"test_parse_date_datetime",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
"%Y%m%d",
"test_parse_timestamp",
"1609459200",
"%s",
datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc),
),
("test_parse_date_number", "20210101", "%Y%m%d", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc)),
],
)
def test_parse_date(test_name, input_date, date_format, expected_output_date):
Expand All @@ -483,6 +483,7 @@ def test_parse_date(test_name, input_date, date_format, expected_output_date):
[
("test_format_timestamp", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%s", "1609459200"),
("test_format_string", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y-%m-%d", "2021-01-01"),
("test_format_to_number", datetime.datetime(2021, 1, 1, 0, 0, tzinfo=datetime.timezone.utc), "%Y%m%d", "20210101"),
],
)
def test_format_datetime(test_name, input_dt, datetimeformat, expected_output):
Expand Down

0 comments on commit 99caa58

Please sign in to comment.