From b4f802090429d930d8bb6fb4b3fe33664511884f Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Wed, 13 Oct 2021 23:01:36 +1100 Subject: [PATCH 1/8] add source onesignal --- airbyte-integrations/builds.md | 1 + .../connectors/source-onesignal/.dockerignore | 7 + .../connectors/source-onesignal/Dockerfile | 38 ++ .../connectors/source-onesignal/README.md | 132 ++++++ .../acceptance-test-config.yml | 29 ++ .../acceptance-test-docker.sh | 16 + .../connectors/source-onesignal/bootstrap.md | 28 ++ .../connectors/source-onesignal/build.gradle | 14 + .../integration_tests/__init__.py | 3 + .../integration_tests/abnormal_state.json | 8 + .../integration_tests/acceptance.py | 16 + .../integration_tests/catalog.json | 30 ++ .../integration_tests/configured_catalog.json | 40 ++ .../integration_tests/expected_records.txt | 5 + .../integration_tests/invalid_config.json | 5 + .../integration_tests/sample_config.json | 5 + .../integration_tests/sample_state.json | 8 + .../connectors/source-onesignal/main.py | 13 + .../source-onesignal/requirements.txt | 2 + .../connectors/source-onesignal/setup.py | 30 ++ .../source_onesignal/__init__.py | 8 + .../source_onesignal/schemas/apps.json | 86 ++++ .../source_onesignal/schemas/devices.json | 74 ++++ .../schemas/notifications.json | 410 ++++++++++++++++++ .../source_onesignal/schemas/outcomes.json | 16 + .../source_onesignal/source.py | 39 ++ .../source_onesignal/spec.json | 28 ++ .../source_onesignal/streams.py | 219 ++++++++++ .../source-onesignal/unit_tests/__init__.py | 3 + .../unit_tests/test_incremental_streams.py | 150 +++++++ .../unit_tests/test_source.py | 24 + .../unit_tests/test_streams.py | 71 +++ docs/SUMMARY.md | 1 + docs/integrations/README.md | 1 + docs/integrations/sources/onesignal.md | 58 +++ 35 files changed, 1618 insertions(+) create mode 100644 airbyte-integrations/connectors/source-onesignal/.dockerignore create mode 100644 airbyte-integrations/connectors/source-onesignal/Dockerfile create mode 100644 airbyte-integrations/connectors/source-onesignal/README.md create mode 100644 airbyte-integrations/connectors/source-onesignal/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-onesignal/acceptance-test-docker.sh create mode 100644 airbyte-integrations/connectors/source-onesignal/bootstrap.md create mode 100644 airbyte-integrations/connectors/source-onesignal/build.gradle create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/abnormal_state.json create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/expected_records.txt create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/invalid_config.json create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/sample_config.json create mode 100644 airbyte-integrations/connectors/source-onesignal/integration_tests/sample_state.json create mode 100644 airbyte-integrations/connectors/source-onesignal/main.py create mode 100644 airbyte-integrations/connectors/source-onesignal/requirements.txt create mode 100644 airbyte-integrations/connectors/source-onesignal/setup.py create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/__init__.py create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/outcomes.json create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json create mode 100644 airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py create mode 100644 airbyte-integrations/connectors/source-onesignal/unit_tests/__init__.py create mode 100644 airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py create mode 100644 airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py create mode 100644 airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py create mode 100644 docs/integrations/sources/onesignal.md diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index fb11a5ba0a6e0..b8c47ef309f3a 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -53,6 +53,7 @@ | Mixpanel | [![source-mixpanel](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mixpanel%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mixpanel) | | Mongo DB | [![source-mongodb-v2](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mongodb-v2%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mongodb-v2) | | MySQL | [![source-mysql](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-mysql%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-mysql) | +| OneSignal | [![source-onesignal](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-onesignal%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-onesignal) | | Oracle DB | [![source-oracle](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-oracle%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-oracle) | | Paypal Transaction | [![paypal-transaction](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-paypal-transaction%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-paypal-transaction) | | Pipedrive | [![source-pipedrive](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fsource-pipedrive%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/source-pipedrive) | diff --git a/airbyte-integrations/connectors/source-onesignal/.dockerignore b/airbyte-integrations/connectors/source-onesignal/.dockerignore new file mode 100644 index 0000000000000..e90abeb5b1e43 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/.dockerignore @@ -0,0 +1,7 @@ +* +!Dockerfile +!Dockerfile.test +!main.py +!source_onesignal +!setup.py +!secrets diff --git a/airbyte-integrations/connectors/source-onesignal/Dockerfile b/airbyte-integrations/connectors/source-onesignal/Dockerfile new file mode 100644 index 0000000000000..cdce97e72c688 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/Dockerfile @@ -0,0 +1,38 @@ +FROM python:3.7.11-alpine3.14 as base + +# build and load all requirements +FROM base as builder +WORKDIR /airbyte/integration_code + +# upgrade pip to the latest version +RUN apk --no-cache upgrade \ + && pip install --upgrade pip \ + && apk --no-cache add tzdata build-base + + +COPY setup.py ./ +# install necessary packages to a temporary folder +RUN pip install --prefix=/install . + +# build a clean environment +FROM base +WORKDIR /airbyte/integration_code + +# copy all loaded and built libraries to a pure basic image +COPY --from=builder /install /usr/local +# add default timezone settings +COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime +RUN echo "Etc/UTC" > /etc/timezone + +# bash is installed for more convenient debugging. +RUN apk --no-cache add bash + +# copy payload code only +COPY main.py ./ +COPY source_onesignal ./source_onesignal + +ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" +ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-onesignal diff --git a/airbyte-integrations/connectors/source-onesignal/README.md b/airbyte-integrations/connectors/source-onesignal/README.md new file mode 100644 index 0000000000000..fd8185c740bcc --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/README.md @@ -0,0 +1,132 @@ +# Onesignal Source + +This is the repository for the Onesignal source connector, written in Python. +For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/onesignal). + +## Local development + +### Prerequisites +**To iterate on this connector, make sure to complete this prerequisites section.** + +#### Minimum Python version required `= 3.7.0` + +#### Build & Activate Virtual Environment and install dependencies +From this connector directory, create a virtual environment: +``` +python -m venv .venv +``` + +This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your +development environment of choice. To activate it from the terminal, run: +``` +source .venv/bin/activate +pip install -r requirements.txt +pip install '.[tests]' +``` +If you are in an IDE, follow your IDE's instructions to activate the virtualenv. + +Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is +used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`. +If this is mumbo jumbo to you, don't worry about it, just put your deps in `setup.py` but install using `pip install -r requirements.txt` and everything +should work as you expect. + +#### Building via Gradle +You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow. + +To build using Gradle, from the Airbyte repository root, run: +``` +./gradlew :airbyte-integrations:connectors:source-onesignal:build +``` + +#### Create credentials +**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/onesignal) +to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_onesignal/spec.json` file. +Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. +See `integration_tests/sample_config.json` for a sample config file. + +**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source onesignal test creds` +and place them into `secrets/config.json`. + +### Locally running the connector +``` +python main.py spec +python main.py check --config secrets/config.json +python main.py discover --config secrets/config.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json +``` + +### Locally running the connector docker image + +#### Build +First, make sure you build the latest Docker image: +``` +docker build . -t airbyte/source-onesignal:dev +``` + +You can also build the connector image via Gradle: +``` +./gradlew :airbyte-integrations:connectors:source-onesignal:airbyteDocker +``` +When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in +the Dockerfile. + +#### Run +Then run any of the connector commands as follows: +``` +docker run --rm airbyte/source-onesignal:dev spec +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-onesignal:dev check --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-onesignal:dev discover --config /secrets/config.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-onesignal:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json +``` +## Testing +Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named. +First install test dependencies into your virtual environment: +``` +pip install .[tests] +``` +### Unit Tests +To run unit tests locally, from the connector directory run: +``` +python -m pytest unit_tests +``` + +### Integration Tests +There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector). +#### Custom Integration tests +Place custom tests inside `integration_tests/` folder, then, from the connector root, run +``` +python -m pytest integration_tests +``` +#### Acceptance Tests +Customize `acceptance-test-config.yml` file to configure tests. See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) for more information. +If your connector requires to create or destroy resources for use during acceptance tests create fixtures for it and place them inside integration_tests/acceptance.py. +To run your integration tests with acceptance tests, from the connector root, run +``` +python -m pytest integration_tests -p integration_tests.acceptance +``` +To run your integration tests with docker + +### Using gradle to run tests +All commands should be run from airbyte project root. +To run unit tests: +``` +./gradlew :airbyte-integrations:connectors:source-onesignal:unitTest +``` +To run acceptance and custom integration tests: +``` +./gradlew :airbyte-integrations:connectors:source-onesignal:integrationTest +``` + +## Dependency Management +All of your dependencies should go in `setup.py`, NOT `requirements.txt`. The requirements file is only used to connect internal Airbyte dependencies in the monorepo for local development. +We split dependencies between two groups, dependencies that are: +* required for your connector to work need to go to `MAIN_REQUIREMENTS` list. +* required for the testing need to go to `TEST_REQUIREMENTS` list + +### Publishing a new version of the connector +You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what? +1. Make sure your changes are passing unit and integration tests. +1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)). +1. Create a Pull Request. +1. Pat yourself on the back for being an awesome contributor. +1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master. diff --git a/airbyte-integrations/connectors/source-onesignal/acceptance-test-config.yml b/airbyte-integrations/connectors/source-onesignal/acceptance-test-config.yml new file mode 100644 index 0000000000000..68ba6b4ee03bd --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/acceptance-test-config.yml @@ -0,0 +1,29 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-onesignal:dev +tests: + spec: + - spec_path: "source_onesignal/spec.json" + connection: + - config_path: "secrets/config.json" + status: "succeed" + - config_path: "integration_tests/invalid_config.json" + status: "failed" + discovery: + - config_path: "secrets/config.json" + basic_read: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + empty_streams: [] + expect_records: + path: "integration_tests/expected_records.txt" + extra_fields: no + exact_order: no + extra_records: yes + incremental: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" + future_state_path: "integration_tests/abnormal_state.json" + full_refresh: + - config_path: "secrets/config.json" + configured_catalog_path: "integration_tests/configured_catalog.json" diff --git a/airbyte-integrations/connectors/source-onesignal/acceptance-test-docker.sh b/airbyte-integrations/connectors/source-onesignal/acceptance-test-docker.sh new file mode 100644 index 0000000000000..e4d8b1cef8961 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/acceptance-test-docker.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env sh + +# Build latest connector image +docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2) + +# Pull latest acctest image +docker pull airbyte/source-acceptance-test:latest + +# Run +docker run --rm -it \ + -v /var/run/docker.sock:/var/run/docker.sock \ + -v /tmp:/tmp \ + -v $(pwd):/test_input \ + airbyte/source-acceptance-test \ + --acceptance-test-config /test_input + diff --git a/airbyte-integrations/connectors/source-onesignal/bootstrap.md b/airbyte-integrations/connectors/source-onesignal/bootstrap.md new file mode 100644 index 0000000000000..25a9a5fac2cfa --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/bootstrap.md @@ -0,0 +1,28 @@ +# OneSignal + +## Overview + +OneSignal is a customer messaging and engagement platform that allows businesses to create meaningful customer connections. OneSignal REST API allows a developer to retrieve audience and messaging information on the OneSignal platform. + +## Endpoints + +OneSignal API consists of four endpoints which can be extracted data from: + +1. **App**: The collection of audience and messaging channels. +2. **Device**: A customer's device which can send message to, it is associated with app. +3. **Notification**: A messaging activity associated with app. +4. **Outcome**: Aggregated information associated with app, for example, session duration, number of clicks, etc. + +## Quick Notes + +- Each app has its own authentication key to retrieve its devices, notifications and outcomes. The key can be found in the app's endpoint response. + +- Device and notification endpoint has 300 and 50 records limit per request respectively, so the cursor pagination strategy is used for them. + +- Rate limiting is a standard exponential backoff when a 429 HTTP status code returned. + +- For the outcome endpoint, it needs to specify a comma-separated list of names and the value (sum/count) for the returned outcome data. So this requirement is added to the source spec. + +## API Reference + +The API reference documents: [https://documentation.onesignal.com/reference](https://documentation.onesignal.com/reference) diff --git a/airbyte-integrations/connectors/source-onesignal/build.gradle b/airbyte-integrations/connectors/source-onesignal/build.gradle new file mode 100644 index 0000000000000..eaf6332d50c0a --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/build.gradle @@ -0,0 +1,14 @@ +plugins { + id 'airbyte-python' + id 'airbyte-docker' + id 'airbyte-source-acceptance-test' +} + +airbytePython { + moduleDirectory 'source_onesignal' +} + +dependencies { + implementation files(project(':airbyte-integrations:bases:source-acceptance-test').airbyteDocker.outputs) + implementation files(project(':airbyte-integrations:bases:base-python').airbyteDocker.outputs) +} diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/__init__.py b/airbyte-integrations/connectors/source-onesignal/integration_tests/__init__.py new file mode 100644 index 0000000000000..46b7376756ec6 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/abnormal_state.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/abnormal_state.json new file mode 100644 index 0000000000000..11e8591caa0d4 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/abnormal_state.json @@ -0,0 +1,8 @@ +{ + "devices": { + "created_at": 33190962600 + }, + "notifications": { + "queued_at": 33190962600 + } +} diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py new file mode 100644 index 0000000000000..58c194c5d1376 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py @@ -0,0 +1,16 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import pytest + +pytest_plugins = ("source_acceptance_test.plugin",) + + +@pytest.fixture(scope="session", autouse=True) +def connector_setup(): + """ This fixture is a placeholder for external resources that acceptance test might require.""" + # TODO: setup test dependencies if needed. otherwise remove the TODO comments + yield + # TODO: clean up test dependencies diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json new file mode 100644 index 0000000000000..101a261a29650 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json @@ -0,0 +1,30 @@ +{ + "streams": [ + { + "name": "apps", + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "json_schema": {} + }, + { + "name": "devices", + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": "created_at", + "json_schema": {} + }, + { + "name": "notifications", + "supported_sync_modes": ["full_refresh", "incremental"], + "source_defined_cursor": false, + "default_cursor_field": "queued_at", + "json_schema": {} + }, + { + "name": "outcomes", + "supported_sync_modes": ["full_refresh"], + "source_defined_cursor": false, + "json_schema": {} + } + ] +} diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json new file mode 100644 index 0000000000000..368d6f6bfbcf2 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json @@ -0,0 +1,40 @@ +{ + "streams": [ + { + "stream": { + "name": "apps", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + }, + { + "stream": { + "name": "devices", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "notifications", + "json_schema": {}, + "supported_sync_modes": ["full_refresh", "incremental"] + }, + "sync_mode": "incremental", + "destination_sync_mode": "append" + }, + { + "stream": { + "name": "outcomes", + "json_schema": {}, + "supported_sync_modes": ["full_refresh"] + }, + "sync_mode": "full_refresh", + "destination_sync_mode": "overwrite" + } + ] +} diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/expected_records.txt b/airbyte-integrations/connectors/source-onesignal/integration_tests/expected_records.txt new file mode 100644 index 0000000000000..d74249dd06811 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/expected_records.txt @@ -0,0 +1,5 @@ +{"stream": "apps", "data": {"id": "5b25d0c6-cd4a-4948-8285-8f27d3cd0b8a", "name": "test2", "gcm_key": null, "chrome_key": null, "chrome_web_key": null, "chrome_web_origin": null, "chrome_web_gcm_sender_id": null, "chrome_web_default_notification_icon": null, "chrome_web_sub_domain": null, "apns_env": null, "apns_certificates": null, "safari_apns_certificate": null, "safari_site_origin": null, "safari_push_id": null, "safari_icon_16_16": "public/safari_packages/5b25d0c6-cd4a-4948-8285-8f27d3cd0b8a/icons/16x16.png", "safari_icon_32_32": "public/safari_packages/5b25d0c6-cd4a-4948-8285-8f27d3cd0b8a/icons/16x16@2x.png", "safari_icon_64_64": "public/safari_packages/5b25d0c6-cd4a-4948-8285-8f27d3cd0b8a/icons/32x32@2x.png", "safari_icon_128_128": "public/safari_packages/5b25d0c6-cd4a-4948-8285-8f27d3cd0b8a/icons/128x128.png", "safari_icon_256_256": "public/safari_packages/5b25d0c6-cd4a-4948-8285-8f27d3cd0b8a/icons/128x128@2x.png", "site_name": null, "created_at": "2021-10-11T00:38:51.906Z", "updated_at": "2021-10-11T00:38:52.038Z", "players": 0, "messageable_players": 0, "basic_auth_key": "ZTkwNjBkZjQtOTkxMC00NjFhLTgzNWMtMTU1YjYyNGNjMjQ3", "additional_data_is_root_payload": false}, "emitted_at": 1634085727000} +{"stream": "apps", "data": {"id": "01969af0-ed1d-48f5-941d-95aeacf868d1", "name": "test", "gcm_key": null, "chrome_key": null, "chrome_web_key": null, "chrome_web_origin": null, "chrome_web_gcm_sender_id": null, "chrome_web_default_notification_icon": null, "chrome_web_sub_domain": null, "apns_env": null, "apns_certificates": null, "safari_apns_certificate": null, "safari_site_origin": null, "safari_push_id": null, "safari_icon_16_16": "public/safari_packages/01969af0-ed1d-48f5-941d-95aeacf868d1/icons/16x16.png", "safari_icon_32_32": "public/safari_packages/01969af0-ed1d-48f5-941d-95aeacf868d1/icons/16x16@2x.png", "safari_icon_64_64": "public/safari_packages/01969af0-ed1d-48f5-941d-95aeacf868d1/icons/32x32@2x.png", "safari_icon_128_128": "public/safari_packages/01969af0-ed1d-48f5-941d-95aeacf868d1/icons/128x128.png", "safari_icon_256_256": "public/safari_packages/01969af0-ed1d-48f5-941d-95aeacf868d1/icons/128x128@2x.png", "site_name": null, "created_at": "2021-10-10T08:25:34.450Z", "updated_at": "2021-10-10T08:25:34.762Z", "players": 1, "messageable_players": 1, "basic_auth_key": "ZjgwNTU3ZjUtOWRkMC00ZTE4LWE4MWItZDkxZDk5MmE0NWJj", "additional_data_is_root_payload": false}, "emitted_at": 1634085727000} +{"stream": "notifications", "data": {"adm_big_picture": null, "adm_group": null, "adm_group_message": null, "adm_large_icon": null, "adm_small_icon": null, "adm_sound": null, "spoken_text": null, "alexa_ssml": null, "alexa_display_title": null, "amazon_background_data": null, "android_accent_color": null, "android_group": null, "android_group_message": null, "android_led_color": null, "android_sound": null, "android_visibility": null, "app_id": "01969af0-ed1d-48f5-941d-95aeacf868d1", "big_picture": null, "buttons": null, "canceled": false, "chrome_big_picture": null, "chrome_icon": null, "chrome_web_icon": null, "chrome_web_image": null, "chrome_web_badge": null, "content_available": null, "contents": {}, "converted": 0, "data": null, "delayed_option": "immediate", "delivery_time_of_day": "Wednesday, October 13, 2021 10:06 AM UTC+11:00", "errored": 0, "excluded_segments": [], "failed": 0, "firefox_icon": null, "global_image": null, "headings": {}, "id": "8ecb3bc9-f5a8-4afa-a389-1df9c358d324", "include_player_ids": null, "include_external_user_ids": null, "included_segments": ["Subscribed Users"], "thread_id": null, "ios_badgeCount": null, "ios_badgeType": null, "ios_category": null, "ios_interruption_level": null, "ios_relevance_score": null, "ios_sound": null, "apns_alert": {}, "target_content_identifier": null, "isAdm": null, "isAndroid": null, "isChrome": null, "isChromeWeb": null, "isAlexa": null, "isFirefox": null, "isIos": null, "isSafari": null, "isWP": null, "isWP_WNS": null, "isEdge": null, "isHuawei": null, "isSMS": null, "large_icon": null, "priority": null, "queued_at": 1634079976, "remaining": 0, "send_after": 1634079976, "completed_at": 1634079977, "small_icon": null, "successful": 1, "received": null, "tags": null, "filters": null, "template_id": null, "ttl": null, "url": null, "web_url": null, "app_url": null, "web_buttons": null, "web_push_topic": null, "wp_sound": null, "wp_wns_sound": null, "platform_delivery_stats": {"email": {"successful": 1, "failed": 0, "errored": 0, "converted": 0, "received": 0}}, "ios_attachments": null, "huawei_sound": null, "huawei_led_color": null, "huawei_accent_color": null, "huawei_visibility": null, "huawei_group": null, "huawei_group_message": null, "huawei_channel_id": null, "huawei_existing_channel_id": null, "huawei_small_icon": null, "huawei_large_icon": null, "huawei_big_picture": null, "huawei_msg_type": null, "throttle_rate_per_minute": null, "sms_from": null, "sms_media_urls": null, "name": null, "email_click_tracking_disabled": false, "fcap_group_ids": null, "fcap_status": "uncapped"}, "emitted_at": 1634085731000} +{"stream": "outcomes", "data": {"id": "os__click", "aggregation": "count", "value": 0}, "emitted_at": 1634085732000} +{"stream": "outcomes", "data": {"id": "os__click", "aggregation": "count", "value": 0}, "emitted_at": 1634085733000} diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/invalid_config.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/invalid_config.json new file mode 100644 index 0000000000000..92189dab1ad28 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/invalid_config.json @@ -0,0 +1,5 @@ +{ + "user_auth_key": "wrong key", + "start_date": "2021-04-01T04:20:02Z", + "outcome_names": "" +} diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/sample_config.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/sample_config.json new file mode 100644 index 0000000000000..c8a93ecb47152 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/sample_config.json @@ -0,0 +1,5 @@ +{ + "user_auth_key": "some auth key", + "start_date": "2021-04-01T04:20:02Z", + "outcome_names": "os__click.count" +} diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/sample_state.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/sample_state.json new file mode 100644 index 0000000000000..fd1aa3a792702 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/sample_state.json @@ -0,0 +1,8 @@ +{ + "devices": { + "created_at": 1580510247 + }, + "notifications": { + "queued_at": 1580510247 + } +} diff --git a/airbyte-integrations/connectors/source-onesignal/main.py b/airbyte-integrations/connectors/source-onesignal/main.py new file mode 100644 index 0000000000000..39e79c05fd3ce --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/main.py @@ -0,0 +1,13 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import sys + +from airbyte_cdk.entrypoint import launch +from source_onesignal import SourceOnesignal + +if __name__ == "__main__": + source = SourceOnesignal() + launch(source, sys.argv[1:]) diff --git a/airbyte-integrations/connectors/source-onesignal/requirements.txt b/airbyte-integrations/connectors/source-onesignal/requirements.txt new file mode 100644 index 0000000000000..0411042aa0911 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/requirements.txt @@ -0,0 +1,2 @@ +-e ../../bases/source-acceptance-test +-e . diff --git a/airbyte-integrations/connectors/source-onesignal/setup.py b/airbyte-integrations/connectors/source-onesignal/setup.py new file mode 100644 index 0000000000000..c80f3bf47c84b --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/setup.py @@ -0,0 +1,30 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from setuptools import find_packages, setup + +MAIN_REQUIREMENTS = [ + "airbyte-cdk", +] + +TEST_REQUIREMENTS = [ + "pytest~=6.1", + "pytest-mock~=3.6.1", + "source-acceptance-test", + "requests-mock", +] + +setup( + name="source_onesignal", + description="Source implementation for Onesignal.", + author="Airbyte", + author_email="contact@airbyte.io", + packages=find_packages(), + install_requires=MAIN_REQUIREMENTS, + package_data={"": ["*.json", "schemas/*.json", "schemas/shared/*.json"]}, + extras_require={ + "tests": TEST_REQUIREMENTS, + }, +) diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/__init__.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/__init__.py new file mode 100644 index 0000000000000..4a3b71533fd63 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/__init__.py @@ -0,0 +1,8 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from .source import SourceOnesignal + +__all__ = ["SourceOnesignal"] diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json new file mode 100644 index 0000000000000..8e59974b6c80d --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json @@ -0,0 +1,86 @@ +{ + "$schema": "http://json-schema.org/draft-07/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": "string" + }, + "name": { + "type": ["null", "string"] + }, + "players": { + "type": "integer" + }, + "messageable_players": { + "type": "integer" + }, + "updated_at": { + "type": ["null", "string"] + }, + "created_at": { + "type": ["null", "string"] + }, + "gcm_key": { + "type": ["null", "string"] + }, + "chrome_key": { + "type": ["null", "string"] + }, + "chrome_web_key": { + "type": ["null", "string"] + }, + "chrome_web_origin": { + "type": ["null", "string"] + }, + "chrome_web_gcm_sender_id": { + "type": ["null", "string"] + }, + "chrome_web_default_notification_icon": { + "type": ["null", "string"] + }, + "chrome_web_sub_domain": { + "type": ["null", "string"] + }, + "apns_env": { + "type": ["null", "string"] + }, + "apns_certificates": { + "type": ["null", "string"] + }, + "safari_apns_certificate": { + "type": ["null", "string"] + }, + "safari_site_origin": { + "type": ["null", "string"] + }, + "safari_push_id": { + "type": ["null", "string"] + }, + "safari_icon_16_16": { + "type": ["null", "string"] + }, + "safari_icon_32_32": { + "type": ["null", "string"] + }, + "safari_icon_64_64": { + "type": ["null", "string"] + }, + "safari_icon_128_128": { + "type": ["null", "string"] + }, + "safari_icon_256_256": { + "type": ["null", "string"] + }, + "site_name": { + "type": ["null", "string"] + }, + "basic_auth_key": { + "type": "string" + }, + "additional_data_is_root_payload": { + "type": ["null", "boolean"] + } + } +} + diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json new file mode 100644 index 0000000000000..283b17ecba580 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json @@ -0,0 +1,74 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": "string" + }, + "identifier": { + "type": ["null", "string"] + }, + "session_count": { + "type": ["null", "integer"] + }, + "language": { + "type": ["null", "string"] + }, + "timezone": { + "type": ["null", "integer"] + }, + "game_version": { + "type": ["null", "string"] + }, + "device_os": { + "type": ["null", "string"] + }, + "device_type": { + "type": ["null", "integer"] + }, + "device_model": { + "type": ["null", "string"] + }, + "ad_id": { + "type": ["null", "string"] + }, + "tags": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": {} + }, + "last_active": { + "type": ["null", "integer"] + }, + "amount_spent": { + "type": ["null", "number"] + }, + "created_at": { + "type": ["null", "integer"] + }, + "invalid_identifier": { + "type": ["null", "boolean"] + }, + "badge_count": { + "type": ["null", "integer"] + }, + "playtime": { + "type": ["null", "integer"] + }, + "sdk": { + "type": ["null", "string"] + }, + "test_type": { + "type": ["null", "string"] + }, + "ip": { + "type": ["null", "string"] + }, + "external_user_id": { + "type": ["null", "string"] + } + } +} + + diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json new file mode 100644 index 0000000000000..a19d5b63df2bd --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json @@ -0,0 +1,410 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "adm_big_picture": { + "type": ["null", "string"] + }, + "adm_group": { + "type": ["null", "string"] + }, + "adm_group_message": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "en": { + "type": ["null", "string"] + } + } + }, + "adm_large_icon": { + "type": ["null", "string"] + }, + "adm_small_icon": { + "type": ["null", "string"] + }, + "adm_sound": { + "type": ["null", "string"] + }, + "spoken_text": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": {} + }, + "alexa_ssml": { + "type": ["null", "string"] + }, + "alexa_display_title": { + "type": ["null", "string"] + }, + "amazon_background_data": { + "type": ["null", "boolean"] + }, + "android_accent_color": { + "type": ["null", "string"] + }, + "android_group": { + "type": ["null", "string"] + }, + "android_group_message": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "en": { + "type": ["null", "string"] + } + } + }, + "android_led_color": { + "type": ["null", "string"] + }, + "android_sound": { + "type": ["null", "string"] + }, + "android_visibility": { + "type": ["null", "integer"] + }, + "app_id": { + "type": "string" + }, + "big_picture": { + "type": ["null", "string"] + }, + "buttons": { + "type": ["null", "string"] + }, + "canceled": { + "type": ["null", "boolean"] + }, + "chrome_big_picture": { + "type": ["null", "string"] + }, + "chrome_icon": { + "type": ["null", "string"] + }, + "chrome_web_icon": { + "type": ["null", "string"] + }, + "chrome_web_image": { + "type": ["null", "string"] + }, + "chrome_web_badge": { + "type": ["null", "string"] + }, + "content_available": { + "type": ["null", "boolean"] + }, + "name": { + "type": ["null", "string"] + }, + "contents": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "en": { + "type": ["null", "string"] + } + } + }, + "converted": { + "type": ["null", "integer"] + }, + "data": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": {} + }, + "delayed_option": { + "type": ["null", "string"] + }, + "delivery_time_of_day": { + "type": ["null", "string"] + }, + "errored": { + "type": ["null", "integer"] + }, + "excluded_segments": { + "type": ["null", "array"], + "items": { + "type": "string" + } + }, + "failed": { + "type": ["null", "integer"] + }, + "firefox_icon": { + "type": ["null", "string"] + }, + "global_image": { + "type": ["null", "string"] + }, + "headings": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "en": { + "type": ["null", "string"] + } + } + }, + + "huawei_accent_color": { + "type": ["null", "string"] + }, + "huawei_big_picture": { + "type": ["null", "string"] + }, + "huawei_channel_id": { + "type": ["null", "string"] + }, + "huawei_existing_channel_id": { + "type": ["null", "string"] + }, + "huawei_group": { + "type": ["null", "string"] + }, + "huawei_group_message": { + "type": ["null", "string"] + }, + "huawei_large_icon": { + "type": ["null", "string"] + }, + "huawei_led_color": { + "type": ["null", "string"] + }, + "huawei_msg_type": { + "type": ["null", "string"] + }, + "huawei_small_icon": { + "type": ["null", "string"] + }, + "huawei_sound": { + "type": ["null", "string"] + }, + "huawei_visibility": { + "type": ["null", "string"] + }, + "id": { + "type": "string" + }, + "include_player_ids": { + "type": ["null", "string"] + }, + "include_external_user_ids": { + "type": ["null", "string"] + }, + "included_segments": { + "type": "array", + "items": { + "type": "string" + } + }, + "thread_id": { + "type": ["null", "string"] + }, + "ios_badgeCount": { + "type": ["null", "integer"] + }, + "ios_badgeType": { + "type": ["null", "string"] + }, + "ios_category": { + "type": ["null", "string"] + }, + "ios_interruption_level": { + "type": ["null", "string"] + }, + "ios_relevance_score": { + "type": ["null", "number"] + }, + "ios_sound": { + "type": ["null", "string"] + }, + "apns_alert": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": {} + }, + "target_content_identifier": { + "type": ["null", "string"] + }, + "isAdm": { + "type": ["null", "boolean"] + }, + "isAndroid": { + "type": ["null", "boolean"] + }, + "isChrome": { + "type": ["null", "boolean"] + }, + "isChromeWeb": { + "type": ["null", "boolean"] + }, + "isAlexa": { + "type": ["null", "boolean"] + }, + "isFirefox": { + "type": ["null", "boolean"] + }, + "isIos": { + "type": ["null", "boolean"] + }, + "isSafari": { + "type": ["null", "boolean"] + }, + "isWP": { + "type": ["null", "boolean"] + }, + "isWP_WNS": { + "type": ["null", "boolean"] + }, + "isEdge": { + "type": ["null", "boolean"] + }, + "isHuawei": { + "type": ["null", "boolean"] + }, + "isSMS": { + "type": ["null", "boolean"] + }, + "large_icon": { + "type": ["null", "string"] + }, + "priority": { + "type": ["null", "integer"] + }, + "queued_at": { + "type": "integer" + }, + "remaining": { + "type": ["null", "integer"] + }, + "send_after": { + "type": ["null", "integer"] + }, + "completed_at": { + "type": ["null", "integer"] + }, + "small_icon": { + "type": ["null", "string"] + }, + "successful": { + "type": ["null", "integer"] + }, + "received": { + "type": ["null", "integer"] + }, + "tags": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": {} + }, + "filters": { + "type": ["null", "string"] + }, + "template_id": { + "type": ["null", "string"] + }, + "ttl": { + "type": ["null", "integer"] + }, + "url": { + "type": ["null", "string"] + }, + "web_url": { + "type": ["null", "string"] + }, + "app_url": { + "type": ["null", "string"] + }, + "web_buttons": { + "type": ["null", "string"] + }, + "web_push_topic": { + "type": ["null", "string"] + }, + "wp_sound": { + "type": ["null", "string"] + }, + "wp_wns_sound": { + "type": ["null", "string"] + }, + "platform_delivery_stats": { + "type": "object", + "additionalProperties": true, + "properties": { + "android": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "successful": { + "type": "integer" + }, + "errored": { + "type": "integer" + }, + "failed": { + "type": "integer" + }, + "converted": { + "type": "integer" + }, + "received": { + "type": "integer" + } + } + }, + "ios": { + "type": ["null", "object"], + "additionalProperties": false, + "properties": { + "successful": { + "type": "integer" + }, + "errored": { + "type": "integer" + }, + "failed": { + "type": "integer" + }, + "converted": { + "type": "integer" + }, + "received": { + "type": "integer" + } + } + } + } + }, + "ios_attachments": { + "type": ["null", "object"], + "additionalProperties": true, + "properties": { + "id": { + "type": ["null", "string"] + } + } + }, + "throttle_rate_per_minute": { + "type": ["null", "integer"] + }, + "sms_from": { + "type": ["null", "string"] + }, + "sms_media_urls": { + "type": ["null", "string"] + }, + "email_click_tracking_disabled": { + "type": ["null", "boolean"] + }, + "fcap_group_ids": { + "type": ["null", "string"] + }, + "fcap_status": { + "type": ["null", "string"] + } + } +} + diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/outcomes.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/outcomes.json new file mode 100644 index 0000000000000..e030d67a47641 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/outcomes.json @@ -0,0 +1,16 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "type": "object", + "additionalProperties": false, + "properties": { + "id": { + "type": "string" + }, + "value": { + "type": "integer" + }, + "aggregation": { + "type": "string" + } + } +} diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py new file mode 100644 index 0000000000000..abf75a0ee372f --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py @@ -0,0 +1,39 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +from abc import ABC +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple + +import requests +from airbyte_cdk.logger import AirbyteLogger +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator + +from .streams import Apps, Devices, Notifications, Outcomes + + +class SourceOnesignal(AbstractSource): + def check_connection(self, logger, config) -> Tuple[bool, any]: + try: + authenticator = TokenAuthenticator(config["user_auth_key"], "Basic") + stream = Apps(authenticator=authenticator, config=config) + records = stream.read_records(sync_mode=SyncMode.full_refresh) + next(records) + return True, None + except requests.exceptions.RequestException as e: + return False, e + + def streams(self, config: Mapping[str, Any]) -> List[Stream]: + AirbyteLogger().log("INFO", f"Using start_date: {config['start_date']}") + + authenticator = TokenAuthenticator(config["user_auth_key"], "Basic") + args = { "authenticator": authenticator, "config": config } + apps = Apps(**args) + args = { "parent": apps, **args } + + return [apps, Devices(**args), Notifications(**args), Outcomes(**args)] diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json new file mode 100644 index 0000000000000..3fa62e1f8a110 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json @@ -0,0 +1,28 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/onesignal", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "OneSignal Source Spec", + "type": "object", + "required": ["user_auth_key", "start_date", "outcome_names"], + "additionalProperties": false, + "properties": { + "user_auth_key": { + "type": "string", + "description": "OneSignal User Auth Key, see the docs for more information on how to obtain this key.", + "airbyte_secret": true + }, + "start_date": { + "type": "string", + "description": "The date from which you'd like to replicate data for OneSignal API, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.", + "examples": ["2020-11-16T00:00:00Z"], + "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$" + }, + "outcome_names": { + "type": "string", + "description": "Comma-separated list of names and the value (sum/count) for the returned outcome data. See the docs for more details", + "examples": ["os__session_duration.count,os__click.count,CustomOutcomeName.sum"] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py new file mode 100644 index 0000000000000..46cb01dfb364c --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py @@ -0,0 +1,219 @@ +import time +from abc import ABC +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +from datetime import datetime + +import requests +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator + + +class OnesignalStream(HttpStream, ABC): + + url_base = "https://onesignal.com/api/v1/" + + primary_key = "id" + + def __init__(self, config: Mapping[str, Any], **kwargs): + super().__init__(**kwargs) + self._auth_token = config["user_auth_key"] + self.start_date = 0 + if isinstance(config.get("start_date"), str): + # OneSignal uses epoch timestamp, so we need to convert the start_date + # config to epoch timestamp too + # start_date example: 2021-01-01T00:00:00Z + self.start_date = int(datetime.fromisoformat( + config["start_date"].replace('Z','+00:00') + ).timestamp()) + + def next_page_token( + self, + response: requests.Response, + ) -> Optional[Mapping[str, Any]]: + return None + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + # OneSignal needs different auth token for each app, so we inject it + # right before read records request. + # Note: The _session.auth can be replaced when HttpStream provided + # some ways to get its request authenticator in the future + token = self._auth_token + if stream_slice: + token = stream_slice.get("rest_api_key", token) + self._session.auth = TokenAuthenticator(token, "Basic") + + return super().read_records(sync_mode, cursor_field, stream_slice, stream_state) + + def parse_response( + self, + response: requests.Response, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Iterable[Mapping]: + data = response.json() + yield from data + + # wait a while to avoid rate limit + time.sleep(1) + + +class ChildStreamMixin(HttpSubStream): + def stream_slices( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Optional[Mapping[str, Any]]]: + parents = super().stream_slices(sync_mode, cursor_field, stream_state) + for item in parents: + parent = item["parent"] + yield { + "app_id": parent["id"], + "rest_api_key": parent["basic_auth_key"], + } + yield from [] + + # default record filter, do nothing + def filter_by_state( + self, + stream_state: Mapping[str, Any] = None, + record: Mapping[str, Any] = None + ) -> Iterable: + yield record + + def parse_response( + self, + response: requests.Response, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> Iterable[Mapping]: + data = response.json().get(self.data_field) + for record in data: + yield from self.filter_by_state(stream_state=stream_state, record=record) + + +class IncrementalOnesignalStream(ChildStreamMixin, OnesignalStream, ABC): + + cursor_field = "updated_at" + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, stream_slice, next_page_token) + params["app_id"] = stream_slice["app_id"] + params["limit"] = self.page_size + if next_page_token: + params["offset"] = next_page_token["offset"] + return params + + def next_page_token( + self, + response: requests.Response, + ) -> Optional[Mapping[str, Any]]: + resp = response.json() + total = resp["total_count"] + next_offset = resp["offset"] + resp["limit"] + if next_offset < total: + return { "offset": next_offset } + + def filter_by_state( + self, + stream_state: Mapping[str, Any] = None, + record: Mapping[str, Any] = None + ) -> Iterable: + value = 0 + if record: + value = record.get(self.cursor_field, value) + if not stream_state or value >= stream_state.get(self.cursor_field, 0): + yield record + + def get_updated_state( + self, + current_stream_state: MutableMapping[str, Any], + latest_record: Mapping[str, Any], + ) -> Mapping[str, Any]: + state_date = current_stream_state.get(self.cursor_field, self.start_date) + record_date = latest_record.get(self.cursor_field, self.start_date) + return { self.cursor_field: max(state_date, record_date) } + + +class Apps(OnesignalStream): + + def path( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> str: + return "apps" + + +class Devices(IncrementalOnesignalStream): + + cursor_field = "created_at" + data_field = "players" + page_size = 300 # page size limit set by OneSignal + + def path( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> str: + return "players" + + +class Notifications(IncrementalOnesignalStream): + + cursor_field = "queued_at" + data_field = "notifications" + page_size = 50 # page size limit set by OneSignal + + def path( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> str: + return "notifications" + + +class Outcomes(ChildStreamMixin, OnesignalStream): + + data_field = "outcomes" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.outcome_names = kwargs["config"]["outcome_names"] + + def path( + self, + stream_state: Mapping[str, Any] = None, + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> str: + return f"apps/{stream_slice['app_id']}/outcomes" + + def request_params( + self, + stream_state: Mapping[str, Any], + stream_slice: Mapping[str, Any] = None, + next_page_token: Mapping[str, Any] = None, + ) -> MutableMapping[str, Any]: + params = super().request_params(stream_state, stream_slice, next_page_token) + params["outcome_names"] = self.outcome_names + return params + diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/__init__.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/__init__.py new file mode 100644 index 0000000000000..46b7376756ec6 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py new file mode 100644 index 0000000000000..71cd08c7daa32 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py @@ -0,0 +1,150 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + + +import requests +from airbyte_cdk.models import SyncMode +from pytest import fixture +from source_onesignal.streams import IncrementalOnesignalStream, Apps, Devices, Notifications + +@fixture +def patch_incremental_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(IncrementalOnesignalStream, "path", "v0/example_endpoint") + mocker.patch.object(IncrementalOnesignalStream, "primary_key", "test_primary_key") + mocker.patch.object(IncrementalOnesignalStream, "__abstractmethods__", set()) + + +@fixture +def args(): + return { + "authenticator": None, + "config": { + "user_auth_key": "", + "start_date": "2021-01-01T00:00:00Z", + "outcome_names": "" + } + } + + +@fixture +def parent(args): + return Apps(**args) + + +@fixture +def stream(patch_incremental_base_class, parent, args): + return IncrementalOnesignalStream(parent=parent, **args) + + +def test_cursor_field(stream): + expected_cursor_field = "updated_at" + assert stream.cursor_field == expected_cursor_field + + +def test_get_updated_state(stream): + inputs = { + "current_stream_state": { "updated_at": 1580510247 }, + "latest_record": { "updated_at": 1580510248 } + } + expected_state = { "updated_at": 1580510248 } + assert stream.get_updated_state(**inputs) == expected_state + + inputs = { + "current_stream_state": { "updated_at": 1580510248 }, + "latest_record": { "updated_at": 1580510247 } + } + assert stream.get_updated_state(**inputs) == expected_state + + +def test_stream_slices(stream, requests_mock): + requests_mock.get("https://onesignal.com/api/v1/apps", json=[ + { "id": "abc", "basic_auth_key": "key 1" }, + { "id": "def", "basic_auth_key": "key 2" } + ]) + + inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} + expected_stream_slice = [ + { "app_id": "abc", "rest_api_key": "key 1" }, + { "app_id": "def", "rest_api_key": "key 2" } + ] + assert list(stream.stream_slices(**inputs)) == expected_stream_slice + + +def test_supports_incremental(patch_incremental_base_class, mocker, parent, args): + mocker.patch.object(IncrementalOnesignalStream, "cursor_field", "dummy_field") + stream = IncrementalOnesignalStream(parent=parent, **args) + assert stream.supports_incremental + + +def test_source_defined_cursor(stream): + assert stream.source_defined_cursor + + +def test_stream_checkpoint_interval(stream): + expected_checkpoint_interval = None + assert stream.state_checkpoint_interval == expected_checkpoint_interval + + +def test_next_page_token(stream, requests_mock): + requests_mock.get("https://dummy", json={ + "total_count": 123, + "offset": 22, + "limit": 33 + }) + resp = requests.get("https://dummy") + expected_next_page_token = { "offset": 55 } + assert stream.next_page_token(resp) == expected_next_page_token + + requests_mock.get("https://dummy", json={ + "total_count": 123, + "offset": 100, + "limit": 33 + }) + resp = requests.get("https://dummy") + expected_next_page_token = None + assert stream.next_page_token(resp) == expected_next_page_token + + +def test_request_params(stream, parent, args): + inputs = { "stream_state": {}, "stream_slice": { "app_id": "abc" } } + inputs2 = { + "stream_state": {}, + "stream_slice": { "app_id": "abc" }, + "next_page_token": { "offset": 42 } + } + expected_request_params = { "app_id": "abc", "limit": None } + expected_request_params2 = { "app_id": "abc", "limit": None, "offset": 42 } + + assert stream.request_params(**inputs) == expected_request_params + assert stream.request_params(**inputs2) == expected_request_params2 + + stream2 = Devices(parent=parent, **args) + expected_request_params["limit"] = 300 + expected_request_params2["limit"] = 300 + assert stream2.request_params(**inputs) == expected_request_params + assert stream2.request_params(**inputs2) == expected_request_params2 + + stream3 = Notifications(parent=parent, **args) + expected_request_params["limit"] = 50 + expected_request_params2["limit"] = 50 + assert stream3.request_params(**inputs) == expected_request_params + assert stream3.request_params(**inputs2) == expected_request_params2 + + +def test_filter_by_state(stream): + inputs = { + "stream_state": { "updated_at": 1580510247 }, + "record": { "updated_at": 1580510248 } + } + expected_filter_by_state = [{ "updated_at": 1580510248 }] + assert list(stream.filter_by_state(**inputs)) == expected_filter_by_state + + inputs = { + "stream_state": { "updated_at": 1580510248 }, + "record": { "updated_at": 1580510247 } + } + expected_filter_by_state = [] + assert list(stream.filter_by_state(**inputs)) == expected_filter_by_state + diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py new file mode 100644 index 0000000000000..29f291f32fae3 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +from unittest.mock import MagicMock + +from source_onesignal.source import SourceOnesignal + +def test_check_connection(mocker, requests_mock): + source = SourceOnesignal() + logger_mock, config_mock = MagicMock(), MagicMock() + requests_mock.get("https://onesignal.com/api/v1/apps", json=[{ + "id": "92911750-242d-4260-9e00-9d9034f139ce", + "name": "Your app 1", + }]) + assert source.check_connection(logger_mock, config_mock) == (True, None) + + +def test_streams(mocker): + source = SourceOnesignal() + config_mock = MagicMock() + streams = source.streams(config_mock) + expected_streams_number = 4 + assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py new file mode 100644 index 0000000000000..8a12f2092b180 --- /dev/null +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py @@ -0,0 +1,71 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import requests +from http import HTTPStatus +from unittest.mock import MagicMock + +import pytest +from source_onesignal.streams import OnesignalStream + + +@pytest.fixture +def patch_base_class(mocker): + # Mock abstract methods to enable instantiating abstract class + mocker.patch.object(OnesignalStream, "path", "v0/example_endpoint") + mocker.patch.object(OnesignalStream, "primary_key", "test_primary_key") + mocker.patch.object(OnesignalStream, "__abstractmethods__", set()) + + +def test_next_page_token(patch_base_class): + stream = OnesignalStream(MagicMock()) + inputs = {"response": MagicMock()} + expected_token = None + assert stream.next_page_token(**inputs) == expected_token + + +def test_parse_response(patch_base_class, requests_mock): + requests_mock.get("https://dummy", json=[{ "a": 123, "b": "xx" }]) + resp = requests.get("https://dummy") + + stream = OnesignalStream(MagicMock()) + inputs = { "response": resp, "stream_state": MagicMock() } + expected_parsed_object = { "a": 123, "b": "xx" } + assert next(stream.parse_response(**inputs)) == expected_parsed_object + + +def test_request_headers(patch_base_class): + stream = OnesignalStream(MagicMock()) + inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} + expected_headers = {} + assert stream.request_headers(**inputs) == expected_headers + + +def test_http_method(patch_base_class): + stream = OnesignalStream(MagicMock()) + expected_method = "GET" + assert stream.http_method == expected_method + + +@pytest.mark.parametrize( + ("http_status", "should_retry"), + [ + (HTTPStatus.OK, False), + (HTTPStatus.BAD_REQUEST, False), + (HTTPStatus.TOO_MANY_REQUESTS, True), + (HTTPStatus.INTERNAL_SERVER_ERROR, True), + ], +) +def test_should_retry(patch_base_class, http_status, should_retry): + response_mock = MagicMock() + response_mock.status_code = http_status + stream = OnesignalStream(MagicMock()) + assert stream.should_retry(response_mock) == should_retry + + +def test_backoff_time(patch_base_class): + response_mock = MagicMock() + stream = OnesignalStream(MagicMock()) + expected_backoff_time = None + assert stream.backoff_time(response_mock) == expected_backoff_time diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 9c3a25a8c8fdd..24f8e7e509bf0 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -90,6 +90,7 @@ * [Mongo DB](integrations/sources/mongodb-v2.md) * [MySQL](integrations/sources/mysql.md) * [Okta](integrations/sources/okta.md) + * [OneSignal](integrations/sources/onesignal.md) * [Oracle DB](integrations/sources/oracle.md) * [Oracle Peoplesoft](integrations/sources/oracle-peoplesoft.md) * [Oracle Siebel CRM](integrations/sources/oracle-siebel-crm.md) diff --git a/docs/integrations/README.md b/docs/integrations/README.md index 0ce0f7212f37e..f2ccd36a5fcf4 100644 --- a/docs/integrations/README.md +++ b/docs/integrations/README.md @@ -73,6 +73,7 @@ Airbyte uses a grading system for connectors to help users understand what to ex | [Mongo DB](sources/mongodb-v2.md) | Beta | | [MySQL](sources/mysql.md) | Certified | | [Okta](sources/okta.md) | Beta | +| [OneSignal](sources/onesignal.md) | Alpha | | [Oracle DB](sources/oracle.md) | Certified | | [Oracle PeopleSoft](sources/oracle-peoplesoft.md) | Beta | | [Oracle Siebel CRM](sources/oracle-siebel-crm.md) | Beta | diff --git a/docs/integrations/sources/onesignal.md b/docs/integrations/sources/onesignal.md new file mode 100644 index 0000000000000..392af1fa8a11f --- /dev/null +++ b/docs/integrations/sources/onesignal.md @@ -0,0 +1,58 @@ +# OneSignal + +## Sync overview + +This source can sync data for the [OneSignal API](https://documentation.onesignal.com/reference). It supports both Full Refresh and Incremental syncs. You can choose if this connector will copy only the new or updated data, or all rows in the tables and columns you set up for replication, every time a sync is run. + +### Output schema + +This Source is capable of syncing the following core Streams: + +* [Apps](https://documentation.onesignal.com/reference/view-apps-apps) +* [Devices](https://documentation.onesignal.com/reference/view-devices) \(Incremental\) +* [Notifications](https://documentation.onesignal.com/reference/view-notification) \(Incremental\) +* [Outcomes](https://documentation.onesignal.com/reference/view-outcomes) + +The `Outcomes` stream requires `outcome_names` parameter to filter out outcomes, see the [API docs](https://documentation.onesignal.com/reference/view-outcomes) for more details. + +### Data type mapping + +| Integration Type | Airbyte Type | Notes | +| :--- | :--- | :--- | +| `string` | `string` | | +| `integer` | `integer` | | +| `number` | `number` | | +| `array` | `array` | | +| `object` | `object` | | + +### Features + +| Feature | Supported?\(Yes/No\) | Notes | +| :--- | :--- | :--- | +| Full Refresh Sync | Yes | | +| Incremental Sync | Yes | | +| Namespaces | No | | + +### Performance considerations + +The connector is restricted by normal OneSignal [rate limits](https://documentation.onesignal.com/docs/rate-limits). + +The OneSignal connector should not run into OneSignal API limitations under normal usage. Please [create an issue](https://github.com/airbytehq/airbyte/issues) if you see any rate limit issues that are not automatically retried successfully. + +## Getting started + +### Requirements + +* OneSignal account +* OneSignal user auth Key + +### Setup guide + +Please register on OneSignal and follow this [docs](https://documentation.onesignal.com/docs/accounts-and-keys#user-auth-key) to get your user auth key. + +## Changelog + +| Version | Date | Pull Request | Subject | +| :--- | :--- | :--- | :--- | +| 0.1.0 | 2021-10-13 | [9999](https://github.com/airbytehq/airbyte/pull/9999) | Initial Release | + From 12a22f226f0eae7d27f8aaa2225f0a8010ef786b Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Wed, 13 Oct 2021 23:44:17 +1100 Subject: [PATCH 2/8] update PR number in change log --- docs/integrations/sources/onesignal.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/integrations/sources/onesignal.md b/docs/integrations/sources/onesignal.md index 392af1fa8a11f..4df4ee0c53a00 100644 --- a/docs/integrations/sources/onesignal.md +++ b/docs/integrations/sources/onesignal.md @@ -54,5 +54,5 @@ Please register on OneSignal and follow this [docs](https://documentation.onesig | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | -| 0.1.0 | 2021-10-13 | [9999](https://github.com/airbytehq/airbyte/pull/9999) | Initial Release | +| 0.1.0 | 2021-10-13 | [6998](https://github.com/airbytehq/airbyte/pull/6998) | Initial Release | From 9c4708757dcf1412a7602550bed2b7a1179b3581 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Sun, 17 Oct 2021 01:15:00 +1100 Subject: [PATCH 3/8] change source define cursor and sync mode --- .../source-onesignal/integration_tests/catalog.json | 12 ++++++------ .../integration_tests/configured_catalog.json | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json index 101a261a29650..4fdb1714c296f 100644 --- a/airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/catalog.json @@ -3,27 +3,27 @@ { "name": "apps", "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false, + "source_defined_cursor": true, "json_schema": {} }, { "name": "devices", - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": false, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, "default_cursor_field": "created_at", "json_schema": {} }, { "name": "notifications", - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": false, + "supported_sync_modes": ["incremental"], + "source_defined_cursor": true, "default_cursor_field": "queued_at", "json_schema": {} }, { "name": "outcomes", "supported_sync_modes": ["full_refresh"], - "source_defined_cursor": false, + "source_defined_cursor": true, "json_schema": {} } ] diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json index 368d6f6bfbcf2..c2e96e8878484 100644 --- a/airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/configured_catalog.json @@ -13,7 +13,7 @@ "stream": { "name": "devices", "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"] + "supported_sync_modes": ["incremental"] }, "sync_mode": "incremental", "destination_sync_mode": "append" @@ -22,7 +22,7 @@ "stream": { "name": "notifications", "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"] + "supported_sync_modes": ["incremental"] }, "sync_mode": "incremental", "destination_sync_mode": "append" From 3bda9ecc995ffbc4fd53019386ba751e28e0561d Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Sun, 17 Oct 2021 11:53:12 +1100 Subject: [PATCH 4/8] get correct max cursor time across stream slices --- .../source_onesignal/source.py | 30 +++++++++++++++++-- .../source_onesignal/streams.py | 12 ++++++-- .../unit_tests/test_incremental_streams.py | 3 +- 3 files changed, 40 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py index abf75a0ee372f..3da9a634fd799 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py @@ -4,15 +4,16 @@ from abc import ABC -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple +from typing import Any, Iterator, Iterable, List, Mapping, MutableMapping, Optional, Tuple import requests from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import SyncMode +from airbyte_cdk.models import SyncMode, ConfiguredAirbyteStream, AirbyteMessage from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator +from airbyte_cdk.sources.utils.schema_helpers import InternalConfig from .streams import Apps, Devices, Notifications, Outcomes @@ -37,3 +38,28 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: args = { "parent": apps, **args } return [apps, Devices(**args), Notifications(**args), Outcomes(**args)] + + # Override AbstractSource._read_incremental() function to emit an extra state + # message after finishing syncing the whole stream. + # It is for updating the maximum cursor field date state, because the cursor + # time isn't sorted across stream slices. There is no way to get the + # maximum cursor field time unless reading until the end of the stream. + # This is a dirty hack and might be removed if HttpStream provided an + # end-of-synching-stream event hook in the future. + # This hack only applies to incremental streams, that is, Devices and Notifications. + def _read_incremental( + self, + logger: AirbyteLogger, + stream: Stream, + configured_stream: ConfiguredAirbyteStream, + connector_state: MutableMapping[str, Any], + internal_config: InternalConfig, + ) -> Iterator[AirbyteMessage]: + yield from super()._read_incremental(logger, stream, configured_stream, connector_state, internal_config) + + stream_name = configured_stream.stream.name + stream_state = connector_state.get(stream_name, {}) + state_date = stream_state.get(stream.cursor_field, stream.start_date) + stream_state = { stream.cursor_field: max(state_date, stream.max_cursor_time) } + + yield self._checkpoint_state(stream_name, stream_state, connector_state, logger) diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py index 46cb01dfb364c..c0337584bf1c8 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py @@ -106,6 +106,12 @@ class IncrementalOnesignalStream(ChildStreamMixin, OnesignalStream, ABC): cursor_field = "updated_at" + def __init__(self, **kwargs): + super().__init__(**kwargs) + + # largest time in cursor field across all stream slices + self.max_cursor_time = self.start_date + def request_params( self, stream_state: Mapping[str, Any], @@ -145,9 +151,11 @@ def get_updated_state( current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any], ) -> Mapping[str, Any]: - state_date = current_stream_state.get(self.cursor_field, self.start_date) + # we don't update state here, just keep a record of maximum cursor field + # time, state will be updated after syncing the whole stream record_date = latest_record.get(self.cursor_field, self.start_date) - return { self.cursor_field: max(state_date, record_date) } + self.max_cursor_time = max(self.max_cursor_time, record_date) + return current_stream_state class Apps(OnesignalStream): diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py index 71cd08c7daa32..20c2c06243099 100644 --- a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py @@ -48,13 +48,14 @@ def test_get_updated_state(stream): "current_stream_state": { "updated_at": 1580510247 }, "latest_record": { "updated_at": 1580510248 } } - expected_state = { "updated_at": 1580510248 } + expected_state = { "updated_at": 1580510247 } # note state shouldn't change assert stream.get_updated_state(**inputs) == expected_state inputs = { "current_stream_state": { "updated_at": 1580510248 }, "latest_record": { "updated_at": 1580510247 } } + expected_state = { "updated_at": 1580510248 } # note state shouldn't change assert stream.get_updated_state(**inputs) == expected_state From fcb0e2324646aae8a81d94d35c2cd08a103c2fe0 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Wed, 20 Oct 2021 15:03:25 +1100 Subject: [PATCH 5/8] code improvement as code review advices --- .../connectors/source-onesignal/bootstrap.md | 2 +- .../source_onesignal/schemas/apps.json | 1 - .../source_onesignal/schemas/devices.json | 2 - .../schemas/notifications.json | 1 - .../source_onesignal/source.py | 36 +-- .../source_onesignal/spec.json | 4 +- .../source_onesignal/streams.py | 206 +++++++++++------- .../unit_tests/test_incremental_streams.py | 129 +++++------ .../unit_tests/test_source.py | 31 ++- .../unit_tests/test_streams.py | 34 +-- 10 files changed, 237 insertions(+), 209 deletions(-) diff --git a/airbyte-integrations/connectors/source-onesignal/bootstrap.md b/airbyte-integrations/connectors/source-onesignal/bootstrap.md index 25a9a5fac2cfa..64b2f230b6515 100644 --- a/airbyte-integrations/connectors/source-onesignal/bootstrap.md +++ b/airbyte-integrations/connectors/source-onesignal/bootstrap.md @@ -19,7 +19,7 @@ OneSignal API consists of four endpoints which can be extracted data from: - Device and notification endpoint has 300 and 50 records limit per request respectively, so the cursor pagination strategy is used for them. -- Rate limiting is a standard exponential backoff when a 429 HTTP status code returned. +- Rate limiting follows [https://documentation.onesignal.com/docs/rate-limits](https://documentation.onesignal.com/docs/rate-limits), when a 429 HTTP status code returned. - For the outcome endpoint, it needs to specify a comma-separated list of names and the value (sum/count) for the returned outcome data. So this requirement is added to the source spec. diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json index 8e59974b6c80d..0dc6b50f8d0a6 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/apps.json @@ -83,4 +83,3 @@ } } } - diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json index 283b17ecba580..950712ff160d2 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/devices.json @@ -70,5 +70,3 @@ } } } - - diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json index a19d5b63df2bd..86446f87c3643 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/schemas/notifications.json @@ -407,4 +407,3 @@ } } } - diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py index 3da9a634fd799..b3665a39161f5 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/source.py @@ -3,17 +3,14 @@ # -from abc import ABC -from typing import Any, Iterator, Iterable, List, Mapping, MutableMapping, Optional, Tuple +from typing import Any, List, Mapping, Tuple import requests from airbyte_cdk.logger import AirbyteLogger -from airbyte_cdk.models import SyncMode, ConfiguredAirbyteStream, AirbyteMessage +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator -from airbyte_cdk.sources.utils.schema_helpers import InternalConfig from .streams import Apps, Devices, Notifications, Outcomes @@ -33,33 +30,8 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: AirbyteLogger().log("INFO", f"Using start_date: {config['start_date']}") authenticator = TokenAuthenticator(config["user_auth_key"], "Basic") - args = { "authenticator": authenticator, "config": config } + args = {"authenticator": authenticator, "config": config} apps = Apps(**args) - args = { "parent": apps, **args } + args = {"parent": apps, **args} return [apps, Devices(**args), Notifications(**args), Outcomes(**args)] - - # Override AbstractSource._read_incremental() function to emit an extra state - # message after finishing syncing the whole stream. - # It is for updating the maximum cursor field date state, because the cursor - # time isn't sorted across stream slices. There is no way to get the - # maximum cursor field time unless reading until the end of the stream. - # This is a dirty hack and might be removed if HttpStream provided an - # end-of-synching-stream event hook in the future. - # This hack only applies to incremental streams, that is, Devices and Notifications. - def _read_incremental( - self, - logger: AirbyteLogger, - stream: Stream, - configured_stream: ConfiguredAirbyteStream, - connector_state: MutableMapping[str, Any], - internal_config: InternalConfig, - ) -> Iterator[AirbyteMessage]: - yield from super()._read_incremental(logger, stream, configured_stream, connector_state, internal_config) - - stream_name = configured_stream.stream.name - stream_state = connector_state.get(stream_name, {}) - state_date = stream_state.get(stream.cursor_field, stream.start_date) - stream_state = { stream.cursor_field: max(state_date, stream.max_cursor_time) } - - yield self._checkpoint_state(stream_name, stream_state, connector_state, logger) diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json b/airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json index 3fa62e1f8a110..b2b4dd5ddfa59 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/spec.json @@ -21,7 +21,9 @@ "outcome_names": { "type": "string", "description": "Comma-separated list of names and the value (sum/count) for the returned outcome data. See the docs for more details", - "examples": ["os__session_duration.count,os__click.count,CustomOutcomeName.sum"] + "examples": [ + "os__session_duration.count,os__click.count,CustomOutcomeName.sum" + ] } } } diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py index c0337584bf1c8..16b34f7f8c407 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py @@ -1,11 +1,15 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + import time from abc import ABC -from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Tuple -from datetime import datetime +from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, TypeVar +import pendulum +import pydantic import requests from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator @@ -19,14 +23,12 @@ class OnesignalStream(HttpStream, ABC): def __init__(self, config: Mapping[str, Any], **kwargs): super().__init__(**kwargs) self._auth_token = config["user_auth_key"] - self.start_date = 0 - if isinstance(config.get("start_date"), str): - # OneSignal uses epoch timestamp, so we need to convert the start_date - # config to epoch timestamp too - # start_date example: 2021-01-01T00:00:00Z - self.start_date = int(datetime.fromisoformat( - config["start_date"].replace('Z','+00:00') - ).timestamp()) + self._app_slices = [] + + # OneSignal uses epoch timestamp, so we need to convert the start_date + # config to epoch timestamp too. + # start_date example: 2021-01-01T00:00:00Z + self.start_date = pendulum.parse(config["start_date"]).int_timestamp def next_page_token( self, @@ -34,6 +36,24 @@ def next_page_token( ) -> Optional[Mapping[str, Any]]: return None + def backoff_time(self, response: requests.Response) -> Optional[float]: + """ + OneSignal's API rates limit is 1 request per second with a 10/second burst. + RateLimit* headers will indicate how much quota is left at the time of + the request. For example: + + ratelimit-limit: 10 + ratelimit-remaining: 9 + ratelimit-reset: 1633654403 + + Docs: https://documentation.onesignal.com/docs/rate-limits + """ + reset_time = response.headers.get("ratelimit-reset") + backoff_time = float(reset_time) - time.time() if reset_time else 60 + if backoff_time < 0: + backoff_time = 60 + return backoff_time + def read_records( self, sync_mode: SyncMode, @@ -52,66 +72,73 @@ def read_records( return super().read_records(sync_mode, cursor_field, stream_slice, stream_state) - def parse_response( - self, - response: requests.Response, - stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> Iterable[Mapping]: + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: data = response.json() yield from data - # wait a while to avoid rate limit - time.sleep(1) + +T = TypeVar("T") + + +class StateValueWrapper(pydantic.BaseModel): + stream: T + state_value: int + max_cursor_time = 0 + + def __repr__(self): + """Overrides print view""" + return str(self.value) + + @property + def value(self): + """Return max cursor time after stream sync is finished.""" + return self.max_cursor_time if self.stream.is_finished else self.state_value + + def dict(self, **kwargs): + """Overrides default logic to return current value only.""" + return {pydantic.utils.ROOT_KEY: self.value} class ChildStreamMixin(HttpSubStream): + + is_finished = False + def stream_slices( self, sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_state: Mapping[str, Any] = None, + **kwargs, ) -> Iterable[Optional[Mapping[str, Any]]]: - parents = super().stream_slices(sync_mode, cursor_field, stream_state) - for item in parents: - parent = item["parent"] - yield { - "app_id": parent["id"], - "rest_api_key": parent["basic_auth_key"], - } - yield from [] + # get stream slices from parent app's stream slice cache, if it is not + # set yet, start a full refresh request to get it in full + app_slices = self.parent._app_slices + if not app_slices: + all(super().stream_slices(SyncMode.full_refresh, **kwargs)) + app_slices = self.parent._app_slices + for app in app_slices: + # stream sync is finished when it is on the last slice + self.is_finished = app["app_id"] == app_slices[-1]["app_id"] + yield app # default record filter, do nothing - def filter_by_state( - self, - stream_state: Mapping[str, Any] = None, - record: Mapping[str, Any] = None - ) -> Iterable: - yield record + def filter_by_state(self, **kwargs) -> bool: + return True def parse_response( self, response: requests.Response, stream_state: Mapping[str, Any], - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, + **kwargs ) -> Iterable[Mapping]: data = response.json().get(self.data_field) for record in data: - yield from self.filter_by_state(stream_state=stream_state, record=record) + if self.filter_by_state(stream_state=stream_state, record=record): + yield record class IncrementalOnesignalStream(ChildStreamMixin, OnesignalStream, ABC): cursor_field = "updated_at" - def __init__(self, **kwargs): - super().__init__(**kwargs) - - # largest time in cursor field across all stream slices - self.max_cursor_time = self.start_date - def request_params( self, stream_state: Mapping[str, Any], @@ -129,77 +156,96 @@ def next_page_token( self, response: requests.Response, ) -> Optional[Mapping[str, Any]]: + """ + An example of response is: + { + "total_count": 553, + "offset": 0, + "limit": 1, + "notifications": [ ... ] + } + """ resp = response.json() total = resp["total_count"] next_offset = resp["offset"] + resp["limit"] if next_offset < total: - return { "offset": next_offset } + return {"offset": next_offset} def filter_by_state( self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None - ) -> Iterable: + ) -> bool: value = 0 if record: value = record.get(self.cursor_field, value) - if not stream_state or value >= stream_state.get(self.cursor_field, 0): - yield record + return not stream_state or value >= int(stream_state.get(self.cursor_field, "0")) def get_updated_state( self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any], ) -> Mapping[str, Any]: - # we don't update state here, just keep a record of maximum cursor field - # time, state will be updated after syncing the whole stream - record_date = latest_record.get(self.cursor_field, self.start_date) - self.max_cursor_time = max(self.max_cursor_time, record_date) - return current_stream_state + state_value = (current_stream_state or {}).get(self.cursor_field, 0) + if not isinstance(state_value, StateValueWrapper): + state_value = StateValueWrapper(stream=self, state_value=state_value) + + record_time = latest_record.get(self.cursor_field, self.start_date) + state_value.max_cursor_time = max(state_value.max_cursor_time, record_time) + + return {self.cursor_field: state_value} class Apps(OnesignalStream): + """ + Docs: https://documentation.onesignal.com/reference/view-apps-apps + """ - def path( - self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> str: - return "apps" + # stream slices cache for child streams + _app_slices = [] + + def path(self, **kwargs) -> str: + return self.name + + def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]: + # parse response and save to stream slice cache + self._app_slices = [] + for app in super().parse_response(response, **kwargs): + slice = {"app_id": app["id"], "rest_api_key": app["basic_auth_key"]} + self._app_slices.append(slice) + yield app class Devices(IncrementalOnesignalStream): + """ + Docs: https://documentation.onesignal.com/reference/view-devices + """ cursor_field = "created_at" data_field = "players" - page_size = 300 # page size limit set by OneSignal + page_size = 300 # page size limit set by OneSignal - def path( - self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> str: + def path(self, **kwargs) -> str: return "players" class Notifications(IncrementalOnesignalStream): + """ + Docs: https://documentation.onesignal.com/reference/view-notifications + """ cursor_field = "queued_at" data_field = "notifications" - page_size = 50 # page size limit set by OneSignal + page_size = 50 # page size limit set by OneSignal - def path( - self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> str: - return "notifications" + def path(self, **kwargs) -> str: + return self.name class Outcomes(ChildStreamMixin, OnesignalStream): + """ + Docs: https://documentation.onesignal.com/reference/view-outcomes + """ data_field = "outcomes" @@ -207,12 +253,7 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.outcome_names = kwargs["config"]["outcome_names"] - def path( - self, - stream_state: Mapping[str, Any] = None, - stream_slice: Mapping[str, Any] = None, - next_page_token: Mapping[str, Any] = None, - ) -> str: + def path(self, stream_slice: Mapping[str, Any] = None, **kwargs) -> str: return f"apps/{stream_slice['app_id']}/outcomes" def request_params( @@ -224,4 +265,3 @@ def request_params( params = super().request_params(stream_state, stream_slice, next_page_token) params["outcome_names"] = self.outcome_names return params - diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py index 20c2c06243099..cb4a06a23f9a4 100644 --- a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_incremental_streams.py @@ -3,10 +3,13 @@ # +from unittest.mock import MagicMock + import requests from airbyte_cdk.models import SyncMode from pytest import fixture -from source_onesignal.streams import IncrementalOnesignalStream, Apps, Devices, Notifications +from source_onesignal.streams import Apps, Devices, IncrementalOnesignalStream, Notifications + @fixture def patch_incremental_base_class(mocker): @@ -18,14 +21,7 @@ def patch_incremental_base_class(mocker): @fixture def args(): - return { - "authenticator": None, - "config": { - "user_auth_key": "", - "start_date": "2021-01-01T00:00:00Z", - "outcome_names": "" - } - } + return {"authenticator": None, "config": {"user_auth_key": "", "start_date": "2021-01-01T00:00:00Z", "outcome_names": ""}} @fixture @@ -44,35 +40,65 @@ def test_cursor_field(stream): def test_get_updated_state(stream): - inputs = { - "current_stream_state": { "updated_at": 1580510247 }, - "latest_record": { "updated_at": 1580510248 } - } - expected_state = { "updated_at": 1580510247 } # note state shouldn't change - assert stream.get_updated_state(**inputs) == expected_state + inputs = {"current_stream_state": {"updated_at": 42}, "latest_record": {"updated_at": 90}} + expected_state = 42 + state = stream.get_updated_state(**inputs) + assert state["updated_at"].value == expected_state + + inputs = {"current_stream_state": state, "latest_record": {"updated_at": 100}} + expected_state = 42 + state = stream.get_updated_state(**inputs) + assert state["updated_at"].value == expected_state - inputs = { - "current_stream_state": { "updated_at": 1580510248 }, - "latest_record": { "updated_at": 1580510247 } - } - expected_state = { "updated_at": 1580510248 } # note state shouldn't change - assert stream.get_updated_state(**inputs) == expected_state + # after stream sync is finished, state should output the max cursor time + stream.is_finished = True + inputs = {"current_stream_state": state, "latest_record": {"updated_at": 80}} + expected_state = 100 + state = stream.get_updated_state(**inputs) + assert state["updated_at"].value == expected_state def test_stream_slices(stream, requests_mock): - requests_mock.get("https://onesignal.com/api/v1/apps", json=[ - { "id": "abc", "basic_auth_key": "key 1" }, - { "id": "def", "basic_auth_key": "key 2" } - ]) + requests_mock.get( + "https://onesignal.com/api/v1/apps", json=[{"id": "abc", "basic_auth_key": "key 1"}, {"id": "def", "basic_auth_key": "key 2"}] + ) inputs = {"sync_mode": SyncMode.incremental, "cursor_field": [], "stream_state": {}} - expected_stream_slice = [ - { "app_id": "abc", "rest_api_key": "key 1" }, - { "app_id": "def", "rest_api_key": "key 2" } - ] + expected_stream_slice = [{"app_id": "abc", "rest_api_key": "key 1"}, {"app_id": "def", "rest_api_key": "key 2"}] assert list(stream.stream_slices(**inputs)) == expected_stream_slice +def test_end_of_stream_state(parent, args, requests_mock): + requests_mock.get( + "https://onesignal.com/api/v1/apps", + json=[{"id": "abc", "basic_auth_key": "key 1"}, {"id": "def", "basic_auth_key": "key 2"}, {"id": "ghi", "basic_auth_key": "key 3"}], + ) + requests_mock.get( + "https://onesignal.com/api/v1/notifications?app_id=abc", + json={"total_count": 1, "offset": 0, "limit": 1, "notifications": [{"id": "notification 1", "queued_at": 90}]}, + ) + requests_mock.get( + "https://onesignal.com/api/v1/notifications?app_id=def", + json={"total_count": 1, "offset": 0, "limit": 1, "notifications": [{"id": "notification 2", "queued_at": 80}]}, + ) + requests_mock.get( + "https://onesignal.com/api/v1/notifications?app_id=ghi", + json={"total_count": 1, "offset": 0, "limit": 1, "notifications": [{"id": "notification 3", "queued_at": 70}]}, + ) + + stream = Notifications(parent=parent, **args) + state = {"queued_at": 50} + sync_mode = SyncMode.incremental + + for idx, app_slice in enumerate(stream.stream_slices(sync_mode, **MagicMock())): + for record in stream.read_records(sync_mode, stream_slice=app_slice): + state = stream.get_updated_state(state, record) + if idx == 2: # the last slice + assert state["queued_at"].value == 90 + else: + assert state["queued_at"].value == 50 + + def test_supports_incremental(patch_incremental_base_class, mocker, parent, args): mocker.patch.object(IncrementalOnesignalStream, "cursor_field", "dummy_field") stream = IncrementalOnesignalStream(parent=parent, **args) @@ -89,34 +115,22 @@ def test_stream_checkpoint_interval(stream): def test_next_page_token(stream, requests_mock): - requests_mock.get("https://dummy", json={ - "total_count": 123, - "offset": 22, - "limit": 33 - }) + requests_mock.get("https://dummy", json={"total_count": 123, "offset": 22, "limit": 33}) resp = requests.get("https://dummy") - expected_next_page_token = { "offset": 55 } + expected_next_page_token = {"offset": 55} assert stream.next_page_token(resp) == expected_next_page_token - requests_mock.get("https://dummy", json={ - "total_count": 123, - "offset": 100, - "limit": 33 - }) + requests_mock.get("https://dummy", json={"total_count": 123, "offset": 100, "limit": 33}) resp = requests.get("https://dummy") expected_next_page_token = None assert stream.next_page_token(resp) == expected_next_page_token def test_request_params(stream, parent, args): - inputs = { "stream_state": {}, "stream_slice": { "app_id": "abc" } } - inputs2 = { - "stream_state": {}, - "stream_slice": { "app_id": "abc" }, - "next_page_token": { "offset": 42 } - } - expected_request_params = { "app_id": "abc", "limit": None } - expected_request_params2 = { "app_id": "abc", "limit": None, "offset": 42 } + inputs = {"stream_state": {}, "stream_slice": {"app_id": "abc"}} + inputs2 = {"stream_state": {}, "stream_slice": {"app_id": "abc"}, "next_page_token": {"offset": 42}} + expected_request_params = {"app_id": "abc", "limit": None} + expected_request_params2 = {"app_id": "abc", "limit": None, "offset": 42} assert stream.request_params(**inputs) == expected_request_params assert stream.request_params(**inputs2) == expected_request_params2 @@ -135,17 +149,10 @@ def test_request_params(stream, parent, args): def test_filter_by_state(stream): - inputs = { - "stream_state": { "updated_at": 1580510247 }, - "record": { "updated_at": 1580510248 } - } - expected_filter_by_state = [{ "updated_at": 1580510248 }] - assert list(stream.filter_by_state(**inputs)) == expected_filter_by_state - - inputs = { - "stream_state": { "updated_at": 1580510248 }, - "record": { "updated_at": 1580510247 } - } - expected_filter_by_state = [] - assert list(stream.filter_by_state(**inputs)) == expected_filter_by_state + inputs = {"stream_state": {"updated_at": 1580510247}, "record": {"updated_at": 1580510248}} + expected_filter_by_state = True + assert stream.filter_by_state(**inputs) == expected_filter_by_state + inputs = {"stream_state": {"updated_at": 1580510248}, "record": {"updated_at": 1580510247}} + expected_filter_by_state = False + assert stream.filter_by_state(**inputs) == expected_filter_by_state diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py index 29f291f32fae3..1b5bb62326ff4 100644 --- a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_source.py @@ -4,21 +4,32 @@ from unittest.mock import MagicMock +from pytest import fixture from source_onesignal.source import SourceOnesignal -def test_check_connection(mocker, requests_mock): + +@fixture +def config(): + return {"config": {"user_auth_key": "", "start_date": "2021-01-01T00:00:00Z", "outcome_names": ""}} + + +def test_check_connection(mocker, requests_mock, config): source = SourceOnesignal() - logger_mock, config_mock = MagicMock(), MagicMock() - requests_mock.get("https://onesignal.com/api/v1/apps", json=[{ - "id": "92911750-242d-4260-9e00-9d9034f139ce", - "name": "Your app 1", - }]) - assert source.check_connection(logger_mock, config_mock) == (True, None) + logger_mock = MagicMock() + requests_mock.get( + "https://onesignal.com/api/v1/apps", + json=[ + { + "id": "92911750-242d-4260-9e00-9d9034f139ce", + "basic_auth_key": "your key", + } + ], + ) + assert source.check_connection(logger_mock, **config) == (True, None) -def test_streams(mocker): +def test_streams(mocker, config): source = SourceOnesignal() - config_mock = MagicMock() - streams = source.streams(config_mock) + streams = source.streams(**config) expected_streams_number = 4 assert len(streams) == expected_streams_number diff --git a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py index 8a12f2092b180..36da03e821dbb 100644 --- a/airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-onesignal/unit_tests/test_streams.py @@ -2,11 +2,11 @@ # Copyright (c) 2021 Airbyte, Inc., all rights reserved. # -import requests from http import HTTPStatus from unittest.mock import MagicMock import pytest +import requests from source_onesignal.streams import OnesignalStream @@ -18,32 +18,34 @@ def patch_base_class(mocker): mocker.patch.object(OnesignalStream, "__abstractmethods__", set()) -def test_next_page_token(patch_base_class): - stream = OnesignalStream(MagicMock()) +@pytest.fixture +def stream(patch_base_class): + args = {"authenticator": None, "config": {"user_auth_key": "", "start_date": "2021-01-01T00:00:00Z", "outcome_names": ""}} + return OnesignalStream(**args) + + +def test_next_page_token(stream): inputs = {"response": MagicMock()} expected_token = None assert stream.next_page_token(**inputs) == expected_token -def test_parse_response(patch_base_class, requests_mock): - requests_mock.get("https://dummy", json=[{ "a": 123, "b": "xx" }]) +def test_parse_response(stream, requests_mock): + requests_mock.get("https://dummy", json=[{"id": 123, "basic_auth_key": "xx"}]) resp = requests.get("https://dummy") - stream = OnesignalStream(MagicMock()) - inputs = { "response": resp, "stream_state": MagicMock() } - expected_parsed_object = { "a": 123, "b": "xx" } + inputs = {"response": resp, "stream_state": MagicMock()} + expected_parsed_object = {"id": 123, "basic_auth_key": "xx"} assert next(stream.parse_response(**inputs)) == expected_parsed_object -def test_request_headers(patch_base_class): - stream = OnesignalStream(MagicMock()) +def test_request_headers(stream): inputs = {"stream_slice": None, "stream_state": None, "next_page_token": None} expected_headers = {} assert stream.request_headers(**inputs) == expected_headers -def test_http_method(patch_base_class): - stream = OnesignalStream(MagicMock()) +def test_http_method(stream): expected_method = "GET" assert stream.http_method == expected_method @@ -57,15 +59,13 @@ def test_http_method(patch_base_class): (HTTPStatus.INTERNAL_SERVER_ERROR, True), ], ) -def test_should_retry(patch_base_class, http_status, should_retry): +def test_should_retry(stream, http_status, should_retry): response_mock = MagicMock() response_mock.status_code = http_status - stream = OnesignalStream(MagicMock()) assert stream.should_retry(response_mock) == should_retry -def test_backoff_time(patch_base_class): +def test_backoff_time(stream): response_mock = MagicMock() - stream = OnesignalStream(MagicMock()) - expected_backoff_time = None + expected_backoff_time = 60 assert stream.backoff_time(response_mock) == expected_backoff_time From 72cadc26f063cbb87ab1719cbd8b75453e2d406d Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Wed, 20 Oct 2021 15:31:37 +1100 Subject: [PATCH 6/8] format code --- .../source-onesignal/source_onesignal/streams.py | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py index 16b34f7f8c407..0c9dcbb36d0b2 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py @@ -123,12 +123,7 @@ def stream_slices( def filter_by_state(self, **kwargs) -> bool: return True - def parse_response( - self, - response: requests.Response, - stream_state: Mapping[str, Any], - **kwargs - ) -> Iterable[Mapping]: + def parse_response(self, response: requests.Response, stream_state: Mapping[str, Any], **kwargs) -> Iterable[Mapping]: data = response.json().get(self.data_field) for record in data: if self.filter_by_state(stream_state=stream_state, record=record): @@ -171,11 +166,7 @@ def next_page_token( if next_offset < total: return {"offset": next_offset} - def filter_by_state( - self, - stream_state: Mapping[str, Any] = None, - record: Mapping[str, Any] = None - ) -> bool: + def filter_by_state(self, stream_state: Mapping[str, Any] = None, record: Mapping[str, Any] = None) -> bool: value = 0 if record: value = record.get(self.cursor_field, value) From 9dabb622478127b48ad7373d9112cbf74665ffc8 Mon Sep 17 00:00:00 2001 From: Bo Lu Date: Wed, 20 Oct 2021 19:08:00 +1100 Subject: [PATCH 7/8] remove unused code --- .../connectors/source-onesignal/source_onesignal/streams.py | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py index 0c9dcbb36d0b2..1e325fb819e22 100644 --- a/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py +++ b/airbyte-integrations/connectors/source-onesignal/source_onesignal/streams.py @@ -23,7 +23,6 @@ class OnesignalStream(HttpStream, ABC): def __init__(self, config: Mapping[str, Any], **kwargs): super().__init__(**kwargs) self._auth_token = config["user_auth_key"] - self._app_slices = [] # OneSignal uses epoch timestamp, so we need to convert the start_date # config to epoch timestamp too. From 3c0fe9bcad7ab89e47a8b771dc28e04c6758c2cf Mon Sep 17 00:00:00 2001 From: Maksym Pavlenok Date: Thu, 21 Oct 2021 00:49:03 +0300 Subject: [PATCH 8/8] remove TODOs --- .../connectors/source-onesignal/integration_tests/acceptance.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py b/airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py index 58c194c5d1376..108075487440f 100644 --- a/airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py +++ b/airbyte-integrations/connectors/source-onesignal/integration_tests/acceptance.py @@ -11,6 +11,4 @@ @pytest.fixture(scope="session", autouse=True) def connector_setup(): """ This fixture is a placeholder for external resources that acceptance test might require.""" - # TODO: setup test dependencies if needed. otherwise remove the TODO comments yield - # TODO: clean up test dependencies