From 6dc9e1e10d281828ca7503b310a16e262a906831 Mon Sep 17 00:00:00 2001 From: Kris Date: Wed, 27 Jul 2022 15:48:13 -0700 Subject: [PATCH 1/7] Mixpanel: Filtering out individual items based on datetime in state --- .../source-mixpanel/integration_tests/sample_state.json | 2 +- .../source-mixpanel/source_mixpanel/streams/base.py | 9 +++++++-- .../source-mixpanel/source_mixpanel/streams/export.py | 6 ++++-- .../source-mixpanel/unit_tests/test_streams.py | 9 +++++++-- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json index 8c40297a83d61..667b6a67d26d6 100644 --- a/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json @@ -7,6 +7,6 @@ "date": "2021-07-01" }, "export": { - "date": "2021-06-16" + "date": "2021-06-16T12:00:00" } } diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 97e46a9e02f18..20264acbdbba4 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -79,10 +79,15 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma for record in data: yield record - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def parse_response( + self, + response: requests.Response, + stream_state: Mapping[str, Any], + **kwargs, + ) -> Iterable[Mapping]: # parse the whole response - yield from self.process_response(response, **kwargs) + yield from self.process_response(response, stream_state=stream_state, **kwargs) if self.reqs_per_hour_limit > 0: # we skip this block, if self.reqs_per_hour_limit = 0, diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index bac592bdc8fd5..e3cb0071d5355 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -84,7 +84,7 @@ def url_base(self): def path(self, **kwargs) -> str: return "export" - def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: """Export API return response in JSONL format but each line is a valid JSON object Raw item example: { @@ -123,7 +123,9 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma if item.get("time") and item["time"].isdigit(): item["time"] = datetime.fromtimestamp(int(item["time"])).isoformat() - yield item + # Even though the API is on DATE level, we still filter items compared to state cursor + if not "date" in stream_state or stream_state["date"] <= item["time"]: + yield item def get_json_schema(self) -> Mapping[str, Any]: """ diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 0f91b0424ee21..3020cb04274e5 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -407,7 +407,7 @@ def export_response(): { "event": "Viewed E-commerce Page", "properties": { - "time": 1623860880, + "time": 1623860880, # 2021-06-16T16:28:00 "distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed", "$browser": "Chrome", "$browser_version": "91.0.4472.101", @@ -426,10 +426,15 @@ def test_export_stream(requests_mock, export_response): stream = Export(authenticator=MagicMock()) requests_mock.register_uri("GET", get_url_to_mock(stream), export_response) - + stream_state = {"date" : "2021-06-16T17:00:00"} stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} # read records for single slice records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice) records_length = sum(1 for _ in records) assert records_length == 1 + + # no records should be returned when state older than the record + records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state) + records_length = sum(1 for _ in records) + assert records_length == 0 From 47739cb0a572efa9ec5d91caaf58030089ea70fb Mon Sep 17 00:00:00 2001 From: Kris Date: Wed, 27 Jul 2022 15:48:13 -0700 Subject: [PATCH 2/7] Mixpanel: Filtering out individual items based on datetime in state --- .../source-mixpanel/integration_tests/sample_state.json | 2 +- .../source-mixpanel/source_mixpanel/streams/base.py | 9 +++++++-- .../source-mixpanel/source_mixpanel/streams/export.py | 6 ++++-- .../source-mixpanel/unit_tests/test_streams.py | 9 +++++++-- 4 files changed, 19 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json index 8c40297a83d61..667b6a67d26d6 100644 --- a/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json +++ b/airbyte-integrations/connectors/source-mixpanel/integration_tests/sample_state.json @@ -7,6 +7,6 @@ "date": "2021-07-01" }, "export": { - "date": "2021-06-16" + "date": "2021-06-16T12:00:00" } } diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 97e46a9e02f18..20264acbdbba4 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -79,10 +79,15 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma for record in data: yield record - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def parse_response( + self, + response: requests.Response, + stream_state: Mapping[str, Any], + **kwargs, + ) -> Iterable[Mapping]: # parse the whole response - yield from self.process_response(response, **kwargs) + yield from self.process_response(response, stream_state=stream_state, **kwargs) if self.reqs_per_hour_limit > 0: # we skip this block, if self.reqs_per_hour_limit = 0, diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index bac592bdc8fd5..e3cb0071d5355 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -84,7 +84,7 @@ def url_base(self): def path(self, **kwargs) -> str: return "export" - def process_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + def process_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: """Export API return response in JSONL format but each line is a valid JSON object Raw item example: { @@ -123,7 +123,9 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma if item.get("time") and item["time"].isdigit(): item["time"] = datetime.fromtimestamp(int(item["time"])).isoformat() - yield item + # Even though the API is on DATE level, we still filter items compared to state cursor + if not "date" in stream_state or stream_state["date"] <= item["time"]: + yield item def get_json_schema(self) -> Mapping[str, Any]: """ diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 0f91b0424ee21..3020cb04274e5 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -407,7 +407,7 @@ def export_response(): { "event": "Viewed E-commerce Page", "properties": { - "time": 1623860880, + "time": 1623860880, # 2021-06-16T16:28:00 "distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed", "$browser": "Chrome", "$browser_version": "91.0.4472.101", @@ -426,10 +426,15 @@ def test_export_stream(requests_mock, export_response): stream = Export(authenticator=MagicMock()) requests_mock.register_uri("GET", get_url_to_mock(stream), export_response) - + stream_state = {"date" : "2021-06-16T17:00:00"} stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} # read records for single slice records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice) records_length = sum(1 for _ in records) assert records_length == 1 + + # no records should be returned when state older than the record + records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state) + records_length = sum(1 for _ in records) + assert records_length == 0 From 1e2b45b7d50ef1a9aa434cff40a8251700ad8096 Mon Sep 17 00:00:00 2001 From: Kris Date: Mon, 8 Aug 2022 15:11:37 -0700 Subject: [PATCH 3/7] Mixpanel: Use where API option to further filter out events --- .../source_mixpanel/streams/export.py | 20 +++++++++++++------ .../unit_tests/test_streams.py | 5 ----- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index e3cb0071d5355..80872cb7b076a 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -4,7 +4,7 @@ import json from datetime import datetime -from typing import Any, Iterable, Mapping +from typing import Any, Iterable, Mapping, MutableMapping import requests from airbyte_cdk.models import SyncMode @@ -74,7 +74,7 @@ class Export(DateSlicesMixin, IncrementalMixpanelStream): """ primary_key: str = None - cursor_field: str = "time" + cursor_field: str = "mp_processing_time_ms" @property def url_base(self): @@ -122,10 +122,7 @@ def process_response(self, response: requests.Response, stream_state: Mapping[st # convert timestamp to datetime string if item.get("time") and item["time"].isdigit(): item["time"] = datetime.fromtimestamp(int(item["time"])).isoformat() - - # Even though the API is on DATE level, we still filter items compared to state cursor - if not "date" in stream_state or stream_state["date"] <= item["time"]: - yield item + yield item def get_json_schema(self) -> Mapping[str, Any]: """ @@ -151,3 +148,14 @@ def get_json_schema(self) -> Mapping[str, Any]: schema["properties"][result.transformed_name] = {"type": ["null", "string"]} return schema + + def request_params( + self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None + ) -> MutableMapping[str, Any]: + mapping = { + "from_date": stream_slice["start_date"], + "to_date": stream_slice["end_date"] + } + if "date" in stream_state: + mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})" + return mapping diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 3020cb04274e5..6ad27fefe04ce 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -433,8 +433,3 @@ def test_export_stream(requests_mock, export_response): records_length = sum(1 for _ in records) assert records_length == 1 - - # no records should be returned when state older than the record - records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state) - records_length = sum(1 for _ in records) - assert records_length == 0 From de4882303cacda1d72a7f090d7c02adcf8dabd84 Mon Sep 17 00:00:00 2001 From: Kris Date: Wed, 10 Aug 2022 10:36:29 -0700 Subject: [PATCH 4/7] Fixing unit tests --- .../source_mixpanel/streams/export.py | 4 ++-- .../unit_tests/test_streams.py | 20 ++++++++++++++----- 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index 5d2dce80a5696..c4541e0b9894a 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -157,6 +157,6 @@ def request_params( "from_date": stream_slice["start_date"], "to_date": stream_slice["end_date"] } - if "date" in stream_state: - mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})" + if stream_state and "date" in stream_state: + mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})" return mapping diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 3020cb04274e5..09c9fd873d9ac 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -426,7 +426,6 @@ def test_export_stream(requests_mock, export_response): stream = Export(authenticator=MagicMock()) requests_mock.register_uri("GET", get_url_to_mock(stream), export_response) - stream_state = {"date" : "2021-06-16T17:00:00"} stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} # read records for single slice records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice) @@ -434,7 +433,18 @@ def test_export_stream(requests_mock, export_response): records_length = sum(1 for _ in records) assert records_length == 1 - # no records should be returned when state older than the record - records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice, stream_state=stream_state) - records_length = sum(1 for _ in records) - assert records_length == 0 + +def test_export_stream_request_params(): + stream = Export(authenticator=MagicMock()) + stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"} + stream_state = {"date": "2021-06-16T17:00:00"} + + request_params = stream.request_params(stream_state=None, stream_slice=stream_slice) + assert "where" not in request_params + + request_params = stream.request_params(stream_state={}, stream_slice=stream_slice) + assert "where" not in request_params + + request_params = stream.request_params(stream_state=stream_state, stream_slice=stream_slice) + assert "where" in request_params + assert request_params.get("where") == "properties[\"$time\"]>=datetime(1623888000)" From de2b1dc1747c3e5a58cf59bc8f757a06ae9954c1 Mon Sep 17 00:00:00 2001 From: Kris Date: Thu, 11 Aug 2022 16:10:54 -0700 Subject: [PATCH 5/7] Fixing unit test timezone issues --- .../connectors/source-mixpanel/unit_tests/test_streams.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 09c9fd873d9ac..2728f8a6f008f 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -1,7 +1,7 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # - +import datetime from datetime import timedelta from unittest.mock import MagicMock @@ -447,4 +447,5 @@ def test_export_stream_request_params(): request_params = stream.request_params(stream_state=stream_state, stream_slice=stream_slice) assert "where" in request_params - assert request_params.get("where") == "properties[\"$time\"]>=datetime(1623888000)" + timestamp = int(datetime.datetime.fromisoformat("2021-06-16T17:00:00").timestamp()) + assert request_params.get("where") == f"properties[\"$time\"]>=datetime({timestamp})" From e3ea523ade166707dc07ac46e33f1bf8afb3e507 Mon Sep 17 00:00:00 2001 From: Kris Date: Mon, 22 Aug 2022 11:22:09 -0700 Subject: [PATCH 6/7] Version bump + docs --- .../init/src/main/resources/seed/source_definitions.yaml | 2 +- airbyte-config/init/src/main/resources/seed/source_specs.yaml | 2 +- airbyte-integrations/connectors/source-mixpanel/Dockerfile | 2 +- docs/integrations/sources/mixpanel.md | 1 + 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 9750a5fffdf46..9cfe5921ecda6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -615,7 +615,7 @@ - name: Mixpanel sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a dockerRepository: airbyte/source-mixpanel - dockerImageTag: 0.1.19 + dockerImageTag: 0.1.20 documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel icon: mixpanel.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index b06892ddfc9ef..29cca3f1ed71e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5685,7 +5685,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-mixpanel:0.1.19" +- dockerImage: "airbyte/source-mixpanel:0.1.20" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-mixpanel/Dockerfile b/airbyte-integrations/connectors/source-mixpanel/Dockerfile index 51152d5dda225..1d81d3f9b885a 100644 --- a/airbyte-integrations/connectors/source-mixpanel/Dockerfile +++ b/airbyte-integrations/connectors/source-mixpanel/Dockerfile @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.19 +LABEL io.airbyte.version=0.1.20 LABEL io.airbyte.name=airbyte/source-mixpanel diff --git a/docs/integrations/sources/mixpanel.md b/docs/integrations/sources/mixpanel.md index 1f0e0f0ba1a46..c075223be5e47 100644 --- a/docs/integrations/sources/mixpanel.md +++ b/docs/integrations/sources/mixpanel.md @@ -61,6 +61,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------| +| 0.1.20 | 2022-08-22 | [15091](https://github.com/airbytehq/airbyte/pull/15091) | Improve `export` stream cursor support | | 0.1.19 | 2022-08-18 | [15739](https://github.com/airbytehq/airbyte/pull/15739) | Update `titile` and `description` for `Project Secret` field | | 0.1.18 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from schemas and specs | | 0.1.17 | 2022-06-01 | [12801](https://github.com/airbytehq/airbyte/pull/13372) | Acceptance tests fix, fixing some bugs for beta release | From 3ab29dc32337c32f8d49afc3427527a5c95aadd7 Mon Sep 17 00:00:00 2001 From: Octavia Squidington III Date: Tue, 23 Aug 2022 04:35:00 +0000 Subject: [PATCH 7/7] auto-bump connector version [ci skip] --- .../source-mixpanel/source_mixpanel/streams/base.py | 8 ++++---- .../source-mixpanel/source_mixpanel/streams/export.py | 5 +---- .../connectors/source-mixpanel/unit_tests/test_streams.py | 5 +++-- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py index 20264acbdbba4..44abe6d756282 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/base.py @@ -80,10 +80,10 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma yield record def parse_response( - self, - response: requests.Response, - stream_state: Mapping[str, Any], - **kwargs, + self, + response: requests.Response, + stream_state: Mapping[str, Any], + **kwargs, ) -> Iterable[Mapping]: # parse the whole response diff --git a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py index c4541e0b9894a..10b693a7d38f3 100644 --- a/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py +++ b/airbyte-integrations/connectors/source-mixpanel/source_mixpanel/streams/export.py @@ -153,10 +153,7 @@ def get_json_schema(self) -> Mapping[str, Any]: def request_params( self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None ) -> MutableMapping[str, Any]: - mapping = { - "from_date": stream_slice["start_date"], - "to_date": stream_slice["end_date"] - } + mapping = {"from_date": stream_slice["start_date"], "to_date": stream_slice["end_date"]} if stream_state and "date" in stream_state: mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})" return mapping diff --git a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py index 2728f8a6f008f..dc8ebc808c8c1 100644 --- a/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-mixpanel/unit_tests/test_streams.py @@ -1,6 +1,7 @@ # # Copyright (c) 2022 Airbyte, Inc., all rights reserved. # + import datetime from datetime import timedelta from unittest.mock import MagicMock @@ -407,7 +408,7 @@ def export_response(): { "event": "Viewed E-commerce Page", "properties": { - "time": 1623860880, # 2021-06-16T16:28:00 + "time": 1623860880, # 2021-06-16T16:28:00 "distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed", "$browser": "Chrome", "$browser_version": "91.0.4472.101", @@ -448,4 +449,4 @@ def test_export_stream_request_params(): request_params = stream.request_params(stream_state=stream_state, stream_slice=stream_slice) assert "where" in request_params timestamp = int(datetime.datetime.fromisoformat("2021-06-16T17:00:00").timestamp()) - assert request_params.get("where") == f"properties[\"$time\"]>=datetime({timestamp})" + assert request_params.get("where") == f'properties["$time"]>=datetime({timestamp})'