Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

πŸŽ‰ Source facebook-marketing: add activities stream #10655

Merged
merged 13 commits into from
Mar 1, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.36
LABEL io.airbyte.version=0.2.37
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "activities",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["event_time"],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"properties": {
"actor_id": {
"type": ["null", "string"]
},
"actor_name": {
"type": ["null", "string"]
},
"application_id": {
"type": ["null", "string"]
},
"application_name": {
"type": ["null", "string"]
},
"date_time_in_timezone": {
"type": ["null", "string"]
},
"event_time": {
"type": "string",
"format": "date-time"
},
"event_type": {
"type": ["null", "string"]
},
"extra_data": {
"type": ["null", "string"]
},
"object_id": {
"type": ["null", "string"]
},
"object_name": {
"type": ["null", "string"]
},
"object_type": {
"type": ["null", "string"]
},
"translated_event_type": {
"type": ["null", "string"]
}
},
"type": ["null", "object"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from source_facebook_marketing.api import API
from source_facebook_marketing.spec import ConnectorConfig
from source_facebook_marketing.streams import (
Activities,
AdAccount,
AdCreatives,
Ads,
Expand Down Expand Up @@ -61,7 +62,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
start_date=config.start_date,
end_date=config.end_date,
)

streams = [
AdAccount(api=api),
AdSets(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Expand All @@ -77,6 +77,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
Campaigns(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Images(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Videos(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Activities(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
]

return self._update_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from .streams import (
Activities,
AdAccount,
AdCreatives,
Ads,
Expand Down Expand Up @@ -34,4 +35,5 @@
"Campaigns",
"Images",
"Videos",
"Activities",
]
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import logging
from typing import Any, Iterable, List, Mapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from cached_property import cached_property
from facebook_business.adobjects.abstractobject import AbstractObject
from facebook_business.adobjects.adaccount import AdAccount as FBAdAccount

from .base_insight_streams import AdsInsights
Expand Down Expand Up @@ -93,6 +95,45 @@ def list_objects(self, params: Mapping[str, Any]) -> Iterable:
return self._api.account.get_campaigns(params=params)


class Activities(FBMarketingIncrementalStream):
"""doc: https://developers.facebook.com/docs/marketing-api/reference/ad-activity"""

entity_prefix = "activity"
cursor_field = "event_time"
primary_key = None

def list_objects(self, fields: List[str], params: Mapping[str, Any]) -> Iterable:
return self._api.account.get_activities(fields=fields, params=params)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""Main read method used by CDK"""
loaded_records_iter = self.list_objects(fields=self.fields, params=self.request_params(stream_state=stream_state))

for record in loaded_records_iter:
if isinstance(record, AbstractObject):
yield record.export_all_data() # convert FB object to dict
else:
yield record # execute_in_batch will emmit dicts

def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
"""Additional filters associated with state if any set"""
state_value = stream_state.get(self.cursor_field)
since = self._start_date if not state_value else pendulum.parse(state_value)

potentially_new_records_in_the_past = self._include_deleted and not stream_state.get("include_deleted", False)
if potentially_new_records_in_the_past:
self.logger.info(f"Ignoring bookmark for {self.name} because of enabled `include_deleted` option")
since = self._start_date

return {"since": since.int_timestamp}


class Videos(FBMarketingIncrementalStream):
"""See: https://developers.facebook.com/docs/marketing-api/reference/video"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_check_connection_exception(self, api, config, logger_mock):
def test_streams(self, config, api):
streams = SourceFacebookMarketing().streams(config)

assert len(streams) == 14
assert len(streams) == 15

def test_spec(self):
spec = SourceFacebookMarketing().spec()
Expand Down
2 changes: 2 additions & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This Source is capable of syncing the following tables and their data:
* [AdInsights](https://developers.facebook.com/docs/marketing-api/reference/adgroup/insights/)
* [AdAccount](https://developers.facebook.com/docs/marketing-api/reference/ad-account)
* [Images](https://developers.facebook.com/docs/marketing-api/reference/ad-image)
* [Activities](https://developers.facebook.com/docs/marketing-api/reference/ad-activity)

You can segment the AdInsights table into parts based on the following information. Each part will be synced as a separate table if normalization is enabled:

Expand Down Expand Up @@ -103,6 +104,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.37 | 2022-02-28 | [10655](https://github.com/airbytehq/airbyte/pull/10655) | Add Activities stream |
| 0.2.36 | 2022-02-24 | [10588](https://github.com/airbytehq/airbyte/pull/10588) | Fix `execute_in_batch` for large amount of requests |
| 0.2.35 | 2022-02-18 | [10348](https://github.com/airbytehq/airbyte/pull/10348) | Add 104 error code to backoff triggers |
| 0.2.34 | 2022-02-17 | [10180](https://github.com/airbytehq/airbyte/pull/9805) | Performance and reliability fixes |
Expand Down