Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mixpanel: Filtering out individual items based on datetime in state #15091

Merged
merged 10 commits into from
Aug 23, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@
- name: Mixpanel
sourceDefinitionId: 12928b32-bf0a-4f1e-964f-07e12e37153a
dockerRepository: airbyte/source-mixpanel
dockerImageTag: 0.1.19
dockerImageTag: 0.1.20
documentationUrl: https://docs.airbyte.io/integrations/sources/mixpanel
icon: mixpanel.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5685,7 +5685,7 @@
path_in_connector_config:
- "credentials"
- "client_secret"
- dockerImage: "airbyte/source-mixpanel:0.1.19"
- dockerImage: "airbyte/source-mixpanel:0.1.20"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mixpanel"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mixpanel/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.1.19
LABEL io.airbyte.version=0.1.20
LABEL io.airbyte.name=airbyte/source-mixpanel
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
"date": "2021-07-01"
},
"export": {
"date": "2021-06-16"
"date": "2021-06-16T12:00:00"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,10 +79,15 @@ def process_response(self, response: requests.Response, **kwargs) -> Iterable[Ma
for record in data:
yield record

def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
def parse_response(
self,
response: requests.Response,
stream_state: Mapping[str, Any],
**kwargs,
) -> Iterable[Mapping]:

# parse the whole response
yield from self.process_response(response, **kwargs)
yield from self.process_response(response, stream_state=stream_state, **kwargs)

if self.reqs_per_hour_limit > 0:
# we skip this block, if self.reqs_per_hour_limit = 0,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import json
from datetime import datetime
from typing import Any, Iterable, Mapping
from typing import Any, Iterable, Mapping, MutableMapping

import requests
from airbyte_cdk.models import SyncMode
Expand Down Expand Up @@ -149,3 +149,11 @@ def get_json_schema(self) -> Mapping[str, Any]:
schema["properties"][result.transformed_name] = {"type": ["null", "string"]}

return schema

def request_params(
self, stream_state: Mapping[str, Any], stream_slice: Mapping[str, any] = None, next_page_token: Mapping[str, Any] = None
) -> MutableMapping[str, Any]:
mapping = {"from_date": stream_slice["start_date"], "to_date": stream_slice["end_date"]}
if stream_state and "date" in stream_state:
mapping["where"] = f"properties[\"$time\"]>=datetime({int(datetime.fromisoformat(stream_state['date']).timestamp())})"
return mapping
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

import datetime
from datetime import timedelta
from unittest.mock import MagicMock

Expand Down Expand Up @@ -407,7 +408,7 @@ def export_response():
{
"event": "Viewed E-commerce Page",
"properties": {
"time": 1623860880,
"time": 1623860880, # 2021-06-16T16:28:00
"distinct_id": "1d694fd9-31a5-4b99-9eef-ae63112063ed",
"$browser": "Chrome",
"$browser_version": "91.0.4472.101",
Expand All @@ -426,10 +427,26 @@ def test_export_stream(requests_mock, export_response):

stream = Export(authenticator=MagicMock())
requests_mock.register_uri("GET", get_url_to_mock(stream), export_response)

stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
# read records for single slice
records = stream.read_records(sync_mode=SyncMode.incremental, stream_slice=stream_slice)

records_length = sum(1 for _ in records)
assert records_length == 1


def test_export_stream_request_params():
stream = Export(authenticator=MagicMock())
stream_slice = {"start_date": "2017-01-25T00:00:00Z", "end_date": "2017-02-25T00:00:00Z"}
stream_state = {"date": "2021-06-16T17:00:00"}

request_params = stream.request_params(stream_state=None, stream_slice=stream_slice)
assert "where" not in request_params

request_params = stream.request_params(stream_state={}, stream_slice=stream_slice)
assert "where" not in request_params

request_params = stream.request_params(stream_state=stream_state, stream_slice=stream_slice)
assert "where" in request_params
timestamp = int(datetime.datetime.fromisoformat("2021-06-16T17:00:00").timestamp())
assert request_params.get("where") == f'properties["$time"]>=datetime({timestamp})'
1 change: 1 addition & 0 deletions docs/integrations/sources/mixpanel.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ Please note, that incremental sync could return duplicated \(old records\) for t

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:-----------------------------------------------------------------------------------------------------|
| 0.1.20 | 2022-08-22 | [15091](https://github.com/airbytehq/airbyte/pull/15091) | Improve `export` stream cursor support |
| 0.1.19 | 2022-08-18 | [15739](https://github.com/airbytehq/airbyte/pull/15739) | Update `titile` and `description` for `Project Secret` field |
| 0.1.18 | 2022-07-21 | [14924](https://github.com/airbytehq/airbyte/pull/14924) | Remove `additionalProperties` field from schemas and specs |
| 0.1.17 | 2022-06-01 | [12801](https://github.com/airbytehq/airbyte/pull/13372) | Acceptance tests fix, fixing some bugs for beta release |
Expand Down