From 41f9d1f792a5d11b41a7b98f18e091710df2ae65 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Wed, 8 Sep 2021 19:02:40 +0300 Subject: [PATCH 01/12] Add new streams --- .../acceptance-test-config.yml | 24 + .../source-iterable/acceptance-test-docker.sh | 15 + .../connectors/source-iterable/build.gradle | 11 +- .../integration_tests/__init__.py | 0 .../integration_tests/abnormal_state.json | 26 + .../integration_tests/acceptance.py | 33 + .../integration_tests/catalog.json | 186 +++ .../integration_tests/configured_catalog.json | 186 +++ .../integration_tests/invalid_config.json | 4 + .../source-iterable/requirements.txt | 2 + .../sample_files/configured_catalog.json | 1258 ----------------- .../sample_files/sample_config.json | 2 +- .../source-iterable/sample_files/state.json | 26 - .../source-iterable/source_iterable/api.py | 149 +- .../schemas/campaigns_metrics.json | 8 + .../source_iterable/schemas/events.json | 8 + .../source-iterable/source_iterable/source.py | 4 + 17 files changed, 627 insertions(+), 1315 deletions(-) create mode 100644 airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml create mode 100755 airbyte-integrations/connectors/source-iterable/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-iterable/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-iterable/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-iterable/integration_tests/catalog.json create mode 100644 airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json delete mode 100644 airbyte-integrations/connectors/source-iterable/sample_files/configured_catalog.json delete mode 100644 airbyte-integrations/connectors/source-iterable/sample_files/state.json mode change 100644 => 100755 airbyte-integrations/connectors/source-iterable/source_iterable/api.py create mode 100644 airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns_metrics.json create mode 100644 airbyte-integrations/connectors/source-iterable/source_iterable/schemas/events.json diff --git a/airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml b/airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml new file mode 100644 index 0000000000000..587d1172565aa --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/acceptance-test-config.yml @@ -0,0 +1,24 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/contributing-to-airbyte/building-new-connector/source-acceptance-tests.md) +# for more information about how to configure these tests +connector_image: airbyte/source-iterable:dev +tests: + spec: + - spec_path: "source_iterable/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/catalog.json" + empty_streams: ['email_send_skip', 'email_complaint'] + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/catalog.json" + incremental: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" diff --git a/airbyte-integrations/connectors/source-iterable/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-iterable/acceptance-test-docker.sh new file mode 100755 index 0000000000000..c522eebbd94e8 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/acceptance-test-docker.sh @@ -0,0 +1,15 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input diff --git a/airbyte-integrations/connectors/source-iterable/build.gradle b/airbyte-integrations/connectors/source-iterable/build.gradle index f55ea0aca67ff..e95e96c3c0306 100644 --- a/airbyte-integrations/connectors/source-iterable/build.gradle +++ b/airbyte-integrations/connectors/source-iterable/build.gradle @@ -1,20 +1,13 @@ plugins { id 'airbyte-python' id 'airbyte-docker' - id 'airbyte-standard-source-test-file' + id 'airbyte-source-acceptance-test' } airbytePython { moduleDirectory 'source_iterable' } -airbyteStandardSourceTestFile { - specPath = "source_iterable/spec.json" - configPath = "secrets/config.json" - configuredCatalogPath = "sample_files/configured_catalog.json" -} - - dependencies { - implementation files(project(':airbyte-integrations:bases:base-standard-source-test-file').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) } diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/__init__.py b/airbyte-integrations/connectors/source-iterable/integration_tests/__init__.py new file mode 100644 index 0000000000000..e69de29bb2d1d diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-iterable/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..0fd5d83fe5dd9 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/abnormal_state.json @@ -0,0 +1,26 @@ +{ + "users": { + "profileUpdatedAt": "2121-04-14T17:00:41+00:00" + }, + "email_unsubscribe": { + "createdAt": "2121-04-14T17:00:44+00:00" + }, + "email_subscribe": { + "createdAt": "2121-04-14T16:52:45+00:00" + }, + "email_send": { + "createdAt": "2121-04-14T16:25:56+00:00" + }, + "email_open": { + "createdAt": "2121-04-14T17:00:11+00:00" + }, + "email_click": { + "createdAt": "2121-04-14T16:55:14+00:00" + }, + "email_bounce": { + "createdAt": "2121-04-14T16:29:39+00:00" + }, + "templates": { + "createdAt": "2121-04-14T16:23:30.700000+00:00" + } +} diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py new file mode 100644 index 0000000000000..883e9bff2df34 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py @@ -0,0 +1,33 @@ +# +# MIT License +# +# Copyright (c) 2020 Airbyte +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """This fixture is a placeholder for external resources that acceptance test might require.""" + yield diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/catalog.json b/airbyte-integrations/connectors/source-iterable/integration_tests/catalog.json new file mode 100644 index 0000000000000..02f67b0177542 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/catalog.json @@ -0,0 +1,186 @@ +{ + "streams": [ + { + "stream": { + "name": "campaigns", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "campaigns_metrics", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "channels", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "email_bounce", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_click", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_complaint", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_open", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_send", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_send_skip", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_subscribe", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_unsubscribe", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "events", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "lists", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "list_users", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "message_types", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "metadata", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "templates", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "users", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["profileUpdatedAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..084091251aa39 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json @@ -0,0 +1,186 @@ +{ + "streams": [ + { + "stream": { + "name": "campaigns", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "campaigns_metrics", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "channels", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "email_bounce", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_click", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_complaint", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_open", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_send", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_send_skip", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_subscribe", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "email_unsubscribe", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "events", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "lists", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "list_users", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "message_types", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "metadata", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "templates", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["createdAt"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "users", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": true, + "default_cursor_field": ["profileUpdatedAt"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + } + ] +} diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..4d185f8e29387 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json @@ -0,0 +1,4 @@ +{ + "api_key": "test-api-key", + "start_date": "2020-12-12T00:00:00Z" +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-iterable/requirements.txt b/airbyte-integrations/connectors/source-iterable/requirements.txt index d6e1198b1ab1f..7be17a56d745d 100644 --- a/airbyte-integrations/connectors/source-iterable/requirements.txt +++ b/airbyte-integrations/connectors/source-iterable/requirements.txt @@ -1 +1,3 @@ +# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies. +-e ../../bases/source-acceptance-test -e . diff --git a/airbyte-integrations/connectors/source-iterable/sample_files/configured_catalog.json b/airbyte-integrations/connectors/source-iterable/sample_files/configured_catalog.json deleted file mode 100644 index 2b59c1b02dcf1..0000000000000 --- a/airbyte-integrations/connectors/source-iterable/sample_files/configured_catalog.json +++ /dev/null @@ -1,1258 +0,0 @@ -{ - "streams": [ - { - "stream": { - "name": "campaigns", - "json_schema": { - "properties": { - "id": { - "type": ["null", "integer"] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "updatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "startAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "endedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "name": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "messageMedium": { - "type": ["null", "string"] - }, - "createdByUserId": { - "type": ["null", "string"] - }, - "updatedByUserId": { - "type": ["null", "string"] - }, - "campaignState": { - "type": ["null", "string"] - }, - "listIds": { - "type": ["null", "array"], - "items": {} - }, - "suppressionListIds": { - "type": ["null", "array"], - "items": {} - }, - "sendSize": { - "type": ["null", "number"] - }, - "recurringCampaignId": { - "type": ["null", "number"] - }, - "workflowId": { - "type": ["null", "number"] - }, - "labels": { - "type": ["null", "array"], - "items": {} - }, - "type": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "channels", - "json_schema": { - "properties": { - "id": { - "type": ["null", "number"] - }, - "name": { - "type": ["null", "string"] - }, - "channelType": { - "type": ["null", "string"] - }, - "messageMedium": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "email_bounce", - "json_schema": { - "properties": { - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "campaignId": { - "type": ["null", "integer"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "messageId": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "email": { - "type": ["null", "string"] - }, - "recipientState": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_click", - "json_schema": { - "properties": { - "country": { - "type": ["null", "string"] - }, - "city": { - "type": ["null", "string"] - }, - "campaignId": { - "type": ["null", "integer"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "ip": { - "type": ["null", "string"] - }, - "contentId": { - "type": ["null", "integer"] - }, - "userAgentDevice": { - "type": ["null", "string"] - }, - "messageId": { - "type": ["null", "string"] - }, - "hrefIndex": { - "type": ["null", "integer"] - }, - "userAgent": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "url": { - "type": ["null", "string"] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "region": { - "type": ["null", "string"] - }, - "email": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_complaint", - "json_schema": { - "properties": { - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "campaignId": { - "type": ["null", "integer"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "messageId": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "email": { - "type": ["null", "string"] - }, - "recipientState": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_open", - "json_schema": { - "properties": { - "country": { - "type": ["null", "string"] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "city": { - "type": ["null", "string"] - }, - "campaignId": { - "type": ["null", "integer"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "ip": { - "type": ["null", "string"] - }, - "userAgentDevice": { - "type": ["null", "string"] - }, - "messageId": { - "type": ["null", "string"] - }, - "userAgent": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "region": { - "type": ["null", "string"] - }, - "email": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_send", - "json_schema": { - "properties": { - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "campaignId": { - "type": ["null", "integer"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "messageTypeId": { - "type": ["null", "integer"] - }, - "transactionalData": { - "type": ["null", "object"], - "properties": { - "inventory": { - "type": ["null", "integer"] - }, - "eventName": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "sku": { - "type": ["null", "string"] - }, - "email": { - "type": ["null", "string"] - }, - "url": { - "type": ["null", "string"] - }, - "description": { - "type": ["null", "string"] - }, - "price": { - "type": ["null", "integer"] - }, - "product_type": { - "type": ["null", "string"] - }, - "compare_at_price": { - "type": ["null", "number"] - }, - "id": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "product_id": { - "type": ["null", "string"] - }, - "categories": { - "type": ["null", "array"], - "items": {} - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "campaignId": { - "type": ["null", "integer"] - }, - "vendor": { - "type": ["null", "string"] - }, - "eventUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "discount": { - "type": ["null", "integer"] - }, - "imageUrl": { - "type": ["null", "string"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "handle": { - "type": ["null", "string"] - } - } - }, - "contentId": { - "type": ["null", "integer"] - }, - "messageId": { - "type": ["null", "string"] - }, - "messageBusId": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "email": { - "type": ["null", "string"] - }, - "channelId": { - "type": ["null", "integer"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_send_skip", - "json_schema": { - "properties": { - "reason": { - "type": ["null", "string"] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "campaignId": { - "type": ["null", "integer"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "messageTypeId": { - "type": ["null", "integer"] - }, - "transactionalData": { - "type": ["null", "object"], - "properties": { - "inventory": { - "type": ["null", "integer"] - }, - "eventName": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "sku": { - "type": ["null", "string"] - }, - "email": { - "type": ["null", "string"] - }, - "url": { - "type": ["null", "string"] - }, - "description": { - "type": ["null", "string"] - }, - "price": { - "type": ["null", "integer"] - }, - "product_type": { - "type": ["null", "string"] - }, - "compare_at_price": { - "type": ["null", "number"] - }, - "id": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "product_id": { - "type": ["null", "string"] - }, - "categories": { - "type": ["null", "array"], - "items": {} - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "campaignId": { - "type": ["null", "integer"] - }, - "vendor": { - "type": ["null", "string"] - }, - "eventUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "discount": { - "type": ["null", "integer"] - }, - "imageUrl": { - "type": ["null", "string"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "handle": { - "type": ["null", "string"] - } - } - }, - "contentId": { - "type": ["null", "integer"] - }, - "messageId": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "email": { - "type": ["null", "string"] - }, - "channelId": { - "type": ["null", "integer"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_subscribe", - "json_schema": { - "properties": { - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "signupSource": { - "type": ["null", "string"] - }, - "emailListIds": { - "type": ["null", "array"], - "items": {} - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "emailListId": { - "type": ["null", "integer"] - }, - "email": { - "type": ["null", "string"] - }, - "profileUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "workflowId": { - "type": ["null", "integer"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_unsubscribe", - "json_schema": { - "properties": { - "unsubSource": { - "type": ["null", "string"] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "campaignId": { - "type": ["null", "integer"] - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "emailListId": { - "type": ["null", "integer"] - }, - "emailListIds": { - "type": ["null", "array"], - "items": {} - }, - "workflowId": { - "type": ["null", "integer"] - }, - "messageId": { - "type": ["null", "string"] - }, - "templateId": { - "type": ["null", "integer"] - }, - "channelIds": { - "type": ["null", "array"], - "items": {} - }, - "email": { - "type": ["null", "string"] - }, - "channelId": { - "type": ["null", "integer"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "lists", - "json_schema": { - "properties": { - "id": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "listType": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "list_users", - "json_schema": { - "properties": { - "email": { - "type": ["null", "string"] - }, - "listId": { - "type": ["null", "integer"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "message_types", - "json_schema": { - "properties": { - "id": { - "type": ["null", "number"] - }, - "name": { - "type": ["null", "string"] - }, - "channelId": { - "type": ["null", "number"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "metadata", - "json_schema": { - "properties": { - "table": { - "type": ["null", "string"] - }, - "key": { - "type": ["null", "string"] - }, - "size": { - "type": ["null", "integer"] - }, - "lastModified": { - "type": ["null", "string"], - "format": "date-time" - }, - "value": { - "type": ["null", "object"], - "properties": { - "inventory": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - }, - "sku": { - "type": ["null", "string"] - }, - "url": { - "type": ["null", "string"] - }, - "description": { - "type": ["null", "string"] - }, - "price": { - "type": ["null", "integer"] - }, - "product_type": { - "type": ["null", "string"] - }, - "compare_at_price": { - "type": ["null", "number"] - }, - "id": { - "type": ["null", "string"] - }, - "product_id": { - "type": ["null", "string"] - }, - "categories": { - "type": ["null", "array"], - "items": {} - }, - "vendor": { - "type": ["null", "string"] - }, - "discount": { - "type": ["null", "integer"] - }, - "imageUrl": { - "type": ["null", "string"] - }, - "handle": { - "type": ["null", "string"] - } - } - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "overwrite" - }, - { - "stream": { - "name": "templates", - "json_schema": { - "properties": { - "templateId": { - "type": ["null", "number"] - }, - "createdAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "updatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "name": { - "type": ["null", "string"] - }, - "creatorUserId": { - "type": ["null", "string"] - }, - "messageTypeId": { - "type": ["null", "number"] - }, - "campaignId": { - "type": ["null", "number"] - }, - "clientTemplateId": { - "type": ["null", "string"] - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "users", - "json_schema": { - "properties": { - "country": { - "type": ["null", "string"] - }, - "firstOrderDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "addresses": { - "type": ["null", "array"], - "items": { - "type": ["null", "object"], - "properties": { - "first_name": { - "type": ["null", "string"] - }, - "city": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "zip": { - "type": ["null", "string"] - }, - "country": { - "type": ["null", "string"] - }, - "address1": { - "type": ["null", "string"] - }, - "address2": { - "type": ["null", "string"] - }, - "company": { - "type": ["null", "string"] - }, - "country_code": { - "type": ["null", "string"] - }, - "default": { - "type": ["null", "boolean"] - }, - "id": { - "type": ["null", "string"] - }, - "last_name": { - "type": ["null", "string"] - }, - "province": { - "type": ["null", "string"] - }, - "province_code": { - "type": ["null", "string"] - }, - "country_name": { - "type": ["null", "string"] - }, - "phone": { - "type": ["null", "string"] - } - } - } - }, - "emailAcquiredDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "emailSegmentStatus": { - "type": ["null", "string"] - }, - "admin_graphql_api_id": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "string"] - }, - "mostRecentEmailList": { - "type": ["null", "string"] - }, - "mostRecentEmailSegment": { - "type": ["null", "string"] - }, - "aov": { - "type": ["null", "number"] - }, - "firstCampaign": { - "type": ["null", "string"] - }, - "thirdMostRecentOrderDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "firstPurchaseDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "firstMedium": { - "type": ["null", "string"] - }, - "default_address": { - "type": ["null", "object"], - "properties": { - "first_name": { - "type": ["null", "string"] - }, - "city": { - "type": ["null", "string"] - }, - "name": { - "type": ["null", "string"] - }, - "zip": { - "type": ["null", "string"] - }, - "country": { - "type": ["null", "string"] - }, - "address2": { - "type": ["null", "string"] - }, - "company": { - "type": ["null", "string"] - }, - "country_code": { - "type": ["null", "string"] - }, - "id": { - "type": ["null", "string"] - }, - "last_name": { - "type": ["null", "string"] - }, - "address1": { - "type": ["null", "string"] - }, - "default": { - "type": ["null", "boolean"] - }, - "province": { - "type": ["null", "string"] - }, - "province_code": { - "type": ["null", "string"] - }, - "country_name": { - "type": ["null", "string"] - }, - "phone": { - "type": ["null", "string"] - } - } - }, - "emailListIds": { - "type": ["null", "array"], - "items": {} - }, - "accepts_marketing": { - "type": ["null", "boolean"] - }, - "secondMostRecentOrderDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "state": { - "type": ["null", "string"] - }, - "mostRecentCampaign": { - "type": ["null", "string"] - }, - "zip": { - "type": ["null", "string"] - }, - "total_spent": { - "type": ["null", "number"] - }, - "mostRecentOrderDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "last_order_id": { - "type": ["null", "string"] - }, - "tax_exempt": { - "type": ["null", "boolean"] - }, - "mostRecentSource": { - "type": ["null", "string"] - }, - "twelveMonthLtr": { - "type": ["null", "integer"] - }, - "verified_email": { - "type": ["null", "boolean"] - }, - "mostRecentMedium": { - "type": ["null", "string"] - }, - "orders_count": { - "type": ["null", "integer"] - }, - "firstName": { - "type": ["null", "string"] - }, - "lastInteractionTs": { - "type": ["null", "string"], - "format": "date-time" - }, - "boughtSas": { - "type": ["null", "boolean"] - }, - "secondMostRecentOrderCards": { - "type": ["null", "array"], - "items": {} - }, - "unsubscribedChannelIds": { - "type": ["null", "array"], - "items": {} - }, - "lastName": { - "type": ["null", "string"] - }, - "last_order_name": { - "type": ["null", "string"] - }, - "secondOrderDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "hasAccount": { - "type": ["null", "boolean"] - }, - "city": { - "type": ["null", "string"] - }, - "mostRecentOrderCards": { - "type": ["null", "array"], - "items": {} - }, - "itblInternal": { - "type": ["null", "object"], - "properties": { - "emailDomain": { - "type": ["null", "string"] - }, - "documentUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "documentCreatedAt": { - "type": ["null", "string"], - "format": "date-time" - } - } - }, - "hasReminder": { - "type": ["null", "boolean"] - }, - "thirdOrderDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "subscribedMessageTypeIds": { - "type": ["null", "array"], - "items": {} - }, - "firstSource": { - "type": ["null", "string"] - }, - "unsubscribedMessageTypeIds": { - "type": ["null", "array"], - "items": {} - }, - "first_name": { - "type": ["null", "string"] - }, - "email": { - "type": ["null", "string"] - }, - "thirdMostRecentOrderCards": { - "type": ["null", "array"], - "items": {} - }, - "profileUpdatedAt": { - "type": ["null", "string"], - "format": "date-time" - }, - "signupDate": { - "type": ["null", "string"], - "format": "date-time" - }, - "businessLines": { - "type": ["null", "array"], - "items": {} - }, - "secondOrderCards": { - "type": ["null", "array"], - "items": {} - }, - "address1": { - "type": ["null", "string"] - }, - "last_name": { - "type": ["null", "string"] - }, - "ltr": { - "type": ["null", "integer"] - }, - "userId": { - "type": ["null", "string"] - }, - "shopify_created_at": { - "type": ["null", "string"], - "format": "date-time" - }, - "signupSource": { - "type": ["null", "string"] - }, - "thirdOrderCards": { - "type": ["null", "array"], - "items": {} - }, - "firstOrderCards": { - "type": ["null", "array"], - "items": {} - }, - "totalOrders": { - "type": ["null", "integer"] - }, - "shopify_updated_at": { - "type": ["null", "string"], - "format": "date-time" - } - }, - "type": ["null", "object"] - }, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["profileUpdatedAt"] - }, - "sync_mode": "incremental", - "destination_sync_mode": "append" - } - ] -} diff --git a/airbyte-integrations/connectors/source-iterable/sample_files/sample_config.json b/airbyte-integrations/connectors/source-iterable/sample_files/sample_config.json index ea49b1ae643ad..fb46b1ec2582e 100644 --- a/airbyte-integrations/connectors/source-iterable/sample_files/sample_config.json +++ b/airbyte-integrations/connectors/source-iterable/sample_files/sample_config.json @@ -1,4 +1,4 @@ { - "api_key": ">", + "api_key": "", "start_date": "2021-04-01T00:00:00Z" } diff --git a/airbyte-integrations/connectors/source-iterable/sample_files/state.json b/airbyte-integrations/connectors/source-iterable/sample_files/state.json deleted file mode 100644 index eeb8474cdf8aa..0000000000000 --- a/airbyte-integrations/connectors/source-iterable/sample_files/state.json +++ /dev/null @@ -1,26 +0,0 @@ -{ - "users": { - "profileUpdatedAt": "2021-04-14T17:00:41+00:00" - }, - "email_unsubscribe": { - "createdAt": "2021-04-14T17:00:44+00:00" - }, - "email_subscribe": { - "createdAt": "2021-04-14T16:52:45+00:00" - }, - "email_send": { - "createdAt": "2021-04-14T16:25:56+00:00" - }, - "email_open": { - "createdAt": "2021-04-14T17:00:11+00:00" - }, - "email_click": { - "createdAt": "2021-04-14T16:55:14+00:00" - }, - "email_bounce": { - "createdAt": "2021-04-14T16:29:39+00:00" - }, - "templates": { - "createdAt": "2021-04-14T16:23:30.700000+00:00" - } -} diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py old mode 100644 new mode 100755 index b535148535c40..abf50dcbfceba --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -22,23 +22,33 @@ # SOFTWARE. # - import json import urllib.parse as urlparse from abc import ABC, abstractmethod -from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union +from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional, Union import pendulum import requests from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.streams.http import HttpStream +EVENT_ROWS_LIMIT = 200 + class IterableStream(HttpStream, ABC): - url_base = "https://api.iterable.com/api/" # Hardcode the value because it is not returned from the API BACKOFF_TIME_CONSTANT = 10.0 + # define date-time fields with potential wrong format + DATE_TIME_FIELDS = { + "createdAt", + "updatedAt", + "startAt", + "endedAt", + "profileUpdatedAt", + } + + url_base = "https://api.iterable.com/api/" primary_key = "id" def __init__(self, api_key, **kwargs): @@ -66,36 +76,77 @@ def request_params(self, **kwargs) -> MutableMapping[str, Any]: def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: response_json = response.json() - yield from response_json.get(self.data_field, []) + records = response_json.get(self.data_field, []) + for record in records: + yield self._convert_timestamp_fields_to_datetime(record) -class IterableExportStream(IterableStream, ABC): - def __init__(self, start_date, **kwargs): - super().__init__(**kwargs) - self._start_date = pendulum.parse(start_date) - self.stream_params = {"dataTypeName": self.data_field} + @staticmethod + def _parse_plain_text_to_dict(text: str) -> Dict[str, Any]: + """ + :param text: API endpoint response with plain/text format + :return: parsed API response + """ - cursor_field = "createdAt" + result = dict() + data = text.split("\n") + + names = data[0].split(",") + raw_values = data[1].split(",") + + for key, value in zip(names, raw_values): + try: + result[key] = int(value) + except ValueError: + result[key] = float(value) + + return result + + def _convert_timestamp_fields_to_datetime(self, record: Dict[str, Any]): + for field in self.DATE_TIME_FIELDS: + # check for optional timestamp fields and convert to date-time + if record.get(field): + record[field] = self._field_to_datetime(record[field]) + + return record @staticmethod def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: if isinstance(value, int): + # divide by 1000.0, because timestamp present in milliseconds value = pendulum.from_timestamp(value / 1000.0) + elif isinstance(value, str): value = pendulum.parse(value) + else: raise ValueError(f"Unsupported type of datetime field {type(value)}") + return value + +class IterableExportStream(IterableStream, ABC): + + cursor_field = "createdAt" + primary_key = None + + def __init__(self, start_date, **kwargs): + super().__init__(**kwargs) + self._start_date = pendulum.parse(start_date) + self.stream_params = {"dataTypeName": self.data_field} + + def path(self, **kwargs) -> str: + return "/export/data.json" + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object and returning an updated state object. """ - latest_benchmark = self._field_to_datetime(latest_record[self.cursor_field]) + latest_benchmark = latest_record[self.cursor_field] if current_stream_state.get(self.cursor_field): - return {self.cursor_field: str(max(latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field])))} - return {self.cursor_field: str(latest_benchmark)} + return {self.cursor_field: max(latest_benchmark, pendulum.parse(current_stream_state[self.cursor_field])).to_datetime_string()} + return {self.cursor_field: latest_benchmark.to_datetime_string()} def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: @@ -110,12 +161,9 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa ) return params - def path(self, **kwargs) -> str: - return "/export/data.json" - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: for obj in response.iter_lines(): - yield json.loads(obj) + yield self._convert_timestamp_fields_to_datetime(json.loads(obj)) class Lists(IterableStream): @@ -126,17 +174,18 @@ def path(self, **kwargs) -> str: class ListUsers(IterableStream): + primary_key = "listId" data_field = "getUsers" name = "list_users" + def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str: + return f"lists/{self.data_field}?listId={stream_slice['list_id']}" + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: lists = Lists(api_key=self._api_key) for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)): yield {"list_id": list_record["id"]} - def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str: - return f"lists/{self.data_field}?listId={stream_slice['list_id']}" - def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: list_id = self._get_list_id(response.url) for user in response.iter_lines(): @@ -158,6 +207,34 @@ def path(self, **kwargs) -> str: return "campaigns" +class CampaignsMetrics(IterableStream): + primary_key = None + data_field = None + + def __init__(self, api_key: str, start_date: str): + super().__init__(api_key) + self.start_date = start_date + + def path(self, **kwargs) -> str: + return "campaigns/metrics" + + def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> MutableMapping[str, Any]: + params = super().request_params(**kwargs) + params["campaignId"] = stream_slice.get("campaign_id") + params["startDateTime"] = self.start_date + + return params + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + lists = Campaigns(api_key=self._api_key) + for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)): + yield {"campaign_id": list_record["id"]} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + plain_text = response.content.decode() + yield {"data": self._parse_plain_text_to_dict(plain_text)} + + class Channels(IterableStream): data_field = "channels" @@ -205,6 +282,32 @@ class EmailUnsubscribe(IterableExportStream): data_field = "emailUnsubscribe" +class Events(IterableStream): + primary_key = None + data_field = "events" + + def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str: + return f"events/{stream_slice['email']}" + + def request_params(self, **kwargs) -> MutableMapping[str, Any]: + params = super().request_params(**kwargs) + params["limit"] = EVENT_ROWS_LIMIT + + return params + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: + lists = ListUsers(api_key=self._api_key) + stream_slices = lists.stream_slices() + + for stream_slice in stream_slices: + for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh), stream_slice=stream_slice): + yield {"email": list_record["email"]} + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + for record in super().parse_response(response, **kwargs): + yield {"data": record} + + class MessageTypes(IterableStream): data_field = "messageTypes" name = "message_types" @@ -214,6 +317,7 @@ def path(self, **kwargs) -> str: class Metadata(IterableStream): + primary_key = None data_field = "results" def path(self, **kwargs) -> str: @@ -236,7 +340,10 @@ def read_records(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwarg def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: response_json = response.json() - yield from response_json.get(self.data_field, []) + records = response_json.get(self.data_field, []) + + for record in records: + yield self._convert_timestamp_fields_to_datetime(record) class Users(IterableExportStream): diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns_metrics.json b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns_metrics.json new file mode 100644 index 0000000000000..c7cc50b244279 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns_metrics.json @@ -0,0 +1,8 @@ +{ + "properties": { + "data": { + "type": ["null", "object"] + } + }, + "type": ["null", "object"] +} diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/events.json b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/events.json new file mode 100644 index 0000000000000..c7cc50b244279 --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/events.json @@ -0,0 +1,8 @@ +{ + "properties": { + "data": { + "type": ["null", "object"] + } + }, + "type": ["null", "object"] +} diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/source.py b/airbyte-integrations/connectors/source-iterable/source_iterable/source.py index ecd3c79cbfd38..95dcc092f4a26 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/source.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/source.py @@ -31,6 +31,7 @@ from .api import ( Campaigns, + CampaignsMetrics, Channels, EmailBounce, EmailClick, @@ -40,6 +41,7 @@ EmailSendSkip, EmailSubscribe, EmailUnsubscribe, + Events, Lists, ListUsers, MessageTypes, @@ -61,6 +63,7 @@ def check_connection(self, logger, config) -> Tuple[bool, any]: def streams(self, config: Mapping[str, Any]) -> List[Stream]: return [ Campaigns(api_key=config["api_key"]), + CampaignsMetrics(api_key=config["api_key"], start_date=config["start_date"]), Channels(api_key=config["api_key"]), EmailBounce(api_key=config["api_key"], start_date=config["start_date"]), EmailClick(api_key=config["api_key"], start_date=config["start_date"]), @@ -70,6 +73,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: EmailSendSkip(api_key=config["api_key"], start_date=config["start_date"]), EmailSubscribe(api_key=config["api_key"], start_date=config["start_date"]), EmailUnsubscribe(api_key=config["api_key"], start_date=config["start_date"]), + Events(api_key=config["api_key"]), Lists(api_key=config["api_key"]), ListUsers(api_key=config["api_key"]), MessageTypes(api_key=config["api_key"]), From 74c25b1ea5ef3a7fa0d77334a6656b355a8ba5e2 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Wed, 8 Sep 2021 22:56:17 +0300 Subject: [PATCH 02/12] Upd requirements versions --- airbyte-integrations/connectors/source-iterable/setup.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/setup.py b/airbyte-integrations/connectors/source-iterable/setup.py index 456dc819246d3..f2eee25bec8b5 100644 --- a/airbyte-integrations/connectors/source-iterable/setup.py +++ b/airbyte-integrations/connectors/source-iterable/setup.py @@ -26,12 +26,12 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk==0.1.3", - "pendulum==1.2.0", - "requests==2.25.1", + "airbyte-cdk~=0.1", + "pendulum~=1.2", + "requests", ] -TEST_REQUIREMENTS = ["pytest==6.1.2"] +TEST_REQUIREMENTS = ["pytest~=6.1"] setup( From 4f7e64ac36e7885d206725ca277a5da8d6192782 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Thu, 9 Sep 2021 15:06:35 +0300 Subject: [PATCH 03/12] Upd docs --- .../connectors/source-iterable/source_iterable/api.py | 6 ++++++ docs/integrations/sources/iterable.md | 8 ++++++++ 2 files changed, 14 insertions(+) diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py index abf50dcbfceba..4b62a932a7552 100755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -212,6 +212,9 @@ class CampaignsMetrics(IterableStream): data_field = None def __init__(self, api_key: str, start_date: str): + """ + https://api.iterable.com/api/docs#campaigns_metrics + """ super().__init__(api_key) self.start_date = start_date @@ -283,6 +286,9 @@ class EmailUnsubscribe(IterableExportStream): class Events(IterableStream): + """ + https://api.iterable.com/api/docs#events_User_events + """ primary_key = None data_field = "events" diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index b5778a637ca0d..3c9f151b3f5ad 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -11,6 +11,7 @@ This source can sync data for the [Iterable API](https://api.iterable.com/api/do Several output streams are available from this source: * [Campaigns](https://api.iterable.com/api/docs#campaigns_campaigns) +* [Campaign Metrics](https://api.iterable.com/api/docs#campaigns_metrics) * [Channels](https://api.iterable.com/api/docs#channels_channels) * [Email Bounce](https://api.iterable.com/api/docs#export_exportDataJson) \(Incremental sync\) * [Email Click](https://api.iterable.com/api/docs#export_exportDataJson) \(Incremental sync\) @@ -20,6 +21,7 @@ Several output streams are available from this source: * [Email Send Skip](https://api.iterable.com/api/docs#export_exportDataJson) \(Incremental sync\) * [Email Subscribe](https://api.iterable.com/api/docs#export_exportDataJson) \(Incremental sync\) * [Email Unsubscribe](https://api.iterable.com/api/docs#export_exportDataJson) \(Incremental sync\) +* [Events](https://api.iterable.com/api/docs#events_User_events) * [Lists](https://api.iterable.com/api/docs#lists_getLists) * [List Users](https://api.iterable.com/api/docs#lists_getLists_0) * [Message Types](https://api.iterable.com/api/docs#messageTypes_messageTypes) @@ -52,3 +54,9 @@ The Iterable connector should not run into Iterable API limitations under normal Please read [How to find your API key](https://support.iterable.com/hc/en-us/articles/360043464871-API-Keys-#creating-api-keys). + +## CHANGELOG + +| Version | Date | Pull Request | Subject | +| :------ | :-------- | :----- | :------ | +| `0.1.7` | 2021-09-08 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: events, campaign metrics | From 6a44a86b77072e82df840a17814b9dd522b37c9a Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Thu, 9 Sep 2021 15:07:32 +0300 Subject: [PATCH 04/12] Remove tests for the templates stream --- .../integration_tests/configured_catalog.json | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json index 084091251aa39..a6392effd040c 100644 --- a/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/configured_catalog.json @@ -160,17 +160,6 @@ "sync_mode": "full_refresh", "destination_sync_mode": "overwrite" }, - { - "stream": { - "name": "templates", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["createdAt"] - }, - "sync_mode": "full_refresh", - "destination_sync_mode": "append" - }, { "stream": { "name": "users", From 4a9f636b41575d1c6b0daa4d275d040165b961c5 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Tue, 14 Sep 2021 18:35:42 +0300 Subject: [PATCH 05/12] Upd csv field parsing --- .../source-iterable/source_iterable/api.py | 31 ++++++++++++------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py index 4b62a932a7552..e19a3f2c5b575 100755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -22,9 +22,11 @@ # SOFTWARE. # +import csv import json import urllib.parse as urlparse from abc import ABC, abstractmethod +from io import StringIO from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional, Union import pendulum @@ -82,19 +84,25 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp yield self._convert_timestamp_fields_to_datetime(record) @staticmethod - def _parse_plain_text_to_dict(text: str) -> Dict[str, Any]: + def _parse_csv_string_to_dict(csv_string: str) -> Dict[str, Any]: """ - :param text: API endpoint response with plain/text format + Parse a response with a csv type to dict object + Example: + csv_string = "a,b,c + 1,2,3" + + output = {"a": 1, "b": 2, "c": 3} + + + :param csv_string: API endpoint response with csv format :return: parsed API response - """ - result = dict() - data = text.split("\n") + """ - names = data[0].split(",") - raw_values = data[1].split(",") + reader = csv.DictReader(StringIO(csv_string), delimiter=",") + result = next(reader) - for key, value in zip(names, raw_values): + for key, value in result.items(): try: result[key] = int(value) except ValueError: @@ -234,8 +242,8 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: yield {"campaign_id": list_record["id"]} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: - plain_text = response.content.decode() - yield {"data": self._parse_plain_text_to_dict(plain_text)} + content = response.content.decode() + yield {"data": self._parse_csv_string_to_dict(content)} class Channels(IterableStream): @@ -291,13 +299,14 @@ class Events(IterableStream): """ primary_key = None data_field = "events" + page_size = EVENT_ROWS_LIMIT def path(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> str: return f"events/{stream_slice['email']}" def request_params(self, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) - params["limit"] = EVENT_ROWS_LIMIT + params["limit"] = self.page_size return params From a29692ee9fc1a2e920bbfa906e434b0055599abf Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Tue, 14 Sep 2021 18:39:48 +0300 Subject: [PATCH 06/12] Fix file permissions --- .../connectors/source-iterable/source_iterable/api.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) mode change 100755 => 100644 airbyte-integrations/connectors/source-iterable/source_iterable/api.py diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py old mode 100755 new mode 100644 From bb10fee28c5d54ed965b5cdf73195bbf93cab29d Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Fri, 17 Sep 2021 10:02:18 +0300 Subject: [PATCH 07/12] Set dependency version --- .../source-iterable/integration_tests/invalid_config.json | 2 +- airbyte-integrations/connectors/source-iterable/setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json index 4d185f8e29387..33e5a4a510ec9 100644 --- a/airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/invalid_config.json @@ -1,4 +1,4 @@ { "api_key": "test-api-key", "start_date": "2020-12-12T00:00:00Z" -} \ No newline at end of file +} diff --git a/airbyte-integrations/connectors/source-iterable/setup.py b/airbyte-integrations/connectors/source-iterable/setup.py index f2eee25bec8b5..e6709d0f327e8 100644 --- a/airbyte-integrations/connectors/source-iterable/setup.py +++ b/airbyte-integrations/connectors/source-iterable/setup.py @@ -28,7 +28,7 @@ MAIN_REQUIREMENTS = [ "airbyte-cdk~=0.1", "pendulum~=1.2", - "requests", + "requests~=2.25", ] TEST_REQUIREMENTS = ["pytest~=6.1"] From d8aaede5aaaa1ce1440e4a65785a8fd43f5e3730 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Fri, 24 Sep 2021 16:16:28 +0300 Subject: [PATCH 08/12] Refactor --- .../source-iterable/source_iterable/api.py | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) mode change 100644 => 100755 airbyte-integrations/connectors/source-iterable/source_iterable/api.py diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py old mode 100644 new mode 100755 index e19a3f2c5b575..5aacd27d87755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -83,33 +83,6 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp for record in records: yield self._convert_timestamp_fields_to_datetime(record) - @staticmethod - def _parse_csv_string_to_dict(csv_string: str) -> Dict[str, Any]: - """ - Parse a response with a csv type to dict object - Example: - csv_string = "a,b,c - 1,2,3" - - output = {"a": 1, "b": 2, "c": 3} - - - :param csv_string: API endpoint response with csv format - :return: parsed API response - - """ - - reader = csv.DictReader(StringIO(csv_string), delimiter=",") - result = next(reader) - - for key, value in result.items(): - try: - result[key] = int(value) - except ValueError: - result[key] = float(value) - - return result - def _convert_timestamp_fields_to_datetime(self, record: Dict[str, Any]): for field in self.DATE_TIME_FIELDS: # check for optional timestamp fields and convert to date-time @@ -245,6 +218,33 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp content = response.content.decode() yield {"data": self._parse_csv_string_to_dict(content)} + @staticmethod + def _parse_csv_string_to_dict(csv_string: str) -> Dict[str, Any]: + """ + Parse a response with a csv type to dict object + Example: + csv_string = "a,b,c + 1,2,3" + + output = {"a": 1, "b": 2, "c": 3} + + + :param csv_string: API endpoint response with csv format + :return: parsed API response + + """ + + reader = csv.DictReader(StringIO(csv_string), delimiter=",") + result = next(reader) + + for key, value in result.items(): + try: + result[key] = int(value) + except ValueError: + result[key] = float(value) + + return result + class Channels(IterableStream): data_field = "channels" From b70ecafb8ed409f44d565b62d2fe5d5306c36370 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Mon, 27 Sep 2021 20:53:43 +0300 Subject: [PATCH 09/12] Merge --- .../connectors/source-iterable/Dockerfile | 2 +- .../source-iterable/source_iterable/api.py | 53 +++++++------------ .../source_iterable/schemas/campaigns.json | 12 ++--- .../source_iterable/schemas/lists.json | 3 +- .../source_iterable/schemas/metadata.json | 3 +- .../source_iterable/schemas/templates.json | 6 +-- docs/integrations/sources/iterable.md | 4 +- 7 files changed, 31 insertions(+), 52 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/Dockerfile b/airbyte-integrations/connectors/source-iterable/Dockerfile index 317043458a53e..453697195e7b0 100644 --- a/airbyte-integrations/connectors/source-iterable/Dockerfile +++ b/airbyte-integrations/connectors/source-iterable/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/source-iterable diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py index 5aacd27d87755..a5f61872bce1f 100755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -42,13 +42,6 @@ class IterableStream(HttpStream, ABC): # Hardcode the value because it is not returned from the API BACKOFF_TIME_CONSTANT = 10.0 # define date-time fields with potential wrong format - DATE_TIME_FIELDS = { - "createdAt", - "updatedAt", - "startAt", - "endedAt", - "profileUpdatedAt", - } url_base = "https://api.iterable.com/api/" primary_key = "id" @@ -81,29 +74,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp records = response_json.get(self.data_field, []) for record in records: - yield self._convert_timestamp_fields_to_datetime(record) - - def _convert_timestamp_fields_to_datetime(self, record: Dict[str, Any]): - for field in self.DATE_TIME_FIELDS: - # check for optional timestamp fields and convert to date-time - if record.get(field): - record[field] = self._field_to_datetime(record[field]) - - return record - - @staticmethod - def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: - if isinstance(value, int): - # divide by 1000.0, because timestamp present in milliseconds - value = pendulum.from_timestamp(value / 1000.0) - - elif isinstance(value, str): - value = pendulum.parse(value) - - else: - raise ValueError(f"Unsupported type of datetime field {type(value)}") - - return value + yield record class IterableExportStream(IterableStream, ABC): @@ -119,6 +90,16 @@ def __init__(self, start_date, **kwargs): def path(self, **kwargs) -> str: return "/export/data.json" + @staticmethod + def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: + if isinstance(value, int): + value = pendulum.from_timestamp(value / 1000.0) + elif isinstance(value, str): + value = pendulum.parse(value) + else: + raise ValueError(f"Unsupported type of datetime field {type(value)}") + return value + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]) -> Mapping[str, Any]: """ Return the latest state by comparing the cursor value in the latest record with the stream's most recent state object @@ -126,7 +107,11 @@ def get_updated_state(self, current_stream_state: MutableMapping[str, Any], late """ latest_benchmark = latest_record[self.cursor_field] if current_stream_state.get(self.cursor_field): - return {self.cursor_field: max(latest_benchmark, pendulum.parse(current_stream_state[self.cursor_field])).to_datetime_string()} + return { + self.cursor_field: max( + latest_benchmark, self._field_to_datetime(current_stream_state[self.cursor_field]) + ).to_datetime_string() + } return {self.cursor_field: latest_benchmark.to_datetime_string()} def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: @@ -144,7 +129,9 @@ def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMa def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: for obj in response.iter_lines(): - yield self._convert_timestamp_fields_to_datetime(json.loads(obj)) + record = json.loads(obj) + record[self.cursor_field] = self._field_to_datetime(record[self.cursor_field]) + yield record class Lists(IterableStream): @@ -358,7 +345,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp records = response_json.get(self.data_field, []) for record in records: - yield self._convert_timestamp_fields_to_datetime(record) + yield record class Users(IterableExportStream): diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns.json b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns.json index d0ac27776bc92..0d180577c5fb2 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns.json +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/campaigns.json @@ -4,20 +4,16 @@ "type": ["null", "integer"] }, "createdAt": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "updatedAt": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "startAt": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "endedAt": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "name": { "type": ["null", "string"] diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/lists.json b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/lists.json index 88fb1917f010a..2a0e0a029d7c0 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/lists.json +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/lists.json @@ -7,8 +7,7 @@ "type": ["null", "string"] }, "createdAt": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "listType": { "type": ["null", "string"] diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json index 82b0285aa69a5..6af7b89d55bee 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json @@ -10,8 +10,7 @@ "type": ["null", "integer"] }, "lastModified": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "value": { "type": ["null", "object"], diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/templates.json b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/templates.json index 0ec50b1a573c3..8ea961911c394 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/templates.json +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/templates.json @@ -4,12 +4,10 @@ "type": ["null", "number"] }, "createdAt": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "updatedAt": { - "type": ["null", "string"], - "format": "date-time" + "type": ["null", "integer"] }, "name": { "type": ["null", "string"] diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index 3c9f151b3f5ad..9c249c82ef211 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -54,9 +54,9 @@ The Iterable connector should not run into Iterable API limitations under normal Please read [How to find your API key](https://support.iterable.com/hc/en-us/articles/360043464871-API-Keys-#creating-api-keys). - ## CHANGELOG | Version | Date | Pull Request | Subject | | :------ | :-------- | :----- | :------ | -| `0.1.7` | 2021-09-08 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: events, campaign metrics | +| `0.1.8` | 2021-09-20 | [5915](https://github.com/airbytehq/airbyte/pull/5915) | Add new streams: campaign_metrics, events | +| `0.1.7` | 2021-09-20 | [6242](https://github.com/airbytehq/airbyte/pull/6242) | Updated schema for: campaigns, lists, templates, metadata | From 0e9f0c6878b88e1e1db897300e0c54cb8005e1c3 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Tue, 28 Sep 2021 16:42:23 +0300 Subject: [PATCH 10/12] Upd licence --- .../integration_tests/acceptance.py | 22 +------------------ .../connectors/source-iterable/main.py | 22 +------------------ .../connectors/source-iterable/setup.py | 22 +------------------ .../source-iterable/source_iterable/api.py | 22 +------------------ .../source-iterable/source_iterable/source.py | 22 +------------------ .../source-iterable/unit_tests/unit_test.py | 22 +------------------ 6 files changed, 6 insertions(+), 126 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py index 883e9bff2df34..a294f4dcc5b7a 100644 --- a/airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py +++ b/airbyte-integrations/connectors/source-iterable/integration_tests/acceptance.py @@ -1,25 +1,5 @@ # -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. # import pytest diff --git a/airbyte-integrations/connectors/source-iterable/main.py b/airbyte-integrations/connectors/source-iterable/main.py index f751cdb7037a7..d2c437c5b0491 100644 --- a/airbyte-integrations/connectors/source-iterable/main.py +++ b/airbyte-integrations/connectors/source-iterable/main.py @@ -1,25 +1,5 @@ # -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. # diff --git a/airbyte-integrations/connectors/source-iterable/setup.py b/airbyte-integrations/connectors/source-iterable/setup.py index e6709d0f327e8..90f33d5300e34 100644 --- a/airbyte-integrations/connectors/source-iterable/setup.py +++ b/airbyte-integrations/connectors/source-iterable/setup.py @@ -1,25 +1,5 @@ # -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. # diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py index a5f61872bce1f..665ce4e1e41b2 100755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -1,25 +1,5 @@ # -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. # import csv diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/source.py b/airbyte-integrations/connectors/source-iterable/source_iterable/source.py index 95dcc092f4a26..82aed9ed90c08 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/source.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/source.py @@ -1,25 +1,5 @@ # -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. # diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-iterable/unit_tests/unit_test.py index b8a8150b507fd..e1814314fc3b0 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/unit_test.py @@ -1,25 +1,5 @@ # -# MIT License -# -# Copyright (c) 2020 Airbyte -# -# Permission is hereby granted, free of charge, to any person obtaining a copy -# of this software and associated documentation files (the "Software"), to deal -# in the Software without restriction, including without limitation the rights -# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -# copies of the Software, and to permit persons to whom the Software is -# furnished to do so, subject to the following conditions: -# -# The above copyright notice and this permission notice shall be included in all -# copies or substantial portions of the Software. -# -# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -# SOFTWARE. +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. # From 87c6f3afce9a2f79b5ae2a657c14a6df7cc9c070 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Wed, 29 Sep 2021 15:08:06 +0300 Subject: [PATCH 11/12] Add bulk metrics retrieving --- .../source-iterable/source_iterable/api.py | 50 +++++++++++++------ 1 file changed, 35 insertions(+), 15 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py index 665ce4e1e41b2..126c8837924b9 100755 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/api.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/api.py @@ -7,7 +7,7 @@ import urllib.parse as urlparse from abc import ABC, abstractmethod from io import StringIO -from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional, Union +from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Union import pendulum import requests @@ -15,6 +15,7 @@ from airbyte_cdk.sources.streams.http import HttpStream EVENT_ROWS_LIMIT = 200 +CAMPAIGNS_PER_REQUEST = 20 class IterableStream(HttpStream, ABC): @@ -171,29 +172,42 @@ def path(self, **kwargs) -> str: def request_params(self, stream_slice: Optional[Mapping[str, Any]] = None, **kwargs) -> MutableMapping[str, Any]: params = super().request_params(**kwargs) - params["campaignId"] = stream_slice.get("campaign_id") + params["campaignId"] = stream_slice.get("campaign_ids") params["startDateTime"] = self.start_date return params def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, any]]]: lists = Campaigns(api_key=self._api_key) + campaign_ids = [] for list_record in lists.read_records(sync_mode=kwargs.get("sync_mode", SyncMode.full_refresh)): - yield {"campaign_id": list_record["id"]} + campaign_ids.append(list_record["id"]) + + if len(campaign_ids) == CAMPAIGNS_PER_REQUEST: + yield {"campaign_ids": campaign_ids} + campaign_ids = [] + + if campaign_ids: + yield {"campaign_ids": campaign_ids} def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: content = response.content.decode() - yield {"data": self._parse_csv_string_to_dict(content)} + records = self._parse_csv_string_to_dict(content) + + for record in records: + yield {"data": record} @staticmethod - def _parse_csv_string_to_dict(csv_string: str) -> Dict[str, Any]: + def _parse_csv_string_to_dict(csv_string: str) -> List[Dict[str, Any]]: """ Parse a response with a csv type to dict object Example: - csv_string = "a,b,c - 1,2,3" + csv_string = "a,b,c,d + 1,2,,3 + 6,,1,2" - output = {"a": 1, "b": 2, "c": 3} + output = [{"a": 1, "b": 2, "d": 3}, + {"a": 6, "c": 1, "d": 2}] :param csv_string: API endpoint response with csv format @@ -202,13 +216,19 @@ def _parse_csv_string_to_dict(csv_string: str) -> Dict[str, Any]: """ reader = csv.DictReader(StringIO(csv_string), delimiter=",") - result = next(reader) - - for key, value in result.items(): - try: - result[key] = int(value) - except ValueError: - result[key] = float(value) + result = [] + + for row in reader: + for key, value in row.items(): + if value == "": + continue + try: + row[key] = int(value) + except ValueError: + row[key] = float(value) + row = {k: v for k, v in row.items() if v != ""} + + result.append(row) return result From 3df6cb5c586b15a2c675827eeb30458ac7fbcb21 Mon Sep 17 00:00:00 2001 From: Arthur Galuza Date: Fri, 1 Oct 2021 12:59:43 +0300 Subject: [PATCH 12/12] Actualize schema --- .../source_iterable/schemas/metadata.json | 62 +------------------ 1 file changed, 1 insertion(+), 61 deletions(-) diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json index 6af7b89d55bee..98f35a056efbd 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/schemas/metadata.json @@ -1,67 +1,7 @@ { "properties": { - "table": { + "name": { "type": ["null", "string"] - }, - "key": { - "type": ["null", "string"] - }, - "size": { - "type": ["null", "integer"] - }, - "lastModified": { - "type": ["null", "integer"] - }, - "value": { - "type": ["null", "object"], - "properties": { - "inventory": { - "type": ["null", "integer"] - }, - "name": { - "type": ["null", "string"] - }, - "sku": { - "type": ["null", "string"] - }, - "url": { - "type": ["null", "string"] - }, - "description": { - "type": ["null", "string"] - }, - "price": { - "type": ["null", "integer"] - }, - "product_type": { - "type": ["null", "string"] - }, - "compare_at_price": { - "type": ["null", "number"] - }, - "id": { - "type": ["null", "string"] - }, - "product_id": { - "type": ["null", "string"] - }, - "categories": { - "type": ["null", "array"], - "items": {} - }, - "vendor": { - "type": ["null", "string"] - }, - "discount": { - "type": ["null", "integer"] - }, - "imageUrl": { - "type": ["null", "string"] - }, - "handle": { - "type": ["null", "string"] - } - } } }, "type": ["null", "object"]