Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Source google ads: fix for page token expired #9608

Merged
merged 11 commits into from
Jan 24, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,16 @@


def chunk_date_range(
start_date: str, conversion_window: int, field: str, end_date: str = None, time_unit: str = "months", days_of_data_storage: int = None
start_date: str,
conversion_window: int,
field: str,
end_date: str = None,
days_of_data_storage: int = None,
range_days: int = None,
) -> Iterable[Mapping[str, any]]:
"""
Passing optional parameter end_date for testing
Returns a list of the beginning and ending timetsamps of each month between the start date and now.
Returns a list of the beginning and ending timestamps of each `range_days` between the start date and now.
The return value is a list of dicts {'date': str} which can be used directly with the Slack API
"""
intervals = []
Expand All @@ -38,7 +43,7 @@ def chunk_date_range(
# Each stream_slice contains the beginning and ending timestamp for a 24 hour period
while start_date < end_date:
intervals.append({field: start_date.to_date_string()})
start_date = start_date.add(**{time_unit: 1})
start_date = start_date.add(days=range_days)

return intervals

Expand All @@ -64,7 +69,7 @@ class IncrementalGoogleAdsStream(GoogleAdsStream, ABC):
days_of_data_storage = None
cursor_field = "segments.date"
primary_key = None
time_unit = "months"
range_days = 15

def __init__(self, start_date: str, conversion_window_days: int, time_zone: [pendulum.timezone, str], **kwargs):
self.conversion_window_days = conversion_window_days
Expand All @@ -80,19 +85,17 @@ def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Ite
start_date=start_date,
conversion_window=self.conversion_window_days,
field=self.cursor_field,
time_unit=self.time_unit,
days_of_data_storage=self.days_of_data_storage,
range_days=self.range_days,
)

def get_date_params(
self, stream_slice: Mapping[str, Any], cursor_field: str, end_date: pendulum.datetime = None, time_unit: str = "months"
):
def get_date_params(self, stream_slice: Mapping[str, Any], cursor_field: str, end_date: pendulum.datetime = None):
end_date = end_date or pendulum.yesterday(tz=self.time_zone)
start_date = pendulum.parse(stream_slice.get(cursor_field))
if start_date > pendulum.now():
return start_date.to_date_string(), start_date.add(days=1).to_date_string()

end_date = min(end_date, pendulum.parse(stream_slice.get(cursor_field)).add(**{time_unit: 1}))
end_date = min(end_date, pendulum.parse(stream_slice.get(cursor_field)).add(days=self.range_days))

# Fix issue #4806, start date should always be lower than end date.
if start_date.add(days=1).date() >= end_date.date():
Expand All @@ -116,7 +119,7 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late
return current_stream_state

def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
start_date, end_date = self.get_date_params(stream_slice, self.cursor_field, time_unit=self.time_unit)
start_date, end_date = self.get_date_params(stream_slice, self.cursor_field)
query = GoogleAds.convert_schema_into_query(
schema=self.get_json_schema(), report_name=self.name, from_date=start_date, to_date=end_date, cursor_field=self.cursor_field
)
Expand Down Expand Up @@ -214,5 +217,5 @@ class ClickView(IncrementalGoogleAdsStream):
ClickView stream: https://developers.google.com/google-ads/api/reference/rpc/v8/ClickView
"""

time_unit = "days"
days_of_data_storage = 90
range_days = 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A description of why this value is overridden could be useful as well.

Copy link
Contributor Author

@augan-rymkhan augan-rymkhan Jan 21, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before the changes in this PR, the ClickView class overrode time_unit with "days", so
start_date = start_date.add(**{time_unit: 1}) was equivalent to start_date.add(days=1).
So, to preserve that behaviour, I just set range_days to 1.

Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,23 @@ def test_get_fields_from_schema():


def test_interval_chunking():
mock_intervals = [{"segments.date": "2021-05-18"}, {"segments.date": "2021-06-18"}, {"segments.date": "2021-07-18"}]
intervals = chunk_date_range("2021-06-01", 14, "segments.date", "2021-08-15")
mock_intervals = [
{"segments.date": "2021-06-17"},
{"segments.date": "2021-06-27"},
{"segments.date": "2021-07-07"},
{"segments.date": "2021-07-17"},
{"segments.date": "2021-07-27"},
{"segments.date": "2021-08-06"},
]
intervals = chunk_date_range("2021-07-01", 14, "segments.date", "2021-08-15", range_days=10)

assert mock_intervals == intervals


def test_get_date_params():
# Please note that this is equal to inputted stream_slice start date + 1 day
mock_start_date = "2021-05-19"
mock_end_date = "2021-06-18"
mock_end_date = "2021-06-02"
mock_conversion_window_days = 14

incremental_stream_config = dict(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,17 @@ def test_chunk_date_range():
end_date = "2021-05-04"
conversion_window = 14
field = "date"
response = chunk_date_range(start_date, conversion_window, field, end_date)
assert [{"date": "2021-02-18"}, {"date": "2021-03-18"}, {"date": "2021-04-18"}] == response
response = chunk_date_range(start_date, conversion_window, field, end_date, range_days=10)
assert [
{"date": "2021-02-18"},
{"date": "2021-02-28"},
{"date": "2021-03-10"},
{"date": "2021-03-20"},
{"date": "2021-03-30"},
{"date": "2021-04-09"},
{"date": "2021-04-19"},
{"date": "2021-04-29"},
] == response


def test_streams_count(config):
Expand Down