diff --git a/airbyte-integrations/connectors/source-facebook-marketing/README.md b/airbyte-integrations/connectors/source-facebook-marketing/README.md index 77a2b5aca43fe..947a2cd657531 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/README.md +++ b/airbyte-integrations/connectors/source-facebook-marketing/README.md @@ -39,7 +39,7 @@ From the Airbyte repository root, run: **If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/facebook-marketing) to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_facebook_marketing/spec.json` file. Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information. -See `sample_files/sample_config.json` for a sample config file. +See `integration_tests/sample_config.json` for a sample config file. **If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source facebook-marketing test creds` and place them into `secrets/config.json`. @@ -49,7 +49,7 @@ and place them into `secrets/config.json`. python main.py spec python main.py check --config secrets/config.json python main.py discover --config secrets/config.json -python main.py read --config secrets/config.json --catalog sample_files/configured_catalog.json +python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json ``` ### Locally running the connector docker image @@ -73,7 +73,7 @@ Then run any of the connector commands as follows: docker run --rm airbyte/source-facebook-marketing:dev spec docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev check --config /secrets/config.json docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-facebook-marketing:dev discover --config /secrets/config.json -docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/sample_files:/sample_files airbyte/source-facebook-marketing:dev read --config /secrets/config.json --catalog /sample_files/configured_catalog.json +docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-facebook-marketing:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json ``` ## Testing Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
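To run the test suites locally, the usual commands apply (a minimal sketch assuming the standard Airbyte connector layout with `unit_tests/` and `integration_tests/` directories; adjust the paths if this connector deviates from that template):

```
# Assumed standard layout: run unit tests from the connector directory
python -m pytest unit_tests
# Run acceptance tests against secrets/config.json (assumed standard layout)
python -m pytest integration_tests -p integration_tests.acceptance
```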
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml index d426101a5ef5f..7f8f75981b355 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml @@ -13,21 +13,10 @@ tests: - config_path: "secrets/config.json" basic_read: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" - timeout_seconds: 600 + empty_streams: ["videos"] incremental: - config_path: "secrets/config.json" configured_catalog_path: "integration_tests/configured_catalog_without_insights.json" future_state_path: "integration_tests/future_state.json" full_refresh: - config_path: "secrets/config.json" - configured_catalog_path: "integration_tests/configured_catalog.json" - # Ad Insights API has estimated metrics in response, which is calculated based on another metrics. - # Sometimes API doesn't return estimated metrics. E.g, cost_per_estimated_ad_recallers is calculated - # as total amount spent divided by estimated ad recall lift rate. When second metric is equal to zero - # API may or may not return value. Such behavior causes sequential reads test failing. - # Because one read response contains this metric, and other doesn't. - # Therefore, it's needed to ignore fields like this in API responses. - ignored_fields: - "ads_insights_age_and_gender": ["cost_per_estimated_ad_recallers"] - "ad_creatives": ["thumbnail_url"] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json index 67dc36d5d8071..b05433a47d649 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog.json @@ -64,31 +64,29 @@ "stream": { "name": "ads_insights", "json_schema": {}, + "default_cursor_field": ["date_start"], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": true, - "default_cursor_field": ["date_start"], - "source_defined_primary_key": null, - "namespace": null + "source_defined_primary_key": [] }, "sync_mode": "incremental", + "primary_key": [], "cursor_field": ["date_start"], - "destination_sync_mode": "append", - "primary_key": null + "destination_sync_mode": "append" }, { "stream": { - "name": "ads_insights_age_and_gender", + "name": "ads_insights_platform_and_device", "json_schema": {}, + "default_cursor_field": ["date_start"], "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": true, - "default_cursor_field": ["date_start"], - "source_defined_primary_key": null, - "namespace": null + "source_defined_primary_key": [] }, "sync_mode": "incremental", + "primary_key": [], "cursor_field": ["date_start"], - "destination_sync_mode": "append", - "primary_key": null + "destination_sync_mode": "append" } ] } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json index fee4405518324..31324e26d5bc3 100644 --- 
a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/configured_catalog_without_insights.json @@ -7,8 +7,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": true, "default_cursor_field": ["updated_time"], - "source_defined_primary_key": [["id"]], - "namespace": null + "source_defined_primary_key": [["id"]] }, "sync_mode": "incremental", "cursor_field": null, @@ -22,13 +21,11 @@ "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": true, "default_cursor_field": ["updated_time"], - "source_defined_primary_key": [["id"]], - "namespace": null + "source_defined_primary_key": [["id"]] }, "sync_mode": "incremental", "cursor_field": null, - "destination_sync_mode": "append", - "primary_key": null + "destination_sync_mode": "append" }, { "stream": { @@ -37,8 +34,7 @@ "supported_sync_modes": ["full_refresh", "incremental"], "source_defined_cursor": true, "default_cursor_field": ["updated_time"], - "source_defined_primary_key": [["id"]], - "namespace": null + "source_defined_primary_key": [["id"]] }, "sync_mode": "incremental", "cursor_field": null, diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py index df9639fab612b..8bcb821e03238 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/conftest.py @@ -26,4 +26,7 @@ def config_with_wrong_account_fixture(config): @pytest.fixture(scope="session", name="config_with_include_deleted") def config_with_include_deleted_fixture(config): - return {**config, "include_deleted": True} + new_config = {**config, "include_deleted": True} + new_config.pop("_limit", None) + new_config.pop("end_date", None) + return new_config diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json index 1f4e623f63685..5debd07b2087f 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json @@ -5,20 +5,10 @@ "title": "Source Facebook Marketing", "type": "object", "properties": { - "account_id": { - "title": "Account Id", - "description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.", - "type": "string" - }, - "access_token": { - "title": "Access Token", - "description": "The value of the access token generated. See the <a href=\"https://docs.airbyte.io/integrations/sources/facebook-marketing\">docs</a> for more information", - "airbyte_secret": true, - "type": "string" - }, "start_date": { "title": "Start Date", "description": "The date from which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. 
All data generated after this date will be replicated.", + "order": 0, "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", "examples": ["2017-01-25T00:00:00Z"], "type": "string", @@ -27,42 +17,44 @@ "end_date": { "title": "End Date", "description": "The date until which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated between start_date and this date will be replicated. Not setting this option will result in always syncing the latest data.", + "order": 1, "pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", "examples": ["2017-01-26T00:00:00Z"], "type": "string", "format": "date-time" }, - "fetch_thumbnail_images": { - "title": "Fetch Thumbnail Images", - "description": "In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url", - "default": false, - "type": "boolean" + "account_id": { + "title": "Account ID", + "description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.", + "order": 2, + "examples": ["111111111111111"], + "type": "string" + }, + "access_token": { + "title": "Access Token", + "description": "The value of the access token generated. See the <a href=\"https://docs.airbyte.io/integrations/sources/facebook-marketing\">docs</a> for more information", + "order": 3, + "airbyte_secret": true, + "type": "string" }, "include_deleted": { "title": "Include Deleted", - "description": "Include data from deleted campaigns, ads, and adsets", + "description": "Include data from deleted Campaigns, Ads, and AdSets", "default": false, + "order": 4, "type": "boolean" }, - "insights_lookback_window": { - "title": "Insights Lookback Window", - "description": "The attribution window for the actions", - "default": 28, - "minimum": 0, - "maximum": 28, - "type": "integer" - }, - "insights_days_per_job": { - "title": "Insights Days Per Job", - "description": "Number of days to sync in one job (the more data you have, the smaller this parameter should be)", - "default": 7, - "minimum": 1, - "maximum": 30, - "type": "integer" + "fetch_thumbnail_images": { + "title": "Fetch Thumbnail Images", + "description": "In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url", + "default": false, + "order": 5, + "type": "boolean" }, "custom_insights": { "title": "Custom Insights", - "description": "A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)", + "description": "A list which contains insights entries, each entry must have a name and can contain fields, breakdowns or action_breakdowns", + "order": 6, "type": "array", "items": { "title": "InsightConfig", @@ -105,48 +97,7 @@ } } }, - "required": ["account_id", "access_token", "start_date"], - "definitions": { - "InsightConfig": { - "title": "InsightConfig", - "type": "object", - "properties": { - "name": { - "title": "Name", - "description": "The name value of insight", - "type": "string" - }, - "fields": { - "title": "Fields", - "description": "A list of chosen fields for fields parameter", - "default": [], - "type": "array", - "items": { - "type": "string" - } - }, - "breakdowns": { - "title": "Breakdowns", - "description": "A list of chosen breakdowns for breakdowns", - "default": [], - "type": "array", - "items": { - "type": "string" - } - }, - "action_breakdowns": { - "title": "Action Breakdowns", - "description": "A list of chosen action_breakdowns for action_breakdowns",
- "default": [], - "type": "array", - "items": { - "type": "string" - } - } - }, - "required": ["name"] - } - } + "required": ["start_date", "account_id", "access_token"] }, "supportsIncremental": true, "supported_destination_sync_modes": ["append"], diff --git a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py index c363a4adf9593..ef869e5eec897 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/integration_tests/test_streams.py @@ -4,10 +4,10 @@ import copy +import logging from typing import Any, List, MutableMapping, Set, Tuple import pytest -from airbyte_cdk import AirbyteLogger from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog, Type from source_facebook_marketing.source import SourceFacebookMarketing @@ -52,7 +52,7 @@ def test_streams_with_include_deleted(self, stream_name, deleted_id, config_with assert deleted_records, f"{stream_name} stream should have deleted records returned" assert is_specific_deleted_pulled, f"{stream_name} stream should have a deleted record with id={deleted_id}" - @pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 1), ("ad_sets", 1)]) + @pytest.mark.parametrize("stream_name, deleted_num", [("ads", 2), ("campaigns", 3), ("ad_sets", 1)]) def test_streams_with_include_deleted_and_state(self, stream_name, deleted_num, config_with_include_deleted, configured_catalog, state): """Should ignore state because of include_deleted enabled""" catalog = self.slice_catalog(configured_catalog, {stream_name}) @@ -92,7 +92,7 @@ def slice_catalog(catalog: ConfiguredAirbyteCatalog, streams: Set[str]) -> Confi def _read_records(conf, catalog, state=None) -> Tuple[List[AirbyteMessage], List[AirbyteMessage]]: records = [] states = [] - for message in SourceFacebookMarketing().read(AirbyteLogger(), conf, catalog, state=state): + for message in SourceFacebookMarketing().read(logging.getLogger("airbyte"), conf, catalog, state=state): if message.type == Type.RECORD: records.append(message) elif message.type == Type.STATE: diff --git a/airbyte-integrations/connectors/source-facebook-marketing/setup.py b/airbyte-integrations/connectors/source-facebook-marketing/setup.py index 384ef83be414d..1d2b4267bb387 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/setup.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/setup.py @@ -6,9 +6,9 @@ from setuptools import find_packages, setup MAIN_REQUIREMENTS = [ - "airbyte-cdk~=0.1.35", - "cached_property~=1.5", - "facebook_business~=12.0", + "airbyte-cdk==0.1.47", + "cached_property==1.5.2", + "facebook_business==12.0.1", "pendulum>=2,<3", ] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py index 9ee3d73c4f8af..1f9433475e049 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py @@ -5,24 +5,41 @@ import json import logging from time import sleep +from typing import Tuple +import backoff import pendulum from cached_property import cached_property from facebook_business import FacebookAdsApi from facebook_business.adobjects 
import user as fb_user from facebook_business.adobjects.adaccount import AdAccount +from facebook_business.api import FacebookResponse from facebook_business.exceptions import FacebookRequestError -from source_facebook_marketing.common import FacebookAPIException +from source_facebook_marketing.streams.common import retry_pattern logger = logging.getLogger("airbyte") +class FacebookAPIException(Exception): + """General class for all API errors""" + + +backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) + + class MyFacebookAdsApi(FacebookAdsApi): """Custom Facebook API class to intercept all API calls and handle call rate limits""" - call_rate_threshold = 90 # maximum percentage of call limit utilization + call_rate_threshold = 95 # maximum percentage of call limit utilization pause_interval_minimum = pendulum.duration(minutes=1) # default pause interval if reached or close to call rate limit + # Insights async jobs throttle, in percent from 0 to 100 + _ads_insights_throttle: Tuple[float, float] + + @property + def ads_insights_throttle(self) -> Tuple[float, float]: + return self._ads_insights_throttle + @staticmethod def parse_call_rate_header(headers): usage = 0 @@ -86,6 +103,19 @@ def handle_call_rate_limit(self, response, params): logger.warning(f"Utilization is too high ({usage})%, pausing for {pause_interval}") sleep(pause_interval.total_seconds()) + def _update_insights_throttle_limit(self, response: FacebookResponse): + """ + Every response to an /insights call contains an x-fb-ads-insights-throttle + header with the current throttle utilization of async insights + jobs for the current app/account. We need this information to adjust + the number of running async jobs for optimal performance. + """ + ads_insights_throttle = response.headers().get("x-fb-ads-insights-throttle") + if ads_insights_throttle: + ads_insights_throttle = json.loads(ads_insights_throttle) + self._ads_insights_throttle = ads_insights_throttle.get("app_id_util_pct", 0), ads_insights_throttle.get("acc_id_util_pct", 0) + + @backoff_policy def call( self, method, @@ -98,6 +128,7 @@ ): """Makes an API call, delegate actual work to parent class and handles call rates""" response = super().call(method, path, params, headers, files, url_override, api_version) + self._update_insights_throttle_limit(response) self.handle_call_rate_limit(response, params) return response diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/async_job.py deleted file mode 100644 index 5bb217533da3b..0000000000000 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/async_job.py +++ /dev/null @@ -1,149 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
-# - -import logging -from enum import Enum -from typing import Any, Mapping - -import backoff -import pendulum -from facebook_business.exceptions import FacebookRequestError -from source_facebook_marketing.api import API - -from .common import JobException, JobTimeoutException, retry_pattern - -backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) -logger = logging.getLogger("airbyte") - - -class Status(Enum): - """Async job statuses""" - - COMPLETED = "Job Completed" - FAILED = "Job Failed" - SKIPPED = "Job Skipped" - STARTED = "Job Started" - - -class AsyncJob: - """AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job""" - - MAX_WAIT_TO_START = pendulum.duration(minutes=5) - MAX_WAIT_TO_FINISH = pendulum.duration(minutes=30) - - def __init__(self, api: API, params: Mapping[str, Any]): - """Initialize - - :param api: Facebook Api wrapper - :param params: job params, required to start/restart job - """ - self._params = params - self._api = api - self._job = None - self._start_time = None - self._finish_time = None - self._failed = False - - @backoff_policy - def start(self): - """Start remote job""" - if self._job: - raise RuntimeError(f"{self}: Incorrect usage of start - the job already started, use restart instead") - - self._job = self._api.account.get_insights(params=self._params, is_async=True) - self._start_time = pendulum.now() - job_id = self._job["report_run_id"] - time_range = self._params["time_range"] - breakdowns = self._params["breakdowns"] - logger.info(f"Created AdReportRun: {job_id} to sync insights {time_range} with breakdown {breakdowns}") - - def restart(self): - """Restart failed job""" - if not self._job or not self.failed: - raise RuntimeError(f"{self}: Incorrect usage of restart - only failed jobs can be restarted") - - self._job = None - self._failed = False - self._start_time = None - self._finish_time = None - self.start() - logger.info(f"{self}: restarted") - - @property - def elapsed_time(self): - """Elapsed time since the job start""" - if not self._start_time: - return None - - end_time = self._finish_time or pendulum.now() - return end_time - self._start_time - - @property - def completed(self) -> bool: - """Check job status and return True if it is completed successfully - - :return: True if completed successfully, False - if task still running - :raises: JobException in case job failed to start, failed or timed out - """ - try: - return self._check_status() - except JobException: - self._failed = True - raise - - @property - def failed(self) -> bool: - """Tell if the job previously failed""" - return self._failed - - @backoff_policy - def _update_job(self): - if not self._job: - raise RuntimeError(f"{self}: Incorrect usage of the method - the job is not started") - self._job = self._job.api_get() - - def _check_status(self) -> bool: - """Perform status check - - :return: True if the job is completed, False - if the job is still running - :raises: errors if job failed or timed out - """ - self._update_job() - job_progress_pct = self._job["async_percent_completion"] - logger.info(f"{self} is {job_progress_pct}% complete ({self._job['async_status']})") - runtime = self.elapsed_time - - if self._job["async_status"] == Status.COMPLETED.value: - self._finish_time = pendulum.now() - return True - elif self._job["async_status"] == Status.FAILED.value: - raise JobException(f"{self._job} failed after {runtime.in_seconds()} seconds.") - elif self._job["async_status"] == Status.SKIPPED.value: - 
raise JobException(f"{self._job} skipped after {runtime.in_seconds()} seconds.") - - if runtime > self.MAX_WAIT_TO_START and self._job["async_percent_completion"] == 0: - raise JobTimeoutException( - f"{self._job} did not start after {runtime.in_seconds()} seconds." - f" This is an intermittent error which may be fixed by retrying the job. Aborting." - ) - elif runtime > self.MAX_WAIT_TO_FINISH: - raise JobTimeoutException( - f"{self._job} did not finish after {runtime.in_seconds()} seconds." - f" This is an intermittent error which may be fixed by retrying the job. Aborting." - ) - return False - - @backoff_policy - def get_result(self) -> Any: - """Retrieve result of the finished job.""" - if not self._job or self.failed: - raise RuntimeError(f"{self}: Incorrect usage of get_result - the job is not started of failed") - return self._job.get_result() - - def __str__(self) -> str: - """String representation of the job wrapper.""" - job_id = self._job["report_run_id"] if self._job else "<None>" - time_range = self._params["time_range"] - breakdowns = self._params["breakdowns"] - return f"AdReportRun(id={job_id}, time_range={time_range}, breakdowns={breakdowns}" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py index 72f1328a7151b..645af8a88c9be 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py @@ -3,26 +3,23 @@ # import logging -from datetime import datetime -from typing import Any, List, Mapping, MutableMapping, Optional, Tuple, Type +from typing import Any, Iterator, List, Mapping, MutableMapping, Tuple, Type -import pendulum from airbyte_cdk.logger import AirbyteLogger from airbyte_cdk.models import ( - AirbyteConnectionStatus, + AirbyteMessage, AuthSpecification, + ConfiguredAirbyteStream, ConnectorSpecification, DestinationSyncMode, OAuth2Specification, - Status, + SyncMode, ) from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.core import package_name_from_class -from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader -from jsonschema import RefResolver -from pydantic import BaseModel, Field +from airbyte_cdk.sources.utils.schema_helpers import InternalConfig from source_facebook_marketing.api import API +from source_facebook_marketing.spec import ConnectorConfig from source_facebook_marketing.streams import ( AdCreatives, Ads, @@ -40,101 +37,36 @@ logger = logging.getLogger("airbyte") - -class InsightConfig(BaseModel): - - name: str = Field(description="The name value of insight") - - fields: Optional[List[str]] = Field(description="A list of chosen fields for fields parameter", default=[]) - - breakdowns: Optional[List[str]] = Field(description="A list of chosen breakdowns for breakdowns", default=[]) - - action_breakdowns: Optional[List[str]] = Field(description="A list of chosen action_breakdowns for action_breakdowns", default=[]) - - -class ConnectorConfig(BaseModel): - class Config: - title = "Source Facebook Marketing" - - account_id: str = Field(description="The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.") - - access_token: str = Field( - description='The value of the access token generated. 
See the <a href="https://docs.airbyte.io/integrations/sources/facebook-marketing">docs</a> for more information', - airbyte_secret=True, - ) - - start_date: datetime = Field( - description="The date from which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.", - pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", - examples=["2017-01-25T00:00:00Z"], - ) - - end_date: Optional[datetime] = Field( - description="The date until which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated between start_date and this date will be replicated. Not setting this option will result in always syncing the latest data.", - pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", - examples=["2017-01-26T00:00:00Z"], - default_factory=pendulum.now, - ) - - fetch_thumbnail_images: bool = Field( - default=False, description="In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url" - ) - - include_deleted: bool = Field(default=False, description="Include data from deleted campaigns, ads, and adsets") - - insights_lookback_window: int = Field( - default=28, - description="The attribution window for the actions", - minimum=0, - maximum=28, - ) - - insights_days_per_job: int = Field( - default=7, - description="Number of days to sync in one job (the more data you have, the smaller this parameter should be)", - minimum=1, - maximum=30, - ) - custom_insights: Optional[List[InsightConfig]] = Field( - description="A list wich contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)" - ) +DOCS_URL = "https://docs.airbyte.io/integrations/sources/facebook-marketing" class SourceFacebookMarketing(AbstractSource): - def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]: + def check_connection(self, _logger: "logging.Logger", config: Mapping[str, Any]) -> Tuple[bool, Any]: """Connection check to validate that the user-provided config can be used to connect to the underlying API :param config: the user-input config object conforming to the connector's spec.json - :param logger: logger object + :param _logger: logger object :return Tuple[bool, Any]: (True, None) if the input config can be used to connect to the API successfully, (False, error) otherwise. """ - ok = False - error_msg = None - - try: - config = ConnectorConfig.parse_obj(config) # FIXME: this will be not need after we fix CDK - api = API(account_id=config.account_id, access_token=config.access_token) - logger.info(f"Select account {api.account}") - ok = True - except Exception as exc: - error_msg = repr(exc) + config = ConnectorConfig.parse_obj(config) + api = API(account_id=config.account_id, access_token=config.access_token) + logger.info(f"Select account {api.account}") - return ok, error_msg + return True, None def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: """Discovery method, returns available streams :param config: A Mapping of the user input configuration as defined in the connector spec. 
+ :return: list of the stream instances """ - config: ConnectorConfig = ConnectorConfig.parse_obj(config) # FIXME: this will be not need after we fix CDK + config: ConnectorConfig = ConnectorConfig.parse_obj(config) api = API(account_id=config.account_id, access_token=config.access_token) insights_args = dict( api=api, start_date=config.start_date, end_date=config.end_date, - buffer_days=config.insights_lookback_window, - days_per_job=config.insights_days_per_job, ) streams = [ @@ -154,30 +86,17 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]: return self._update_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams) - def check(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - """Implements the Check Connection operation from the Airbyte Specification. See https://docs.airbyte.io/architecture/airbyte-specification.""" - try: - check_succeeded, error = self.check_connection(logger, config) - if not check_succeeded: - return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) - except Exception as e: - return AirbyteConnectionStatus(status=Status.FAILED, message=repr(e)) - - self._check_custom_insights_entries(config.get("custom_insights", [])) - - return AirbyteConnectionStatus(status=Status.SUCCEEDED) - def spec(self, *args, **kwargs) -> ConnectorSpecification: - """ - Returns the spec for this integration. The spec is a JSON-Schema object describing the required configurations (e.g: username and password) - required to run this integration. + """Returns the spec for this integration. + The spec is a JSON-Schema object describing the required configurations + (e.g: username and password) required to run this integration. """ return ConnectorSpecification( documentationUrl="https://docs.airbyte.io/integrations/sources/facebook-marketing", changelogUrl="https://docs.airbyte.io/integrations/sources/facebook-marketing", supportsIncremental=True, supported_destination_sync_modes=[DestinationSyncMode.append], - connectionSpecification=expand_local_ref(ConnectorConfig.schema()), + connectionSpecification=ConnectorConfig.schema(), authSpecification=AuthSpecification( auth_type="oauth2.0", oauth2Specification=OAuth2Specification( @@ -189,7 +108,6 @@ def spec(self, *args, **kwargs) -> ConnectorSpecification: def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream]]: """Update method, if insights have values returns streams replacing the default insights streams else returns streams - """ if not insights: return streams @@ -206,52 +124,64 @@ def _update_insights_streams(self, insights, args, streams) -> List[Type[Stream] return streams + insights_custom_streams - def _check_custom_insights_entries(self, insights: List[Mapping[str, Any]]): - - loader = ResourceSchemaLoader(package_name_from_class(self.__class__)) - default_fields = list(loader.get_schema("ads_insights").get("properties", {}).keys()) - default_breakdowns = list(loader.get_schema("ads_insights_breakdowns").get("properties", {}).keys()) - default_action_breakdowns = list(loader.get_schema("ads_insights_action_breakdowns").get("properties", {}).keys()) - - for insight in insights: - if insight.get("fields"): - value_checked, value = self._check_values(default_fields, insight.get("fields")) - if not value_checked: - message = f"{value} is not a valid field name" - raise Exception("Config validation error: " + message) from None - if insight.get("breakdowns"): - value_checked, value = 
self._check_values(default_breakdowns, insight.get("breakdowns")) - if not value_checked: - message = f"{value} is not a valid breakdown name" - raise Exception("Config validation error: " + message) from None - if insight.get("action_breakdowns"): - value_checked, value = self._check_values(default_action_breakdowns, insight.get("action_breakdowns")) - if not value_checked: - message = f"{value} is not a valid action_breakdown name" - raise Exception("Config validation error: " + message) from None - - return True - - def _check_values(self, default_value: List[str], custom_value: List[str]) -> Tuple[bool, Any]: - for e in custom_value: - if e not in default_value: - logger.error(f"{e} does not appear in {default_value}") - return False, e - - return True, None - - -def expand_local_ref(schema, resolver=None, **kwargs): - resolver = resolver or RefResolver("", schema) - if isinstance(schema, MutableMapping): - if "$ref" in schema: - ref_url = schema.pop("$ref") - url, resolved_schema = resolver.resolve(ref_url) - schema.update(resolved_schema) - for key, value in schema.items(): - schema[key] = expand_local_ref(value, resolver=resolver) - return schema - elif isinstance(schema, List): - return [expand_local_ref(item, resolver=resolver) for item in schema] - - return schema + def _read_incremental( + self, + _logger: AirbyteLogger, + stream_instance: Stream, + configured_stream: ConfiguredAirbyteStream, + connector_state: MutableMapping[str, Any], + internal_config: InternalConfig, + ) -> Iterator[AirbyteMessage]: + """We override this method because we need to inject new state handling. + Old way: + pass stream_state in read_records and other methods + call stream_state = stream_instance.get_updated_state(stream_state, record_data) for each record + New way: + stream_instance.state = stream_state + call stream_instance.state when we want to dump state message + + :param _logger: + :param stream_instance: + :param configured_stream: + :param connector_state: + :param internal_config: + :return: + """ + if not hasattr(stream_instance, "state"): + yield from super()._read_incremental(logger, stream_instance, configured_stream, connector_state, internal_config) + return + + stream_name = configured_stream.stream.name + stream_state = connector_state.get(stream_name, {}) + if stream_state: + logger.info(f"Setting state of {stream_name} stream to {stream_state}") + stream_instance.state = stream_state + + slices = stream_instance.stream_slices( + cursor_field=configured_stream.cursor_field, sync_mode=SyncMode.incremental, stream_state=stream_state + ) + total_records_counter = 0 + for _slice in slices: + records = stream_instance.read_records( + sync_mode=SyncMode.incremental, + stream_slice=_slice, + stream_state=stream_state, + cursor_field=configured_stream.cursor_field or None, + ) + for record_counter, record_data in enumerate(records, start=1): + yield self._as_airbyte_record(stream_name, record_data) + checkpoint_interval = stream_instance.state_checkpoint_interval + if checkpoint_interval and record_counter % checkpoint_interval == 0: + yield self._checkpoint_state(stream_name, stream_instance.state, connector_state, logger) + + total_records_counter += 1 + # This functionality should ideally live outside of this method + # but since state is managed inside this method, we keep track + # of it here. + if self._limit_reached(internal_config, total_records_counter): + # Break from slice loop to save state and exit from _read_incremental function. 
+ break + + yield self._checkpoint_state(stream_name, stream_instance.state, connector_state, logger) + if self._limit_reached(internal_config, total_records_counter): + return diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py new file mode 100644 index 0000000000000..2edb95f2b6af7 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/spec.py @@ -0,0 +1,98 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +import logging +from datetime import datetime +from enum import Enum +from typing import List, Optional + +import pendulum +from airbyte_cdk.sources.config import BaseConfig +from facebook_business.adobjects.adsinsights import AdsInsights +from pydantic import BaseModel, Field + +logger = logging.getLogger("airbyte") + + +ValidFields = Enum('ValidFields', AdsInsights.Field.__dict__) +ValidBreakdowns = Enum('ValidBreakdowns', AdsInsights.Breakdowns.__dict__) +ValidActionBreakdowns = Enum('ValidActionBreakdowns', AdsInsights.ActionBreakdowns.__dict__) + +class InsightConfig(BaseModel): + """Config for custom insights""" + + name: str = Field(description="The name value of insight") + fields: Optional[List[ValidFields]] = Field(description="A list of chosen fields for fields parameter", default=[]) + breakdowns: Optional[List[ValidBreakdowns]] = Field(description="A list of chosen breakdowns for breakdowns", default=[]) + action_breakdowns: Optional[List[ValidActionBreakdowns]] = Field(description="A list of chosen action_breakdowns for action_breakdowns", default=[]) + + +class ConnectorConfig(BaseConfig): + """Connector config""" + + class Config: + title = "Source Facebook Marketing" + + start_date: datetime = Field( + title="Start Date", + order=0, + description=( + "The date from which you'd like to replicate data for all incremental streams, " + "in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated." + ), + pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + examples=["2017-01-25T00:00:00Z"], + ) + + end_date: Optional[datetime] = Field( + title="End Date", + order=1, + description=( + "The date until which you'd like to replicate data for all incremental streams, in the format YYYY-MM-DDT00:00:00Z. " + "All data generated between start_date and this date will be replicated. " + "Not setting this option will result in always syncing the latest data." + ), + pattern="^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$", + examples=["2017-01-26T00:00:00Z"], + default_factory=pendulum.now, + ) + + account_id: str = Field( + title="Account ID", + order=2, + description="The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.", + examples=["111111111111111"], + ) + + access_token: str = Field( + title="Access Token", + order=3, + description=( + 'The value of the access token generated. 
' + 'See the <a href="https://docs.airbyte.io/integrations/sources/facebook-marketing">docs</a> for more information' + ), + airbyte_secret=True, + ) + + include_deleted: bool = Field( + title="Include Deleted", + order=4, + default=False, + description="Include data from deleted Campaigns, Ads, and AdSets", + ) + + fetch_thumbnail_images: bool = Field( + title="Fetch Thumbnail Images", + order=5, + default=False, + description="In each Ad Creative, fetch the thumbnail_url and store the result in thumbnail_data_url", + ) + + custom_insights: Optional[List[InsightConfig]] = Field( + title="Custom Insights", + order=6, + description=( + "A list which contains insights entries, each entry must have a name and can contain fields, breakdowns or action_breakdowns" + ), + ) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py deleted file mode 100644 index fe0013a8966ea..0000000000000 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py +++ /dev/null @@ -1,490 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - -import base64 -import time -import urllib.parse as urlparse -from abc import ABC -from collections import deque -from datetime import datetime -from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence - -import airbyte_cdk.sources.utils.casing as casing -import backoff -import pendulum -import requests -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.core import package_name_from_class -from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader -from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer -from cached_property import cached_property -from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse -from facebook_business.exceptions import FacebookRequestError -from source_facebook_marketing.api import API - -from .async_job import AsyncJob -from .common import FacebookAPIException, JobException, batch, deep_merge, retry_pattern - -backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5) - - -def remove_params_from_url(url: str, params: List[str]) -> str: - """ - Parses a URL and removes the query parameters specified in params - :param url: URL - :param params: list of query parameters - :return: URL with params removed - """ - parsed = urlparse.urlparse(url) - query = urlparse.parse_qs(parsed.query, keep_blank_values=True) - filtered = dict((k, v) for k, v in query.items() if k not in params) - return urlparse.urlunparse( - [parsed.scheme, parsed.netloc, parsed.path, parsed.params, urlparse.urlencode(filtered, doseq=True), parsed.fragment] - ) - - -def fetch_thumbnail_data_url(url: str) -> str: - try: - response = requests.get(url) - if response.status_code == 200: - type = response.headers["content-type"] - data = base64.b64encode(response.content) - return f"data:{type};base64,{data.decode('ascii')}" - except requests.exceptions.RequestException: - pass - return None - - -class FBMarketingStream(Stream, ABC): - """Base stream class""" - - primary_key = "id" - transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) - - page_size = 100 - - enable_deleted = False - entity_prefix = None - - def __init__(self, api: API,
include_deleted: bool = False, **kwargs): - super().__init__(**kwargs) - self._api = api - self._include_deleted = include_deleted if self.enable_deleted else False - - @cached_property - def fields(self) -> List[str]: - """List of fields that we want to query, for now just all properties from stream's schema""" - return list(self.get_json_schema().get("properties", {}).keys()) - - @backoff_policy - def execute_in_batch(self, requests: Iterable[FacebookRequest]) -> Sequence[MutableMapping[str, Any]]: - """Execute list of requests in batches""" - records = [] - - def success(response: FacebookResponse): - records.append(response.json()) - - def failure(response: FacebookResponse): - raise response.error() - - api_batch: FacebookAdsApiBatch = self._api.api.new_batch() - for request in requests: - api_batch.add_request(request, success=success, failure=failure) - retry_batch = api_batch.execute() - if retry_batch: - raise FacebookAPIException(f"Batch has failed {len(retry_batch)} requests") - - return records - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - """Main read method used by CDK""" - for record in self._read_records(params=self.request_params(stream_state=stream_state)): - yield self._extend_record(record, fields=self.fields) - - def _read_records(self, params: Mapping[str, Any]) -> Iterable: - """Wrapper around query to backoff errors. - We have default implementation because we still can override read_records so this method is not mandatory. - """ - return [] - - @backoff_policy - def _extend_record(self, obj: Any, **kwargs): - """Wrapper around api_get to backoff errors""" - return obj.api_get(**kwargs).export_all_data() - - def request_params(self, **kwargs) -> MutableMapping[str, Any]: - """Parameters that should be passed to query_records method""" - params = {"limit": self.page_size} - - if self._include_deleted: - params.update(self._filter_all_statuses()) - - return params - - def _filter_all_statuses(self) -> MutableMapping[str, Any]: - """Filter that covers all possible statuses thus including deleted/archived records""" - filt_values = [ - "active", - "archived", - "completed", - "limited", - "not_delivering", - "deleted", - "not_published", - "pending_review", - "permanently_deleted", - "recently_completed", - "recently_rejected", - "rejected", - "scheduled", - "inactive", - ] - - return { - "filtering": [ - {"field": f"{self.entity_prefix}.delivery_info", "operator": "IN", "value": filt_values}, - ], - } - - -class FBMarketingIncrementalStream(FBMarketingStream, ABC): - cursor_field = "updated_time" - - def __init__(self, start_date: datetime, end_date: datetime, **kwargs): - super().__init__(**kwargs) - self._start_date = pendulum.instance(start_date) - self._end_date = pendulum.instance(end_date) - - def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): - """Update stream state from latest record""" - potentially_new_records_in_the_past = self._include_deleted and not current_stream_state.get("include_deleted", False) - record_value = latest_record[self.cursor_field] - state_value = current_stream_state.get(self.cursor_field) or record_value - max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value)) - if potentially_new_records_in_the_past: - max_cursor = record_value - - return { - self.cursor_field: str(max_cursor), - 
"include_deleted": self._include_deleted, - } - - def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: - """Include state filter""" - params = super().request_params(**kwargs) - params = deep_merge(params, self._state_filter(stream_state=stream_state or {})) - return params - - def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: - """Additional filters associated with state if any set""" - state_value = stream_state.get(self.cursor_field) - filter_value = self._start_date if not state_value else pendulum.parse(state_value) - - potentially_new_records_in_the_past = self._include_deleted and not stream_state.get("include_deleted", False) - if potentially_new_records_in_the_past: - self.logger.info(f"Ignoring bookmark for {self.name} because of enabled `include_deleted` option") - filter_value = self._start_date - - return { - "filtering": [ - { - "field": f"{self.entity_prefix}.{self.cursor_field}", - "operator": "GREATER_THAN", - "value": filter_value.int_timestamp, - }, - ], - } - - -class AdCreatives(FBMarketingStream): - """AdCreative is append only stream - doc: https://developers.facebook.com/docs/marketing-api/reference/ad-creative - """ - - entity_prefix = "adcreative" - batch_size = 50 - - def __init__(self, fetch_thumbnail_images: bool = False, **kwargs): - super().__init__(**kwargs) - self._fetch_thumbnail_images = fetch_thumbnail_images - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - """Read records using batch API""" - records = self._read_records(params=self.request_params(stream_state=stream_state)) - # "thumbnail_data_url" is a field in our stream's schema because we - # output it (see fix_thumbnail_urls below), but it's not a field that - # we can request from Facebook - request_fields = [f for f in self.fields if f != "thumbnail_data_url"] - requests = [record.api_get(fields=request_fields, pending=True) for record in records] - for requests_batch in batch(requests, size=self.batch_size): - for record in self.execute_in_batch(requests_batch): - yield self.fix_thumbnail_urls(record) - - def fix_thumbnail_urls(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]: - """Cleans and, if enabled, fetches thumbnail URLs for each creative.""" - # The thumbnail_url contains some extra query parameters that don't affect the validity of the URL, but break SAT - thumbnail_url = record.get("thumbnail_url") - if thumbnail_url: - record["thumbnail_url"] = remove_params_from_url(thumbnail_url, ["_nc_hash", "d"]) - if self._fetch_thumbnail_images: - record["thumbnail_data_url"] = fetch_thumbnail_data_url(thumbnail_url) - return record - - @backoff_policy - def _read_records(self, params: Mapping[str, Any]) -> Iterator: - return self._api.account.get_ad_creatives(params=params) - - -class Ads(FBMarketingIncrementalStream): - """doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup""" - - entity_prefix = "ad" - enable_deleted = True - - @backoff_policy - def _read_records(self, params: Mapping[str, Any]): - return self._api.account.get_ads(params=params, fields=[self.cursor_field]) - - -class AdSets(FBMarketingIncrementalStream): - """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign""" - - entity_prefix = "adset" - enable_deleted = True - - @backoff_policy - def _read_records(self, params: Mapping[str, 
Any]): - return self._api.account.get_ad_sets(params=params) - - -class Campaigns(FBMarketingIncrementalStream): - """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign-group""" - - entity_prefix = "campaign" - enable_deleted = True - - @backoff_policy - def _read_records(self, params: Mapping[str, Any]): - return self._api.account.get_campaigns(params=params) - - -class Videos(FBMarketingIncrementalStream): - """See: https://developers.facebook.com/docs/marketing-api/reference/video""" - - entity_prefix = "video" - enable_deleted = True - - @backoff_policy - def _read_records(self, params: Mapping[str, Any]) -> Iterator: - return self._api.account.get_ad_videos(params=params) - - -class AdsInsights(FBMarketingIncrementalStream): - """doc: https://developers.facebook.com/docs/marketing-api/insights""" - - cursor_field = "date_start" - primary_key = None - - ALL_ACTION_ATTRIBUTION_WINDOWS = [ - "1d_click", - "7d_click", - "28d_click", - "1d_view", - "7d_view", - "28d_view", - ] - - ALL_ACTION_BREAKDOWNS = [ - "action_type", - "action_target_id", - "action_destination", - ] - - MAX_ASYNC_SLEEP = pendulum.duration(minutes=5) - MAX_ASYNC_JOBS = 10 - INSIGHTS_RETENTION_PERIOD = pendulum.duration(days=37 * 30) - - action_breakdowns = ALL_ACTION_BREAKDOWNS - level = "ad" - action_attribution_windows = ALL_ACTION_ATTRIBUTION_WINDOWS - time_increment = 1 - - breakdowns = [] - - def __init__( - self, - buffer_days, - days_per_job, - name: str = None, - fields: List[str] = None, - breakdowns: List[str] = None, - action_breakdowns: List[str] = None, - **kwargs, - ): - - super().__init__(**kwargs) - self.lookback_window = pendulum.duration(days=buffer_days) - self._days_per_job = days_per_job - self._fields = fields - self.action_breakdowns = action_breakdowns or self.action_breakdowns - self.breakdowns = breakdowns or self.breakdowns - self._new_class_name = name - - @property - def name(self) -> str: - """ - :return: Stream name. By default this is the implementing class name, but it can be overridden as needed. - """ - name = self._new_class_name or self.__class__.__name__ - return casing.camel_to_snake(name) - - def read_records( - self, - sync_mode: SyncMode, - cursor_field: List[str] = None, - stream_slice: Mapping[str, Any] = None, - stream_state: Mapping[str, Any] = None, - ) -> Iterable[Mapping[str, Any]]: - """Waits for current job to finish (slice) and yield its result""" - job = self.wait_for_job(stream_slice["job"]) - # because we query `lookback_window` days before actual cursor we might get records older then cursor - - for obj in job.get_result(): - yield obj.export_all_data() - - def stream_slices(self, stream_state: Mapping[str, Any] = None, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: - """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time. - This solution for Async was chosen because: - 1. we should commit state after each successful job - 2. we should run as many job as possible before checking for result - 3. 
we shouldn't proceed to consumption of the next job before previous succeed - """ - stream_state = stream_state or {} - running_jobs = deque() - date_ranges = list(self._date_ranges(stream_state=stream_state)) - for params in date_ranges: - params = deep_merge(params, self.request_params(stream_state=stream_state)) - job = AsyncJob(api=self._api, params=params) - job.start() - running_jobs.append(job) - if len(running_jobs) >= self.MAX_ASYNC_JOBS: - yield {"job": running_jobs.popleft()} - - while running_jobs: - yield {"job": running_jobs.popleft()} - - @retry_pattern(backoff.expo, JobException, max_tries=10, factor=5) - def wait_for_job(self, job: AsyncJob) -> AsyncJob: - if job.failed: - job.restart() - - factor = 2 - sleep_seconds = factor - while not job.completed: - self.logger.info(f"{job}: sleeping {sleep_seconds} seconds while waiting for completion") - time.sleep(sleep_seconds) - if sleep_seconds < self.MAX_ASYNC_SLEEP.in_seconds(): - sleep_seconds *= factor - - return job - - def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: - params = super().request_params(stream_state=stream_state, **kwargs) - params = deep_merge( - params, - { - "level": self.level, - "action_breakdowns": self.action_breakdowns, - "breakdowns": self.breakdowns, - "fields": self.fields, - "time_increment": self.time_increment, - "action_attribution_windows": self.action_attribution_windows, - }, - ) - - return params - - def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: - """Works differently for insights, so remove it""" - return {} - - def get_json_schema(self) -> Mapping[str, Any]: - """Add fields from breakdowns to the stream schema - :return: A dict of the JSON schema representing this stream. - """ - loader = ResourceSchemaLoader(package_name_from_class(self.__class__)) - schema = loader.get_schema("ads_insights") - if self._fields: - schema["properties"] = {k: v for k, v in schema["properties"].items() if k in self._fields} - if self.breakdowns: - breakdowns_properties = loader.get_schema("ads_insights_breakdowns")["properties"] - schema["properties"].update({prop: breakdowns_properties[prop] for prop in self.breakdowns}) - return schema - - @cached_property - def fields(self) -> List[str]: - """List of fields that we want to query, for now just all properties from stream's schema""" - if self._fields: - return self._fields - schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights") - return list(schema.get("properties", {}).keys()) - - def _date_ranges(self, stream_state: Mapping[str, Any]) -> Iterator[dict]: - """Iterate over period between start_date/state and now - - Notes: Facebook freezes insight data 28 days after it was generated, which means that all data - from the past 28 days may have changed since we last emitted it, so we retrieve it again. 
- """ - state_value = stream_state.get(self.cursor_field) - if state_value: - start_date = pendulum.parse(state_value) - self.lookback_window - else: - start_date = self._start_date - end_date = self._end_date - start_date = max(end_date - self.INSIGHTS_RETENTION_PERIOD, start_date) - - for since in pendulum.period(start_date, end_date).range("days", self._days_per_job): - until = min(since.add(days=self._days_per_job - 1), end_date) # -1 because time_range is inclusive - yield { - "time_range": {"since": since.to_date_string(), "until": until.to_date_string()}, - } - - -class AdsInsightsAgeAndGender(AdsInsights): - breakdowns = ["age", "gender"] - - -class AdsInsightsCountry(AdsInsights): - breakdowns = ["country"] - - -class AdsInsightsRegion(AdsInsights): - breakdowns = ["region"] - - -class AdsInsightsDma(AdsInsights): - breakdowns = ["dma"] - - -class AdsInsightsPlatformAndDevice(AdsInsights): - breakdowns = ["publisher_platform", "platform_position", "impression_device"] - action_breakdowns = ["action_type"] # FB Async Job fails for unknown reason if we set other breakdowns - - -class AdsInsightsActionType(AdsInsights): - breakdowns = [] - action_breakdowns = ["action_type"] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/__init__.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/__init__.py new file mode 100644 index 0000000000000..784bf61081ffc --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/__init__.py @@ -0,0 +1,33 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +from .streams import ( + AdCreatives, + Ads, + AdSets, + AdsInsights, + AdsInsightsActionType, + AdsInsightsAgeAndGender, + AdsInsightsCountry, + AdsInsightsDma, + AdsInsightsPlatformAndDevice, + AdsInsightsRegion, + Campaigns, + Videos, +) + +__all__ = [ + "AdCreatives", + "Ads", + "AdSets", + "AdsInsights", + "AdsInsightsActionType", + "AdsInsightsAgeAndGender", + "AdsInsightsCountry", + "AdsInsightsDma", + "AdsInsightsPlatformAndDevice", + "AdsInsightsRegion", + "Campaigns", + "Videos", +] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py new file mode 100644 index 0000000000000..7dc8c43948ec0 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py @@ -0,0 +1,276 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# +import copy +import logging +from abc import ABC, abstractmethod +from enum import Enum +from typing import Any, List, Mapping, Optional + +import pendulum +from facebook_business.adobjects.adreportrun import AdReportRun +from facebook_business.adobjects.campaign import Campaign +from facebook_business.adobjects.objectparser import ObjectParser +from facebook_business.api import FacebookAdsApiBatch, FacebookResponse + +logger = logging.getLogger("airbyte") + + +def chunks(data, n): + """Yield successive n-sized chunks from lst.""" + for i in range(0, len(data), n): + yield data[i : i + n] + + +class Status(str, Enum): + """Async job statuses""" + + COMPLETED = "Job Completed" + FAILED = "Job Failed" + SKIPPED = "Job Skipped" + STARTED = "Job Started" + RUNNING = "Job Running" + NOT_STARTED = "Job Not Started" + + +class AsyncJob(ABC): + """Abstract AsyncJob base class""" + + @abstractmethod + def start(self): + """Start remote job""" + + @abstractmethod + def restart(self): + """Restart failed job""" + + @property + @abstractmethod + def restart_number(self): + """Number of restarts""" + + @property + @abstractmethod + def completed(self) -> bool: + """Check job status and return True if it is completed, use failed/succeeded to check if it was successful""" + + @property + @abstractmethod + def failed(self) -> bool: + """Tell if the job previously failed""" + + @abstractmethod + def update_job(self, batch=None): + """Method to retrieve job's status, separated because of retry handler""" + + @abstractmethod + def get_result(self) -> Any: + """Retrieve result of the finished job.""" + + +class ParentAsyncJob(AsyncJob): + """Group of async jobs""" + + def __init__(self, api, jobs: List[AsyncJob]): + self._api = api + self._jobs = jobs + self._restart_number = 0 + + def start(self): + """Start each job in the group.""" + for job in self._jobs: + job.start() + + def restart(self): + """Restart failed jobs""" + for job in self._jobs: + if job.failed: + job.restart() + self._restart_number = max(self._restart_number, job.restart_number) + + @property + def restart_number(self): + """Number of restarts""" + return self._restart_number + + @property + def completed(self) -> bool: + """Check job status and return True if all jobs are completed, use failed/succeeded to check if it was successful""" + return all(job.completed for job in self._jobs) + + @property + def failed(self) -> bool: + """Tell if any job previously failed""" + return any(job.failed for job in self._jobs) + + def update_job(self, batch=None): + """Checks jobs status in advance and restart if some failed.""" + batch = self._api.new_batch() + unfinished_jobs = [job for job in self._jobs if not job.completed] + for jobs in chunks(unfinished_jobs, 50): + for job in jobs: + job.update_job(batch=batch) + + while batch: + # If some of the calls from batch have failed, it returns a new + # FacebookAdsApiBatch object with those calls + batch = batch.execute() + + def get_result(self) -> Any: + """Retrieve result of the finished job.""" + for job in self._jobs: + yield from job.get_result() + + def split_job(self) -> "AsyncJob": + """Split existing job in few smaller ones grouped by ParentAsyncJob class""" + raise RuntimeError("Splitting of ParentAsyncJob is not allowed.") + + +class InsightAsyncJob(AsyncJob): + """AsyncJob wraps FB AdReport class and provides interface to restart/retry the async job""" + + def __init__(self, api, edge_object: Any, params: Mapping[str, Any], key: Optional[Any] = None): + """Initialize + + :param api: 
FB API instance
+        :param edge_object: Account, Campaign, (AdSet or Ad in future)
+        :param params: job params, required to start/restart job
+        """
+        self._api = api
+        self._params = params
+        self._edge_object = edge_object
+        self._job: Optional[AdReportRun] = None
+        self._start_time = None
+        self._finish_time = None
+        self._failed = False
+        self._restart_number = 0
+        self.key = key
+
+    def split_job(self) -> ParentAsyncJob:
+        """Split the existing job into a few smaller ones grouped by the ParentAsyncJob class.
+        TODO: use some cache to avoid expensive queries across different streams.
+        """
+        campaign_params = dict(copy.deepcopy(self._params))
+        # get campaigns from the attribution window as well (28 days + 1 current day)
+        new_start = pendulum.parse(self._params["time_range"]["since"]) - pendulum.duration(days=28 + 1)
+        campaign_params.update(fields=["campaign_id"], level="campaign")
+        campaign_params["time_range"].update(since=new_start.to_date_string())
+        campaign_params.pop("time_increment")  # query all days
+        result = self._edge_object.get_insights(params=campaign_params)
+        campaign_ids = set(row["campaign_id"] for row in result)
+        logger.info(f"Got {len(campaign_ids)} campaigns for period {self._params['time_range']}: {campaign_ids}")
+
+        return ParentAsyncJob(self._api, jobs=[InsightAsyncJob(self._api, Campaign(pk), self._params) for pk in campaign_ids])
+
+    def start(self):
+        """Start remote job"""
+        if self._job:
+            raise RuntimeError(f"{self}: Incorrect usage of start - the job already started, use restart instead")
+
+        self._job = self._edge_object.get_insights(params=self._params, is_async=True)
+        self._start_time = pendulum.now()
+        job_id = self._job["report_run_id"]
+        time_range = self._params["time_range"]
+        breakdowns = self._params["breakdowns"]
+        logger.info(f"Created AdReportRun: {job_id} to sync insights {time_range} with breakdown {breakdowns} for {self._edge_object}")
+
+    def restart(self):
+        """Restart failed job"""
+        if not self._job or not self.failed:
+            raise RuntimeError(f"{self}: Incorrect usage of restart - only failed jobs can be restarted")
+
+        self._job = None
+        self._failed = False
+        self._start_time = None
+        self._finish_time = None
+        self._restart_number += 1
+        self.start()
+        logger.info(f"{self}: restarted")
+
+    @property
+    def restart_number(self):
+        """Number of restarts"""
+        return self._restart_number
+
+    @property
+    def elapsed_time(self) -> Optional[pendulum.duration]:
+        """Elapsed time since the job start"""
+        if not self._start_time:
+            return None
+
+        end_time = self._finish_time or pendulum.now()
+        return end_time - self._start_time
+
+    @property
+    def completed(self) -> bool:
+        """Check the job status and return True if it is completed; use failed/succeeded to check whether it was successful
+
+        :return: True if completed, False if the task is still running
+        """
+        return bool(self._finish_time is not None)
+
+    @property
+    def failed(self) -> bool:
+        """Tell if the job previously failed"""
+        return self._failed
+
+    def _batch_success_handler(self, response: FacebookResponse):
+        """Update job status from response"""
+        self._job = ObjectParser(reuse_object=self._job).parse_single(response.json())
+        self._check_status()
+
+    def _batch_failure_handler(self, response: FacebookResponse):
+        """Log the failed request; the job state is left unchanged so the status check can be retried"""
+        logger.info(f"Request failed with response: {response.body()}")
+
+    def update_job(self, batch: Optional[FacebookAdsApiBatch] = None):
+        """Method to retrieve the job's status, separated because of the retry handler"""
+        if not self._job:
+            raise RuntimeError(f"{self}: Incorrect usage of the method - the job is not started")
+
+        if self.completed:
+            job_progress_pct = self._job["async_percent_completion"]
+            logger.info(f"{self} is {job_progress_pct}% complete ({self._job['async_status']})")
+            # No need to update the job status if it's already completed
+            return
+
+        if batch is not None:
+            self._job.api_get(batch=batch, success=self._batch_success_handler, failure=self._batch_failure_handler)
+        else:
+            self._job = self._job.api_get()
+            self._check_status()
+
+    def _check_status(self) -> bool:
+        """Perform a status check
+
+        :return: True if the job is completed, False if the job is still running
+        """
+        job_progress_pct = self._job["async_percent_completion"]
+        job_status = self._job["async_status"]
+        logger.info(f"{self} is {job_progress_pct}% complete ({job_status})")
+
+        if job_status == Status.COMPLETED:
+            self._finish_time = pendulum.now()  # TODO: is not actual running time, but interval between check_status calls
+            return True
+        elif job_status in [Status.FAILED, Status.SKIPPED]:
+            self._finish_time = pendulum.now()
+            self._failed = True
+            logger.info(f"{self._job} has status {job_status} after {self.elapsed_time.in_seconds()} seconds.")
+            return True
+
+        return False
+
+    def get_result(self) -> Any:
+        """Retrieve result of the finished job."""
+        if not self._job or self.failed:
+            raise RuntimeError(f"{self}: Incorrect usage of get_result - the job is not started of failed")
+        return self._job.get_result(params={"limit": 100})
+
+    def __str__(self) -> str:
+        """String representation of the job wrapper."""
+        job_id = self._job["report_run_id"] if self._job else "<None>"
+        time_range = self._params["time_range"]
+        breakdowns = self._params["breakdowns"]
+        return f"AdReportRun(id={job_id}, {self._edge_object}, time_range={time_range}, breakdowns={breakdowns}"
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
new file mode 100644
index 0000000000000..8c8ae23f13527
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py
@@ -0,0 +1,162 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+#
+
+import logging
+import time
+from typing import TYPE_CHECKING, Iterator, List, Tuple
+
+from facebook_business.api import FacebookAdsApiBatch
+
+from .async_job import AsyncJob
+
+if TYPE_CHECKING:
+    from source_facebook_marketing.api import API
+
+logger = logging.getLogger("airbyte")
+
+
+class InsightAsyncJobManager:
+    """
+    Class for managing Ads Insights async jobs. Before running the next job it
+    checks the current insights throttle value; if it is greater than THROTTLE_LIMIT,
+    no new jobs are added.
+    To consume completed jobs use the completed_jobs() generator; jobs are returned
+    in the same order they finished.
+    """
+
+    # When the current insights throttle hits this value, no new jobs are added.
+    THROTTLE_LIMIT = 70
+    FAILED_JOBS_RESTART_COUNT = 5
+    # Time to wait before checking job status update again.
+    JOB_STATUS_UPDATE_SLEEP_SECONDS = 30
+    # Maximum number of concurrent jobs that can be scheduled. Since the throttling
+    # limit is not a reliable indicator of async workload capability, we still have
+    # to use this parameter. It is equal to the maximum number of requests in a batch (FB API limit).
+    MAX_JOBS_IN_QUEUE = 100
+    MAX_JOBS_TO_CHECK = 50
+
+    def __init__(self, api: "API", jobs: Iterator[AsyncJob]):
+        """Initialize
+
+        :param api: Facebook API wrapper
+        :param jobs: iterator of async jobs to execute
+        """
+        self._api = api
+        self._jobs = jobs
+        self._running_jobs = []
+
+    def _start_jobs(self):
+        """Enqueue new jobs."""
+
+        self._update_api_throttle_limit()
+        self._wait_throttle_limit_down()
+        prev_jobs_count = len(self._running_jobs)
+        while self._get_current_throttle_value() < self.THROTTLE_LIMIT and len(self._running_jobs) < self.MAX_JOBS_IN_QUEUE:
+            job = next(iter(self._jobs), None)
+            if not job:
+                self._empty = True
+                break
+            job.start()
+            self._running_jobs.append(job)
+
+        logger.info(
+            f"Added: {len(self._running_jobs) - prev_jobs_count} jobs. "
+            f"Current throttle limit is {self._current_throttle()}, "
+            f"{len(self._running_jobs)}/{self.MAX_JOBS_IN_QUEUE} job(s) in queue"
+        )
+
+    def completed_jobs(self) -> Iterator[AsyncJob]:
+        """Wait until a job is ready and return it. If a job has failed, try to
+        restart or split it, up to FAILED_JOBS_RESTART_COUNT attempts. After a job
+        is consumed, new jobs are added according to the current throttling limit.
+
+        :yield: completed jobs
+        """
+        if not self._running_jobs:
+            self._start_jobs()
+
+        while self._running_jobs:
+            completed_jobs = self._check_jobs_status_and_restart()
+            while not completed_jobs:
+                logger.info(f"No jobs ready to be consumed, wait for {self.JOB_STATUS_UPDATE_SLEEP_SECONDS} seconds")
+                time.sleep(self.JOB_STATUS_UPDATE_SLEEP_SECONDS)
+                completed_jobs = self._check_jobs_status_and_restart()
+            yield from completed_jobs
+            self._start_jobs()
+
+    def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
+        """Check the status of all running jobs in advance and restart the ones that failed.
+
+        :return: list of completed jobs
+        """
+        completed_jobs = []
+        running_jobs = []
+        api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
+        for job in self._running_jobs:
+            # we check it here because job can be an instance of ParentAsyncJob, which uses its own batch object
+            if len(api_batch) >= self.MAX_JOBS_TO_CHECK:
+                logger.info("Reached batch queue limit")
+                break
+            job.update_job(batch=api_batch)
+
+        while api_batch:
+            # If some of the calls from the batch have failed, it returns a new
+            # FacebookAdsApiBatch object with those calls
+            api_batch = api_batch.execute()
+
+        failed_num = 0
+        self._wait_throttle_limit_down()
+        for job in self._running_jobs:
+            if job.completed:
+                if job.failed:
+                    if job.restart_number >= self.FAILED_JOBS_RESTART_COUNT:
+                        raise Exception(f"Job {job} failed more than {self.FAILED_JOBS_RESTART_COUNT} times. Terminating...")
+                    elif job.restart_number:
+                        logger.info(f"Job {job} failed, trying to split job into smaller chunks (campaigns).")
+                        group_job = job.split_job()
+                        running_jobs.append(group_job)
+                        group_job.start()
+                    else:
+                        logger.info(f"Job {job} failed, restarting")
+                        job.restart()
+                        running_jobs.append(job)
+                    failed_num += 1
+                else:
+                    completed_jobs.append(job)
+            else:
+                running_jobs.append(job)
+
+        self._running_jobs = running_jobs
+        logger.info(f"Completed jobs: {len(completed_jobs)}, Failed jobs: {failed_num}, Running jobs: {len(self._running_jobs)}")
+
+        return completed_jobs
+
+    def _wait_throttle_limit_down(self):
+        while self._get_current_throttle_value() > self.THROTTLE_LIMIT:
+            logger.info(f"Current throttle is {self._current_throttle()}, wait {self.JOB_STATUS_UPDATE_SLEEP_SECONDS} seconds")
+            time.sleep(self.JOB_STATUS_UPDATE_SLEEP_SECONDS)
+            self._update_api_throttle_limit()
+
+    def _current_throttle(self) -> Tuple[float, float]:
+        """
+        Return a tuple of two floats representing the current ads insights throttle values for the app id and the account id
+        """
+        return self._api.api.ads_insights_throttle
+
+    def _get_current_throttle_value(self) -> float:
+        """
+        Get the current ads insights throttle value based on the app id and the account id.
+        It is evaluated as the minimum of those two numbers, because when the account-id
+        throttle hits 100 it cools down very slowly (i.e. it still says 100 despite no jobs
+        running and being capable of serving new requests). Because of this behaviour the
+        Facebook throttle limit is not a reliable metric to estimate the async workload.
+        """
+        return min(self._current_throttle()[0], self._current_throttle()[1])
+
+    def _update_api_throttle_limit(self):
+        """
+        Send a <ACCOUNT_ID>/insights GET request with no parameters, so that it responds
+        with an empty data list and the api can use the "x-fb-ads-insights-throttle"
+        header to update the current insights throttle limit.
+        """
+        self._api.account.get_insights()
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
new file mode 100644
index 0000000000000..9f5c2aae315bd
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_insight_streams.py
@@ -0,0 +1,260 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# + +import copy +import logging +from typing import ( + Any, + Iterable, + Iterator, + List, + Mapping, + MutableMapping, + Optional, + Union, +) + +import airbyte_cdk.sources.utils.casing as casing +import pendulum +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams.core import package_name_from_class +from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader +from cached_property import cached_property +from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob +from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager + +from .base_streams import FBMarketingIncrementalStream + +logger = logging.getLogger("airbyte") + + +class AdsInsights(FBMarketingIncrementalStream): + """doc: https://developers.facebook.com/docs/marketing-api/insights""" + + cursor_field = "date_start" + + ALL_ACTION_ATTRIBUTION_WINDOWS = [ + "1d_click", + "7d_click", + "28d_click", + "1d_view", + "7d_view", + "28d_view", + ] + + ALL_ACTION_BREAKDOWNS = [ + "action_type", + "action_target_id", + "action_destination", + ] + + # Facebook store metrics maximum of 37 months old. Any time range that + # older that 37 months from current date would result in 400 Bad request + # HTTP response. + # https://developers.facebook.com/docs/marketing-api/reference/ad-account/insights/#overview + INSIGHTS_RETENTION_PERIOD = pendulum.duration(months=37) + # Facebook freezes insight data 28 days after it was generated, which means that all data + # from the past 28 days may have changed since we last emitted it, so we retrieve it again. + INSIGHTS_LOOKBACK_PERIOD = pendulum.duration(days=28) + + action_breakdowns = ALL_ACTION_BREAKDOWNS + level = "ad" + action_attribution_windows = ALL_ACTION_ATTRIBUTION_WINDOWS + time_increment = 1 + + breakdowns = [] + + def __init__( + self, + name: str = None, + fields: List[str] = None, + breakdowns: List[str] = None, + action_breakdowns: List[str] = None, + **kwargs, + ): + super().__init__(**kwargs) + self._start_date = self._start_date.date() + self._end_date = self._end_date.date() + self._fields = fields + self.action_breakdowns = action_breakdowns or self.action_breakdowns + self.breakdowns = breakdowns or self.breakdowns + self._new_class_name = name + + # state + self._cursor_value: Optional[pendulum.Date] = None # latest period that was read + self._next_cursor_value = self._get_start_date() + self._completed_slices = set() + + @property + def name(self) -> str: + """We override stream name to let the user change it via configuration.""" + name = self._new_class_name or self.__class__.__name__ + return casing.camel_to_snake(name) + + @property + def primary_key(self) -> Optional[Union[str, List[str], List[List[str]]]]: + """Build complex PK based on slices and breakdowns""" + return ["date_start", "ad_id"] + self.breakdowns + + def _get_campaign_ids(self, params) -> List[str]: + campaign_params = copy.deepcopy(params) + campaign_params.update(fields=["campaign_id"], level="campaign") + result = self._api.account.get_insights(params=campaign_params) + return list(set(row["campaign_id"] for row in result)) + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """Waits for current job to finish (slice) and yield its result""" + job = stream_slice["insight_job"] + for obj in job.get_result(): + yield obj.export_all_data() + + 
self._completed_slices.add(job.key) + if job.key == self._next_cursor_value: + self._advance_cursor() + + @property + def state(self) -> MutableMapping[str, Any]: + """State getter, the result can be stored by the source""" + if self._cursor_value: + return { + self.cursor_field: self._cursor_value.isoformat(), + "slices": [d.isoformat() for d in self._completed_slices], + } + + if self._completed_slices: + return { + "slices": [d.isoformat() for d in self._completed_slices], + } + + return {} + + @state.setter + def state(self, value: MutableMapping[str, Any]): + """State setter""" + self._cursor_value = pendulum.parse(value[self.cursor_field]).date() if value.get(self.cursor_field) else None + self._completed_slices = set(pendulum.parse(v).date() for v in value.get("slices", [])) + self._next_cursor_value = self._get_start_date() + + def _date_intervals(self) -> Iterator[pendulum.Date]: + date_range = self._end_date - self._next_cursor_value + return date_range.range("days", self.time_increment) + + def _advance_cursor(self): + """Iterate over state, find continuing sequence of slices. Get last value, advance cursor there and remove slices from state""" + for ts_start in self._date_intervals(): + if ts_start not in self._completed_slices: + self._next_cursor_value = ts_start + break + self._completed_slices.remove(ts_start) + self._cursor_value = ts_start + + def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]: + """Generator of async jobs + + :param params: + :return: + """ + + for ts_start in self._date_intervals(): + if ts_start in self._completed_slices: + continue + ts_end = ts_start + pendulum.duration(days=self.time_increment - 1) + total_params = { + **params, + "time_range": { + "since": ts_start.to_date_string(), + "until": ts_end.to_date_string(), + }, + } + yield InsightAsyncJob(self._api.api, edge_object=self._api.account, params=total_params, key=ts_start) + + def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]: + """Slice by date periods and schedule async job for each period, run at most MAX_ASYNC_JOBS jobs at the same time. + This solution for Async was chosen because: + 1. we should commit state after each successful job + 2. we should run as many job as possible before checking for result + 3. we shouldn't proceed to consumption of the next job before previous succeed + + generate slice only if it is not in state, + when we finished reading slice (in read_records) we check if current slice is the next one and do advance cursor + + when slice is not next one we just update state with it + to do so source will check state attribute and call get_state, + """ + manager = InsightAsyncJobManager(api=self._api, jobs=self._generate_async_jobs(params=self.request_params())) + for job in manager.completed_jobs(): + yield {"insight_job": job} + + def _get_start_date(self) -> pendulum.Date: + """Get start date to begin sync with. It is not that trivial as it might seem. 
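+        A worked example of the lookback rule below: with today = 2021-12-01 and a stored
+        cursor of 2021-11-20, the sync starts from 2021-11-03 (today minus the 28-day
+        lookback), not from the cursor.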
+        There are a few rules:
+        - don't read data older than start_date
+        - re-read data within the last 28 days
+        - don't read data older than the retention date
+
+        :return: the first date to sync
+        """
+        today = pendulum.today().date()
+        oldest_date = today - self.INSIGHTS_RETENTION_PERIOD
+        refresh_date = today - self.INSIGHTS_LOOKBACK_PERIOD
+
+        if self._cursor_value:
+            start_date = self._cursor_value + pendulum.duration(days=self.time_increment)
+            if start_date > refresh_date:
+                logger.info(
+                    f"The cursor value is within the refresh period ({self.INSIGHTS_LOOKBACK_PERIOD}), start sync from {refresh_date} instead."
+                )
+            # FIXME: change cursor logic to not update cursor earlier than 28 days, after that we don't need this line
+            start_date = min(start_date, refresh_date)
+
+            if start_date < self._start_date:
+                logger.warning(f"Ignoring provided state and starting sync from start_date ({self._start_date}).")
+            start_date = max(start_date, self._start_date)
+        else:
+            start_date = self._start_date
+        if start_date < oldest_date:
+            logger.warning(f"Loading insights older than {self.INSIGHTS_RETENTION_PERIOD} is not possible. Start sync from {oldest_date}.")
+
+        return max(oldest_date, start_date)
+
+    def request_params(self, **kwargs) -> MutableMapping[str, Any]:
+        return {
+            "level": self.level,
+            "action_breakdowns": self.action_breakdowns,
+            "breakdowns": self.breakdowns,
+            "fields": self.fields,
+            "time_increment": self.time_increment,
+            "action_attribution_windows": self.action_attribution_windows,
+        }
+
+    def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
+        """Works differently for insights, so remove it"""
+        return {}
+
+    def get_json_schema(self) -> Mapping[str, Any]:
+        """Add fields from breakdowns to the stream schema
+        :return: A dict of the JSON schema representing this stream.
+        """
+        loader = ResourceSchemaLoader(package_name_from_class(self.__class__))
+        schema = loader.get_schema("ads_insights")
+        if self._fields:
+            schema["properties"] = {k: v for k, v in schema["properties"].items() if k in self._fields}
+        if self.breakdowns:
+            breakdowns_properties = loader.get_schema("ads_insights_breakdowns")["properties"]
+            schema["properties"].update({prop: breakdowns_properties[prop] for prop in self.breakdowns})
+        return schema
+
+    @cached_property
+    def fields(self) -> List[str]:
+        """List of fields that we want to query, for now just all properties from stream's schema"""
+        if self._fields:
+            return self._fields
+        schema = ResourceSchemaLoader(package_name_from_class(self.__class__)).get_schema("ads_insights")
+        return list(schema.get("properties", {}).keys())
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py
new file mode 100644
index 0000000000000..42d00948a833c
--- /dev/null
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/base_streams.py
@@ -0,0 +1,186 @@
+#
+# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
+# + +import logging +from abc import ABC +from datetime import datetime +from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping + +import pendulum +from airbyte_cdk.models import SyncMode +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer +from cached_property import cached_property +from facebook_business.adobjects.abstractobject import AbstractObject +from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse + +from .common import deep_merge + +if TYPE_CHECKING: + from source_facebook_marketing.api import API + +logger = logging.getLogger("airbyte") + + +class FBMarketingStream(Stream, ABC): + """Base stream class""" + + primary_key = "id" + transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization) + + page_size = 100 + use_batch = False + + enable_deleted = False + entity_prefix = None + + MAX_BATCH_SIZE = 50 + + def __init__(self, api: "API", include_deleted: bool = False, **kwargs): + super().__init__(**kwargs) + self._api = api + self._include_deleted = include_deleted if self.enable_deleted else False + + @cached_property + def fields(self) -> List[str]: + """List of fields that we want to query, for now just all properties from stream's schema""" + return list(self.get_json_schema().get("properties", {}).keys()) + + def _execute_batch(self, batch): + """Execute batch, retry in case of failures""" + while batch: + batch = batch.execute() + if batch: + logger.info("Retry failed requests in batch") + + def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Iterable[MutableMapping[str, Any]]: + """Execute list of requests in batches""" + records = [] + + def success(response: FacebookResponse): + records.append(response.json()) + + def failure(response: FacebookResponse): + # FIXME: stop sync or retry + logger.warning(f"Request failed with response: {response.body()}") + + api_batch: FacebookAdsApiBatch = self._api.api.new_batch() + for request in pending_requests: + api_batch.add_request(request, success=success, failure=failure) + if len(api_batch) == self.MAX_BATCH_SIZE: + self._execute_batch(api_batch) + yield from records + api_batch: FacebookAdsApiBatch = self._api.api.new_batch() + records = [] + + self._execute_batch(api_batch) + yield from records + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """Main read method used by CDK""" + records_iter = self._read_records(params=self.request_params(stream_state=stream_state)) + loaded_records_iter = (record.api_get(fields=self.fields, pending=self.use_batch) for record in records_iter) + if self.use_batch: + loaded_records_iter = self.execute_in_batch(loaded_records_iter) + + for record in loaded_records_iter: + if isinstance(record, AbstractObject): + yield record.export_all_data() + else: + yield record + + def _read_records(self, params: Mapping[str, Any]) -> Iterable: + """Wrapper around query to backoff errors. + We have default implementation because we still can override read_records so this method is not mandatory. 
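+        Subclasses usually return an FB SDK call here, e.g. ``self._api.account.get_ads(params=params)``.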
+ """ + return [] + + def request_params(self, **kwargs) -> MutableMapping[str, Any]: + """Parameters that should be passed to query_records method""" + params = {"limit": self.page_size} + + if self._include_deleted: + params.update(self._filter_all_statuses()) + + return params + + def _filter_all_statuses(self) -> MutableMapping[str, Any]: + """Filter that covers all possible statuses thus including deleted/archived records""" + filt_values = [ + "active", + "archived", + "completed", + "limited", + "not_delivering", + "deleted", + "not_published", + "pending_review", + "permanently_deleted", + "recently_completed", + "recently_rejected", + "rejected", + "scheduled", + "inactive", + ] + + return { + "filtering": [ + {"field": f"{self.entity_prefix}.delivery_info", "operator": "IN", "value": filt_values}, + ], + } + + +class FBMarketingIncrementalStream(FBMarketingStream, ABC): + cursor_field = "updated_time" + + def __init__(self, start_date: datetime, end_date: datetime, **kwargs): + super().__init__(**kwargs) + self._start_date = pendulum.instance(start_date) + self._end_date = pendulum.instance(end_date) + + def get_updated_state(self, current_stream_state: MutableMapping[str, Any], latest_record: Mapping[str, Any]): + """Update stream state from latest record""" + potentially_new_records_in_the_past = self._include_deleted and not current_stream_state.get("include_deleted", False) + record_value = latest_record[self.cursor_field] + state_value = current_stream_state.get(self.cursor_field) or record_value + max_cursor = max(pendulum.parse(state_value), pendulum.parse(record_value)) + if potentially_new_records_in_the_past: + max_cursor = record_value + + return { + self.cursor_field: str(max_cursor), + "include_deleted": self._include_deleted, + } + + def request_params(self, stream_state: Mapping[str, Any], **kwargs) -> MutableMapping[str, Any]: + """Include state filter""" + params = super().request_params(**kwargs) + params = deep_merge(params, self._state_filter(stream_state=stream_state or {})) + return params + + def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]: + """Additional filters associated with state if any set""" + state_value = stream_state.get(self.cursor_field) + filter_value = self._start_date if not state_value else pendulum.parse(state_value) + + potentially_new_records_in_the_past = self._include_deleted and not stream_state.get("include_deleted", False) + if potentially_new_records_in_the_past: + self.logger.info(f"Ignoring bookmark for {self.name} because of enabled `include_deleted` option") + filter_value = self._start_date + + return { + "filtering": [ + { + "field": f"{self.entity_prefix}.{self.cursor_field}", + "operator": "GREATER_THAN", + "value": filter_value.int_timestamp, + }, + ], + } diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py similarity index 76% rename from airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py rename to airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py index 0ac09cbacd48f..8acf339cd7026 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py @@ -1,10 +1,10 @@ # # Copyright (c) 2021 Airbyte, 
Inc., all rights reserved. # - +import http.client import logging import sys -from typing import Any, Iterable, Sequence +from typing import Any import backoff import pendulum @@ -13,31 +13,17 @@ # The Facebook API error codes indicating rate-limiting are listed at # https://developers.facebook.com/docs/graph-api/overview/rate-limiting/ FACEBOOK_RATE_LIMIT_ERROR_CODES = (4, 17, 32, 613, 80000, 80001, 80002, 80003, 80004, 80005, 80006, 80008) +FACEBOOK_BATCH_ERROR_CODE = 960 FACEBOOK_UNKNOWN_ERROR_CODE = 99 DEFAULT_SLEEP_INTERVAL = pendulum.duration(minutes=1) logger = logging.getLogger("airbyte") -class FacebookAPIException(Exception): - """General class for all API errors""" - - class JobException(Exception): """Scheduled job failed""" -class JobTimeoutException(JobException): - """Scheduled job timed out""" - - -def batch(iterable: Sequence, size: int = 1) -> Iterable: - """Split sequence in chunks""" - total_size = len(iterable) - for ndx in range(0, total_size, size): - yield iterable[ndx : min(ndx + size, total_size)] - - def retry_pattern(backoff_type, exception, **wait_gen_kwargs): def log_retry_attempt(details): _, exc, _ = sys.exc_info() @@ -47,7 +33,9 @@ def log_retry_attempt(details): def should_retry_api_error(exc): if isinstance(exc, FacebookRequestError): call_rate_limit_error = exc.api_error_code() in FACEBOOK_RATE_LIMIT_ERROR_CODES - return exc.api_transient_error() or exc.api_error_subcode() == FACEBOOK_UNKNOWN_ERROR_CODE or call_rate_limit_error + batch_timeout_error = exc.http_status() == http.client.BAD_REQUEST and exc.api_error_code() == FACEBOOK_BATCH_ERROR_CODE + unknown_error = exc.api_error_subcode() == FACEBOOK_UNKNOWN_ERROR_CODE + return any((exc.api_transient_error(), unknown_error, call_rate_limit_error, batch_timeout_error)) return True return backoff.on_exception( diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py new file mode 100644 index 0000000000000..64eb702432627 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py @@ -0,0 +1,131 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +import base64 +import logging +from typing import Any, Iterable, Iterator, List, Mapping, Optional + +import requests +from airbyte_cdk.models import SyncMode +from cached_property import cached_property + +from .base_insight_streams import AdsInsights +from .base_streams import FBMarketingIncrementalStream, FBMarketingStream + +logger = logging.getLogger("airbyte") + + +def fetch_thumbnail_data_url(url: str) -> Optional[str]: + """Request thumbnail image and return it embedded into the data-link""" + try: + response = requests.get(url) + if response.status_code == requests.status_codes.codes.OK: + _type = response.headers["content-type"] + data = base64.b64encode(response.content) + return f"data:{_type};base64,{data.decode('ascii')}" + else: + logger.warning(f"Got {repr(response)} while requesting thumbnail image.") + except requests.exceptions.RequestException as exc: + logger.warning(f"Got {str(exc)} while requesting thumbnail image.") + return None + + +class AdCreatives(FBMarketingStream): + """AdCreative is append only stream + doc: https://developers.facebook.com/docs/marketing-api/reference/ad-creative + """ + + entity_prefix = "adcreative" + use_batch = True + + def __init__(self, fetch_thumbnail_images: bool = False, **kwargs): + super().__init__(**kwargs) + self._fetch_thumbnail_images = fetch_thumbnail_images + + @cached_property + def fields(self) -> List[str]: + """Remove "thumbnail_data_url" field because it is computed field and it's not a field that we can request from Facebook""" + return [f for f in super().fields if f != "thumbnail_data_url"] + + def read_records( + self, + sync_mode: SyncMode, + cursor_field: List[str] = None, + stream_slice: Mapping[str, Any] = None, + stream_state: Mapping[str, Any] = None, + ) -> Iterable[Mapping[str, Any]]: + """Read with super method and append thumbnail_data_url if enabled""" + for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state): + if self._fetch_thumbnail_images: + record["thumbnail_data_url"] = fetch_thumbnail_data_url(record.get("thumbnail_url")) + yield record + + def _read_records(self, params: Mapping[str, Any]) -> Iterator: + return self._api.account.get_ad_creatives(params=params) + + +class Ads(FBMarketingIncrementalStream): + """doc: https://developers.facebook.com/docs/marketing-api/reference/adgroup""" + + entity_prefix = "ad" + enable_deleted = True + + def _read_records(self, params: Mapping[str, Any]): + return self._api.account.get_ads(params=params, fields=[self.cursor_field]) + + +class AdSets(FBMarketingIncrementalStream): + """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign""" + + entity_prefix = "adset" + enable_deleted = True + + def _read_records(self, params: Mapping[str, Any]): + return self._api.account.get_ad_sets(params=params) + + +class Campaigns(FBMarketingIncrementalStream): + """doc: https://developers.facebook.com/docs/marketing-api/reference/ad-campaign-group""" + + entity_prefix = "campaign" + enable_deleted = True + + def _read_records(self, params: Mapping[str, Any]): + return self._api.account.get_campaigns(params=params) + + +class Videos(FBMarketingIncrementalStream): + """See: https://developers.facebook.com/docs/marketing-api/reference/video""" + + entity_prefix = "video" + enable_deleted = True + + def _read_records(self, params: Mapping[str, Any]) -> Iterator: + return self._api.account.get_ad_videos(params=params) + + +class AdsInsightsAgeAndGender(AdsInsights): + breakdowns = ["age", "gender"] + + +class 
AdsInsightsCountry(AdsInsights): + breakdowns = ["country"] + + +class AdsInsightsRegion(AdsInsights): + breakdowns = ["region"] + + +class AdsInsightsDma(AdsInsights): + breakdowns = ["dma"] + + +class AdsInsightsPlatformAndDevice(AdsInsights): + breakdowns = ["publisher_platform", "platform_position", "impression_device"] + action_breakdowns = ["action_type"] # FB Async Job fails for unknown reason if we set other breakdowns + + +class AdsInsightsActionType(AdsInsights): + breakdowns = [] + action_breakdowns = ["action_type"] diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py new file mode 100644 index 0000000000000..31945a5b92039 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/conftest.py @@ -0,0 +1,11 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. +# + +from pytest import fixture + + +@fixture(autouse=True) +def time_sleep_mock(mocker): + time_mock = mocker.patch("time.sleep") + yield time_mock diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py index 6bab56862fa87..9d36b3a95f3a2 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job.py @@ -3,54 +3,112 @@ # import time +from typing import Iterator +from unittest.mock import call -import pendulum import pytest -from source_facebook_marketing.api import API -from source_facebook_marketing.async_job import AsyncJob, Status -from source_facebook_marketing.common import JobException, JobTimeoutException +from facebook_business.adobjects.adreportrun import AdReportRun +from facebook_business.api import FacebookAdsApiBatch +from source_facebook_marketing.api import MyFacebookAdsApi +from source_facebook_marketing.streams.async_job import InsightAsyncJob, ParentAsyncJob, Status, chunks @pytest.fixture(name="adreport") -def adreport_fixture(mocker): - response = {"report_run_id": 123} - job = mocker.MagicMock() - job.__getitem__.side_effect = response.__getitem__ - job.__setitem__.side_effect = response.__setitem__ - job.__iter__.side_effect = response.__iter__ - job.__contains__.side_effect = response.__contains__ - return job +def adreport_fixture(mocker, api): + ao = AdReportRun(fbid=123, api=api) + ao["report_run_id"] = 123 + mocker.patch.object(AdReportRun, 'api_get', return_value=ao) + mocker.patch.object(AdReportRun, 'get_result', return_value={}) + return ao + + +@pytest.fixture(name="account") +def account_fixture(mocker, adreport): + account = mocker.Mock() + account.get_insights.return_value = adreport + return account @pytest.fixture(name="job") -def job_fixture(api, mocker): - return AsyncJob(api=api, params=mocker.MagicMock()) +def job_fixture(api, account, mocker): + params = { + "level": "ad", + "action_breakdowns": [], + "breakdowns": [], + "fields": ["field1", "field2"], + "time_increment": 1, + "action_attribution_windows": [], + "time_range": { + "since": "2019-01-01", + "until": "2019-01-01", + }, + } + return InsightAsyncJob(edge_object=account, api=api, params=params) -@pytest.fixture(name="failed_job") -def failed_job_fixture(job, adreport): - adreport["async_status"] = Status.FAILED.value + +@pytest.fixture(name="grouped_jobs") +def grouped_jobs_fixture(mocker): + return 
[mocker.Mock(spec=InsightAsyncJob, restart_number=0, failed=False, completed=False) for _ in range(10)] + + +@pytest.fixture(name="parent_job") +def parent_job_fixture(api, grouped_jobs): + return ParentAsyncJob(api=api, jobs=grouped_jobs) + + + +@pytest.fixture(name="started_job") +def started_job_fixture(job, adreport): + adreport["async_status"] = Status.RUNNING.value adreport["async_percent_completion"] = 0 job.start() - try: - _ = job.completed - except JobException: - pass - return job +@pytest.fixture(name="completed_job") +def completed_job_fixture(started_job, adreport): + adreport["async_status"] = Status.COMPLETED.value + adreport["async_percent_completion"] = 100 + started_job.update_job() + + return started_job + + +@pytest.fixture(name="failed_job") +def failed_job_fixture(started_job, adreport): + adreport["async_status"] = Status.FAILED.value + adreport["async_percent_completion"] = 0 + started_job.update_job() + + return started_job + + @pytest.fixture(name="api") -def api_fixture(mocker, adreport): - api = mocker.Mock(spec=API) - api.account.get_insights.return_value = adreport - adreport.api_get.return_value = adreport +def api_fixture(mocker): + api = mocker.Mock(spec=MyFacebookAdsApi) + api.call().json.return_value = {} + api.new_batch().execute.return_value = None # short-circuit batch execution of failed jobs, prevent endless loop return api -class TestAsyncJob: +class TestChunks: + def test_two_or_more(self): + result = chunks([1, 2, 3, 4, 5], 2) + + assert isinstance(result, Iterator), "should be iterator/generator" + assert list(result) == [[1, 2], [3, 4], [5]] + + def test_single(self): + result = chunks([1, 2, 3, 4, 5], 6) + + assert isinstance(result, Iterator), "should be iterator/generator" + assert list(result) == [[1, 2, 3, 4, 5]] + + +class TestInsightAsyncJob: def test_start(self, job): job.start() @@ -64,9 +122,12 @@ def test_start_already_started(self, job): job.start() def test_restart(self, failed_job, api, adreport): + assert failed_job.restart_number == 0 + failed_job.restart() assert not failed_job.failed, "restart should reset fail flag" + assert failed_job.restart_number == 1 def test_restart_when_job_not_failed(self, job, api): job.start() @@ -79,16 +140,49 @@ def test_restart_when_job_not_started(self, job): with pytest.raises(RuntimeError, match=r": Incorrect usage of restart - only failed jobs can be restarted"): job.restart() + def test_update_job_not_started(self, job): + with pytest.raises(RuntimeError, match=r": Incorrect usage of the method - the job is not started"): + job.update_job() + + def test_update_job_on_completed_job(self, completed_job, adreport): + completed_job.update_job() + + adreport.api_get.assert_called_once() + + def test_update_job(self, started_job, adreport): + started_job.update_job() + + adreport.api_get.assert_called_once() + + def test_update_job_with_batch(self, started_job, adreport, mocker): + response = mocker.Mock() + + response.json.return_value = {'id': '1128003977936306', 'account_id': '212551616838260', 'time_ref': 1642989751, 'time_completed': 1642989754, + 'async_status': 'Job Completed', 'async_percent_completion': 100, 'date_start': '2021-02-24', 'date_stop': '2021-02-24'} + response.body.return_value = "Some error" + batch_mock = mocker.Mock(spec=FacebookAdsApiBatch) + + started_job.update_job(batch=batch_mock) + + adreport.api_get.assert_called_once() + args, kwargs = adreport.api_get.call_args + assert kwargs["batch"] == batch_mock + + kwargs["success"](response) + assert started_job.completed + 
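+        # the failure handler only logs the error, so invoking it must not raise
+        # and must leave the job in a state where the status check can be retried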
+ kwargs["failure"](response) + def test_elapsed_time(self, job, api, adreport): assert job.elapsed_time is None, "should be None for the job that is not started" job.start() adreport["async_status"] = Status.COMPLETED.value adreport["async_percent_completion"] = 0 + job.update_job() assert job.elapsed_time, "should be set for the job that is running" - _ = job.completed elapsed_1 = job.elapsed_time time.sleep(1) elapsed_2 = job.elapsed_time @@ -96,47 +190,21 @@ def test_elapsed_time(self, job, api, adreport): assert elapsed_2 == elapsed_1, "should not change after job completed" def test_completed_without_start(self, job, api, adreport): - with pytest.raises(RuntimeError, match=r"Incorrect usage of the method - the job is not started"): - _ = job.completed - + assert not job.completed assert not job.failed - def test_completed_ok(self, job, api, adreport): - job.start() - adreport["async_status"] = Status.COMPLETED.value - adreport["async_percent_completion"] = 0 - - assert job.completed, "should return True if the job was completed" - assert not job.failed, "failed should be set to False" + def test_completed_ok(self, completed_job, api, adreport): + assert completed_job.completed, "should return True if the job was completed" + assert not completed_job.failed, "failed should be set to False" def test_completed_failed(self, failed_job, api, adreport): - with pytest.raises(JobException, match=r"failed after \d* seconds."): - _ = failed_job.completed + assert failed_job.completed + assert failed_job.failed def test_completed_skipped(self, failed_job, api, adreport): adreport["async_status"] = Status.SKIPPED.value - with pytest.raises(JobException, match=r"skipped after \d* seconds."): - _ = failed_job.completed - - def test_completed_timeout(self, job, adreport, mocker): - job.start() - adreport["async_status"] = Status.STARTED.value - adreport["async_percent_completion"] = 1 - mocker.patch.object(job, "MAX_WAIT_TO_FINISH", pendulum.duration()) - mocker.patch.object(job, "MAX_WAIT_TO_START", pendulum.duration()) - - with pytest.raises(JobTimeoutException, match=r" did not finish after \d* seconds."): - _ = job.completed - - def test_completed_timeout_not_started(self, job, adreport, mocker): - job.start() - adreport["async_status"] = Status.STARTED.value - adreport["async_percent_completion"] = 0 - mocker.patch.object(job, "MAX_WAIT_TO_FINISH", pendulum.duration()) - mocker.patch.object(job, "MAX_WAIT_TO_START", pendulum.duration()) - - with pytest.raises(JobTimeoutException, match=r" did not start after \d* seconds."): - _ = job.completed + assert failed_job.completed + assert failed_job.failed def test_failed_no(self, job): assert not job.failed, "should return False for active job" @@ -144,10 +212,10 @@ def test_failed_no(self, job): def test_failed_yes(self, failed_job): assert failed_job.failed, "should return True if the job previously failed" - def test_str(self, api, adreport): - job = AsyncJob(api=api, params={"time_range": 123, "breakdowns": [10, 20]}) + def test_str(self, api, account): + job = InsightAsyncJob(edge_object=account, api=api, params={"time_range": 123, "breakdowns": [10, 20]}) - assert str(job) == "AdReportRun(id=<None>, time_range=123, breakdowns=[10, 20]" + assert str(job) == f"AdReportRun(id=<None>, {account}, time_range=123, breakdowns=[10, 20]" def test_get_result(self, job, adreport): job.start() @@ -164,3 +232,86 @@ def test_get_result_when_job_is_not_started(self, job): def test_get_result_when_job_is_failed(self, failed_job): with 
pytest.raises(RuntimeError, match=r"Incorrect usage of get_result - the job is not started of failed"): failed_job.get_result() + + def test_split_job(self, job, account, mocker, api): + account.get_insights.return_value = [{"campaign_id": 1}, {"campaign_id": 2}, {"campaign_id": 3}] + parent_job_mock = mocker.patch("source_facebook_marketing.streams.async_job.ParentAsyncJob") + campaign_mock = mocker.patch("source_facebook_marketing.streams.async_job.Campaign") + + parent_job = job.split_job() + + account.get_insights.assert_called_once() + campaign_mock.assert_has_calls([call(1), call(2), call(3)]) + assert parent_job_mock.called + assert parent_job + args, kwargs = parent_job_mock.call_args + assert args == (api,) + assert len(kwargs["jobs"]) == 3, "number of jobs should match number of campaigns" + +class TestParentAsyncJob: + def test_start(self, parent_job, grouped_jobs): + parent_job.start() + for job in grouped_jobs: + job.start.assert_called_once() + + def test_restart(self, parent_job, grouped_jobs): + assert not parent_job.failed, "initially not failed" + + # fail some jobs + grouped_jobs[0].failed = True + grouped_jobs[0].restart_number = 1 + grouped_jobs[5].failed = True + grouped_jobs[0].restart_number = 1 + grouped_jobs[6].restart_number = 3 + + assert parent_job.failed, "should be failed if any job failed" + parent_job.restart() + assert parent_job.failed + assert parent_job.restart_number == 3, "restart should be max value of all jobs" + + def test_completed(self, parent_job, grouped_jobs): + assert not parent_job.completed, "initially not completed" + + # complete some jobs + grouped_jobs[0].completed = True + grouped_jobs[5].completed = True + + assert not parent_job.completed, "not completed until all jobs completed" + + # complete all jobs + for job in grouped_jobs: + job.completed = True + + assert parent_job.completed, "completed because all jobs completed" + + def test_update_job(self, parent_job, grouped_jobs, api): + """Checks jobs status in advance and restart if some failed.""" + # finish some jobs + grouped_jobs[0].completed = True + grouped_jobs[5].completed = True + + parent_job.update_job() + + # assert + batch_mock = api.new_batch() + for i, job in enumerate(grouped_jobs): + if i in (0, 5): + job.update_job.assert_not_called() + else: + job.update_job.assert_called_once_with(batch=batch_mock) + + def test_get_result(self, parent_job, grouped_jobs): + """Retrieve result of the finished job.""" + for job in grouped_jobs: + job.get_result.return_value = [] + grouped_jobs[0].get_result.return_value = range(3, 8) + grouped_jobs[6].get_result.return_value = range(4, 11) + + generator = parent_job.get_result() + + assert isinstance(generator, Iterator) + assert list(generator) == list(range(3, 8)) + list(range(4, 11)) + + def test_split_job(self, parent_job): + with pytest.raises(RuntimeError, match="Splitting of ParentAsyncJob is not allowed."): + parent_job.split_job() diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py new file mode 100644 index 0000000000000..0beba538aead1 --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_async_job_manager.py @@ -0,0 +1,29 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +import pytest +from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager + + +@pytest.fixture(name="api") +def api_fixture(mocker): + api = mocker.Mock() + api.api.ads_insights_throttle = (0, 0) + return api + + +class TestInsightAsyncManager: + def test_jobs_empty(self, api): + manager = InsightAsyncJobManager(api=api, jobs=[]) + jobs = list(manager.completed_jobs()) + assert not jobs + + def test_job_restarted(self): + """TODO""" + + def test_job_split(self): + """TODO""" + + def test_job_failed_too_many_times(self): + """TODO""" diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py index 046c5da95a748..f14d8ab2d598a 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_client.py @@ -28,7 +28,7 @@ def some_config_fixture(account_id): @pytest.fixture(autouse=True) def mock_default_sleep_interval(mocker): - mocker.patch("source_facebook_marketing.common.DEFAULT_SLEEP_INTERVAL", return_value=pendulum.duration(seconds=5)) + mocker.patch("source_facebook_marketing.streams.common.DEFAULT_SLEEP_INTERVAL", return_value=pendulum.duration(seconds=5)) @pytest.fixture(name="api") @@ -42,7 +42,10 @@ def api_fixture(some_config, requests_mock, fb_account_response): @pytest.fixture(name="fb_call_rate_response") def fb_call_rate_response_fixture(): error = { - "message": "(#80000) There have been too many calls from this ad-account. Wait a bit and try again. For more info, please refer to https://developers.facebook.com/docs/graph-api/overview/rate-limiting.", + "message": ( + "(#80000) There have been too many calls from this ad-account. Wait a bit and try again. " + "For more info, please refer to https://developers.facebook.com/docs/graph-api/overview/rate-limiting." + ), "type": "OAuthException", "code": 80000, "error_subcode": 2446079, diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py index 1941e61848474..2089e0cac64d3 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_deep_merge.py @@ -4,7 +4,7 @@ from copy import deepcopy -from source_facebook_marketing.common import deep_merge +from source_facebook_marketing.streams.common import deep_merge def test_return_new_object(): diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py new file mode 100644 index 0000000000000..4da6c6cc673bc --- /dev/null +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_source.py @@ -0,0 +1,64 @@ +# +# Copyright (c) 2021 Airbyte, Inc., all rights reserved. 
+# + +import pydantic +import pytest +from airbyte_cdk.models import ConnectorSpecification +from source_facebook_marketing import SourceFacebookMarketing + + +@pytest.fixture(name="config") +def config_fixture(): + config = { + "account_id": 123, + "access_token": "TOKEN", + "start_date": "2019-10-10T00:00:00" + } + + return config + + +@pytest.fixture(name="api") +def api_fixture(mocker): + api_mock = mocker.patch("source_facebook_marketing.source.API") + api_mock.return_value = mocker.Mock(account=123) + return api_mock + + +@pytest.fixture(name="logger_mock") +def logger_mock_fixture(mocker): + return mocker.patch("source_facebook_marketing.source.logger") + +class TestSourceFacebookMarketing: + def test_check_connection_ok(self, api, config, logger_mock): + ok, error_msg = SourceFacebookMarketing().check_connection(logger_mock, config=config) + + assert ok + assert not error_msg + api.assert_called_once_with(account_id="123", access_token="TOKEN") + logger_mock.info.assert_called_once_with(f"Select account {api.return_value.account}") + + def test_check_connection_invalid_config(self, api, config, logger_mock): + config.pop("start_date") + + with pytest.raises(pydantic.ValidationError): + SourceFacebookMarketing().check_connection(logger_mock, config=config) + + assert not api.called + + def test_check_connection_exception(self, api, config, logger_mock): + api.side_effect = RuntimeError("Something went wrong!") + + with pytest.raises(RuntimeError, match="Something went wrong!"): + SourceFacebookMarketing().check_connection(logger_mock, config=config) + + def test_streams(self, config, api): + streams = SourceFacebookMarketing().streams(config) + + assert len(streams) == 12 + + def test_spec(self): + spec = SourceFacebookMarketing().spec() + + assert isinstance(spec, ConnectorSpecification) diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py index a43e7493aa267..e69de29bb2d1d 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py @@ -1,47 +0,0 @@ -# -# Copyright (c) 2021 Airbyte, Inc., all rights reserved. -# - -from source_facebook_marketing.streams import remove_params_from_url - - -class TestUrlParsing: - def test_empty_url(self): - url = "" - parsed_url = remove_params_from_url(url=url, params=[]) - assert parsed_url == url - - def test_does_not_raise_exception_for_invalid_url(self): - url = "abcd" - parsed_url = remove_params_from_url(url=url, params=["test"]) - assert parsed_url == url - - def test_escaped_characters(self): - url = "https://google.com?test=123%23%24%25%2A&test2=456" - parsed_url = remove_params_from_url(url=url, params=["test3"]) - assert parsed_url == url - - def test_no_params_url(self): - url = "https://google.com" - parsed_url = remove_params_from_url(url=url, params=["test"]) - assert parsed_url == url - - def test_no_params_arg(self): - url = "https://google.com?" 
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py
index a43e7493aa267..e69de29bb2d1d 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/unit_tests/test_streams.py
@@ -1,47 +0,0 @@
-#
-# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
-#
-
-from source_facebook_marketing.streams import remove_params_from_url
-
-
-class TestUrlParsing:
-    def test_empty_url(self):
-        url = ""
-        parsed_url = remove_params_from_url(url=url, params=[])
-        assert parsed_url == url
-
-    def test_does_not_raise_exception_for_invalid_url(self):
-        url = "abcd"
-        parsed_url = remove_params_from_url(url=url, params=["test"])
-        assert parsed_url == url
-
-    def test_escaped_characters(self):
-        url = "https://google.com?test=123%23%24%25%2A&test2=456"
-        parsed_url = remove_params_from_url(url=url, params=["test3"])
-        assert parsed_url == url
-
-    def test_no_params_url(self):
-        url = "https://google.com"
-        parsed_url = remove_params_from_url(url=url, params=["test"])
-        assert parsed_url == url
-
-    def test_no_params_arg(self):
-        url = "https://google.com?"
-        parsed_url = remove_params_from_url(url=url, params=["test"])
-        assert parsed_url == "https://google.com"
-
-    def test_partially_empty_params(self):
-        url = "https://google.com?test=122&&"
-        parsed_url = remove_params_from_url(url=url, params=[])
-        assert parsed_url == "https://google.com?test=122"
-
-    def test_no_matching_params(self):
-        url = "https://google.com?test=123"
-        parsed_url = remove_params_from_url(url=url, params=["test2"])
-        assert parsed_url == url
-
-    def test_removes_params(self):
-        url = "https://google.com?test=123&test2=456"
-        parsed_url = remove_params_from_url(url=url, params=["test2"])
-        assert parsed_url == "https://google.com?test=123"
diff --git a/docs/integrations/sources/facebook-marketing.md b/docs/integrations/sources/facebook-marketing.md
index 88d3a98f16f9f..513d57ef76f0e 100644
--- a/docs/integrations/sources/facebook-marketing.md
+++ b/docs/integrations/sources/facebook-marketing.md
@@ -98,7 +98,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in
 | :--- | :--- | :--- | :--- |
-| 0.2.31 | 2021-12-28 | [](https://github.com/airbytehq/airbyte/pull/) | Fixed videos stream format field incorrect type |
+| 0.2.31 | 2021-12-28 | [](https://github.com/airbytehq/airbyte/pull/) | Fixed incorrect type of the `format` field in the `videos` stream |
 | 0.2.30 | 2021-12-20 | [8962](https://github.com/airbytehq/airbyte/pull/8962) | Added `asset_feed_spec` field to `ad creatives` stream |
-| 0.2.29 | 2021-12-17 | [8649](https://github.com/airbytehq/airbyte/pull/8649) | Retrive ad_creatives image as data encoded |
+| 0.2.29 | 2021-12-17 | [8649](https://github.com/airbytehq/airbyte/pull/8649) | Retrieve ad_creatives image as encoded data |
 | 0.2.28 | 2021-12-13 | [8742](https://github.com/airbytehq/airbyte/pull/8742) | Fix for schema generation related to "breakdown" fields |
 | 0.2.27 | 2021-11-29 | [8257](https://github.com/airbytehq/airbyte/pull/8257) | Add fields to Campaign stream |
 | 0.2.26 | 2021-11-19 | [7855](https://github.com/airbytehq/airbyte/pull/7855) | Add Video stream |
@@ -107,7 +107,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in
 | 0.2.23 | 2021-11-08 | [7734](https://github.com/airbytehq/airbyte/pull/7734) | Resolve $ref field for discover schema |
-| 0.2.22 | 2021-11-05 | [7605](https://github.com/airbytehq/airbyte/pull/7605) | Add job retry logics to AdsInsights stream |
+| 0.2.22 | 2021-11-05 | [7605](https://github.com/airbytehq/airbyte/pull/7605) | Add job retry logic to AdsInsights stream |
 | 0.2.21 | 2021-10-05 | [4864](https://github.com/airbytehq/airbyte/pull/4864) | Update insights streams with custom entries for fields, breakdowns and action_breakdowns |
-| 0.2.20 | 2021-10-04 | [6719](https://github.com/airbytehq/airbyte/pull/6719) | Update version of facebook\_bussiness package to 12.0 |
+| 0.2.20 | 2021-10-04 | [6719](https://github.com/airbytehq/airbyte/pull/6719) | Update version of facebook\_business package to 12.0 |
 | 0.2.19 | 2021-09-30 | [6438](https://github.com/airbytehq/airbyte/pull/6438) | Annotate Oauth2 flow initialization parameters in connector specification |
-| 0.2.18 | 2021-09-28 | [6499](https://github.com/airbytehq/airbyte/pull/6499) | Fix field values converting fail |
+| 0.2.18 | 2021-09-28 | [6499](https://github.com/airbytehq/airbyte/pull/6499) | Fix failure when converting field values |
 | 0.2.17 | 2021-09-14 | [4978](https://github.com/airbytehq/airbyte/pull/4978) | Convert values' types according to schema types |
@@ -115,7 +115,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in
 | 0.2.15 | 2021-09-14 | [5958](https://github.com/airbytehq/airbyte/pull/5958) | Fix url parsing and add report that exposes conversions |
 | 0.2.14 | 2021-07-19 | [4820](https://github.com/airbytehq/airbyte/pull/4820) | Improve the rate limit management |
 | 0.2.12 | 2021-06-20 | [3743](https://github.com/airbytehq/airbyte/pull/3743) | Refactor connector to use CDK: - Improve error handling. - Improve async job performance \(insights\). - Add new configuration parameter `insights_days_per_job`. - Rename stream `adsets` to `ad_sets`. - Refactor schema logic for insights, allowing to configure any possible insight stream. |
-| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook\_bussiness to 11.0 |
+| 0.2.10 | 2021-06-16 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Update version of facebook\_business to 11.0 |
 | 0.2.9 | 2021-06-10 | [3996](https://github.com/airbytehq/airbyte/pull/3996) | Add `AIRBYTE_ENTRYPOINT` for Kubernetes support |
 | 0.2.8 | 2021-06-09 | [3973](https://github.com/airbytehq/airbyte/pull/3973) | Add 80000 as a rate-limiting error code |
 | 0.2.7 | 2021-06-03 | [3646](https://github.com/airbytehq/airbyte/pull/3646) | Add missing fields to AdInsights streams |