Skip to content

Commit

Permalink
🎉 Source Facebook Marketing: add activities stream (#10655)
Browse files Browse the repository at this point in the history
* add facebook marketing activities stream

* update incremental test

* add overrides for activities specific logic

* formatting

* update readme docs

* remove test limitation

* update dockerfile airbyte version

* correct tests

* bump connector version in config module

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
  • Loading branch information
PhilipCorr and marcosmarxm authored Mar 1, 2022
1 parent df47987 commit a8f7365
Show file tree
Hide file tree
Showing 11 changed files with 110 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@
- name: Facebook Marketing
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
dockerRepository: airbyte/source-facebook-marketing
dockerImageTag: 0.2.36
dockerImageTag: 0.2.37
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
icon: facebook.svg
sourceType: api
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1648,7 +1648,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-facebook-marketing:0.2.36"
- dockerImage: "airbyte/source-facebook-marketing:0.2.37"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
changelogUrl: "https://docs.airbyte.io/integrations/sources/facebook-marketing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,5 @@ RUN pip install .
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]


LABEL io.airbyte.version=0.2.36
LABEL io.airbyte.version=0.2.37
LABEL io.airbyte.name=airbyte/source-facebook-marketing
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@
"cursor_field": null,
"destination_sync_mode": "append",
"primary_key": null
},
{
"stream": {
"name": "activities",
"json_schema": {},
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_cursor": true,
"default_cursor_field": ["event_time"],
"namespace": null
},
"sync_mode": "incremental",
"cursor_field": null,
"destination_sync_mode": "append"
}
]
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
{
"activities": {
"event_time": "2121-07-25T13:34:26Z",
"include_deleted": true
},
"campaigns": {
"updated_time": "2121-07-25T13:34:26Z",
"include_deleted": true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"properties": {
"actor_id": {
"type": ["null", "string"]
},
"actor_name": {
"type": ["null", "string"]
},
"application_id": {
"type": ["null", "string"]
},
"application_name": {
"type": ["null", "string"]
},
"date_time_in_timezone": {
"type": ["null", "string"]
},
"event_time": {
"type": "string",
"format": "date-time"
},
"event_type": {
"type": ["null", "string"]
},
"extra_data": {
"type": ["null", "string"]
},
"object_id": {
"type": ["null", "string"]
},
"object_name": {
"type": ["null", "string"]
},
"object_type": {
"type": ["null", "string"]
},
"translated_event_type": {
"type": ["null", "string"]
}
},
"type": ["null", "object"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from source_facebook_marketing.api import API
from source_facebook_marketing.spec import ConnectorConfig
from source_facebook_marketing.streams import (
Activities,
AdAccount,
AdCreatives,
Ads,
Expand Down Expand Up @@ -61,7 +62,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
start_date=config.start_date,
end_date=config.end_date,
)

streams = [
AdAccount(api=api),
AdSets(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Expand All @@ -77,6 +77,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Type[Stream]]:
Campaigns(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Images(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Videos(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
Activities(api=api, start_date=config.start_date, end_date=config.end_date, include_deleted=config.include_deleted),
]

return self._update_insights_streams(insights=config.custom_insights, args=insights_args, streams=streams)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#

from .streams import (
Activities,
AdAccount,
AdCreatives,
Ads,
Expand Down Expand Up @@ -34,4 +35,5 @@
"Campaigns",
"Images",
"Videos",
"Activities",
]
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@
import logging
from typing import Any, Iterable, List, Mapping, Optional

import pendulum
import requests
from airbyte_cdk.models import SyncMode
from cached_property import cached_property
from facebook_business.adobjects.abstractobject import AbstractObject
from facebook_business.adobjects.adaccount import AdAccount as FBAdAccount

from .base_insight_streams import AdsInsights
Expand Down Expand Up @@ -93,6 +95,45 @@ def list_objects(self, params: Mapping[str, Any]) -> Iterable:
return self._api.account.get_campaigns(params=params)


class Activities(FBMarketingIncrementalStream):
"""doc: https://developers.facebook.com/docs/marketing-api/reference/ad-activity"""

entity_prefix = "activity"
cursor_field = "event_time"
primary_key = None

def list_objects(self, fields: List[str], params: Mapping[str, Any]) -> Iterable:
return self._api.account.get_activities(fields=fields, params=params)

def read_records(
self,
sync_mode: SyncMode,
cursor_field: List[str] = None,
stream_slice: Mapping[str, Any] = None,
stream_state: Mapping[str, Any] = None,
) -> Iterable[Mapping[str, Any]]:
"""Main read method used by CDK"""
loaded_records_iter = self.list_objects(fields=self.fields, params=self.request_params(stream_state=stream_state))

for record in loaded_records_iter:
if isinstance(record, AbstractObject):
yield record.export_all_data() # convert FB object to dict
else:
yield record # execute_in_batch will emmit dicts

def _state_filter(self, stream_state: Mapping[str, Any]) -> Mapping[str, Any]:
"""Additional filters associated with state if any set"""
state_value = stream_state.get(self.cursor_field)
since = self._start_date if not state_value else pendulum.parse(state_value)

potentially_new_records_in_the_past = self._include_deleted and not stream_state.get("include_deleted", False)
if potentially_new_records_in_the_past:
self.logger.info(f"Ignoring bookmark for {self.name} because of enabled `include_deleted` option")
since = self._start_date

return {"since": since.int_timestamp}


class Videos(FBMarketingIncrementalStream):
"""See: https://developers.facebook.com/docs/marketing-api/reference/video"""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def test_check_connection_exception(self, api, config, logger_mock):
def test_streams(self, config, api):
streams = SourceFacebookMarketing().streams(config)

assert len(streams) == 14
assert len(streams) == 15

def test_spec(self):
spec = SourceFacebookMarketing().spec()
Expand Down
2 changes: 2 additions & 0 deletions docs/integrations/sources/facebook-marketing.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ This Source is capable of syncing the following tables and their data:
* [AdInsights](https://developers.facebook.com/docs/marketing-api/reference/adgroup/insights/)
* [AdAccount](https://developers.facebook.com/docs/marketing-api/reference/ad-account)
* [Images](https://developers.facebook.com/docs/marketing-api/reference/ad-image)
* [Activities](https://developers.facebook.com/docs/marketing-api/reference/ad-activity)

You can segment the AdInsights table into parts based on the following information. Each part will be synced as a separate table if normalization is enabled:

Expand Down Expand Up @@ -103,6 +104,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.37 | 2022-02-28 | [10655](https://github.com/airbytehq/airbyte/pull/10655) | Add Activities stream |
| 0.2.36 | 2022-02-24 | [10588](https://github.com/airbytehq/airbyte/pull/10588) | Fix `execute_in_batch` for large amount of requests |
| 0.2.35 | 2022-02-18 | [10348](https://github.com/airbytehq/airbyte/pull/10348) | Add 104 error code to backoff triggers |
| 0.2.34 | 2022-02-17 | [10180](https://github.com/airbytehq/airbyte/pull/9805) | Performance and reliability fixes |
Expand Down

0 comments on commit a8f7365

Please sign in to comment.