Skip to content

Commit 8d560d8

Browse files
committed
generalize batch execution, add use_batch flag
fix SAT
1 parent 2e56c22 commit 8d560d8

File tree

4 files changed

+50
-62
lines changed

4 files changed

+50
-62
lines changed

airbyte-integrations/connectors/source-facebook-marketing/acceptance-test-config.yml

+1
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ tests:
1313
- config_path: "secrets/config.json"
1414
basic_read:
1515
- config_path: "secrets/config.json"
16+
empty_streams: ["videos"]
1617
incremental:
1718
- config_path: "secrets/config.json"
1819
configured_catalog_path: "integration_tests/configured_catalog_without_insights.json"

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
import json
66
import logging
7-
from datetime import datetime
87
from time import sleep
98
from typing import Tuple
109

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/common.py

-7
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,6 @@ class JobException(Exception):
2424
"""Scheduled job failed"""
2525

2626

27-
def batch(iterable: Sequence, size: int = 1) -> Iterable:
28-
"""Split sequence in chunks"""
29-
total_size = len(iterable)
30-
for ndx in range(0, total_size, size):
31-
yield iterable[ndx : min(ndx + size, total_size)]
32-
33-
3427
def retry_pattern(backoff_type, exception, **wait_gen_kwargs):
3528
def log_retry_attempt(details):
3629
_, exc, _ = sys.exc_info()

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py

+49-54
Original file line numberDiff line numberDiff line change
@@ -4,41 +4,26 @@
44

55
import base64
66
import logging
7-
import urllib.parse as urlparse
87
from abc import ABC
98
from datetime import datetime
10-
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Sequence, TYPE_CHECKING, Optional
9+
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, TYPE_CHECKING, Optional
1110

1211
import pendulum
1312
import requests
1413
from airbyte_cdk.models import SyncMode
1514
from airbyte_cdk.sources.streams import Stream
1615
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
1716
from cached_property import cached_property
17+
from facebook_business.adobjects.abstractobject import AbstractObject
1818
from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
19-
from .common import batch, deep_merge
19+
from .common import deep_merge
2020

2121
if TYPE_CHECKING:
2222
from source_facebook_marketing.api import API
2323

2424
logger = logging.getLogger("airbyte")
2525

2626

27-
def remove_params_from_url(url: str, params: List[str]) -> str:
28-
"""
29-
Parses a URL and removes the query parameters specified in params
30-
:param url: URL
31-
:param params: list of query parameters
32-
:return: URL with params removed
33-
"""
34-
parsed = urlparse.urlparse(url)
35-
query = urlparse.parse_qs(parsed.query, keep_blank_values=True)
36-
filtered = dict((k, v) for k, v in query.items() if k not in params)
37-
return urlparse.urlunparse(
38-
[parsed.scheme, parsed.netloc, parsed.path, parsed.params, urlparse.urlencode(filtered, doseq=True), parsed.fragment]
39-
)
40-
41-
4227
def fetch_thumbnail_data_url(url: str) -> Optional[str]:
4328
"""Request thumbnail image and return it embedded into the data-link"""
4429
try:
@@ -47,8 +32,10 @@ def fetch_thumbnail_data_url(url: str) -> Optional[str]:
4732
_type = response.headers["content-type"]
4833
data = base64.b64encode(response.content)
4934
return f"data:{_type};base64,{data.decode('ascii')}"
50-
except requests.exceptions.RequestException:
51-
pass
35+
else:
36+
logger.warning(f"Got {repr(response)} while requesting thumbnail image.")
37+
except requests.exceptions.RequestException as exc:
38+
logger.warning(f"Got {str(exc)} while requesting thumbnail image.")
5239
return None
5340

5441

@@ -59,10 +46,13 @@ class FBMarketingStream(Stream, ABC):
5946
transformer: TypeTransformer = TypeTransformer(TransformConfig.DefaultSchemaNormalization)
6047

6148
page_size = 100
49+
use_batch = False
6250

6351
enable_deleted = False
6452
entity_prefix = None
6553

54+
MAX_BATCH_SIZE = 50
55+
6656
def __init__(self, api: 'API', include_deleted: bool = False, **kwargs):
6757
super().__init__(**kwargs)
6858
self._api = api
@@ -73,26 +63,36 @@ def fields(self) -> List[str]:
7363
"""List of fields that we want to query, for now just all properties from stream's schema"""
7464
return list(self.get_json_schema().get("properties", {}).keys())
7565

76-
def execute_in_batch(self, requests: Iterable[FacebookRequest]) -> Sequence[MutableMapping[str, Any]]:
66+
def _execute_batch(self, batch):
67+
"""Execute batch, retry in case of failures"""
68+
while batch:
69+
batch = batch.execute()
70+
if batch:
71+
logger.info("Retry failed requests in batch")
72+
73+
def execute_in_batch(self, pending_requests: Iterable[FacebookRequest]) -> Iterable[MutableMapping[str, Any]]:
7774
"""Execute list of requests in batches"""
7875
records = []
7976

8077
def success(response: FacebookResponse):
8178
records.append(response.json())
8279

8380
def failure(response: FacebookResponse):
84-
logger.info(f"Request failed with response: {response.body()}")
81+
# FIXME: stop sync or retry
82+
logger.warning(f"Request failed with response: {response.body()}")
8583

8684
api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
87-
for request in requests:
85+
for request in pending_requests:
8886
api_batch.add_request(request, success=success, failure=failure)
87+
if len(api_batch) == self.MAX_BATCH_SIZE:
88+
self._execute_batch(api_batch)
8989

90-
while api_batch:
91-
api_batch = api_batch.execute()
92-
if api_batch:
93-
logger.info("Retry failed requests in batch")
90+
yield from records
91+
api_batch: FacebookAdsApiBatch = self._api.api.new_batch()
92+
records = []
9493

95-
return records
94+
self._execute_batch(api_batch)
95+
yield from records
9696

9797
def read_records(
9898
self,
@@ -102,19 +102,23 @@ def read_records(
102102
stream_state: Mapping[str, Any] = None,
103103
) -> Iterable[Mapping[str, Any]]:
104104
"""Main read method used by CDK"""
105-
for record in self._read_records(params=self.request_params(stream_state=stream_state)):
106-
yield self._extend_record(record, fields=self.fields)
105+
records_iter = self._read_records(params=self.request_params(stream_state=stream_state))
106+
loaded_records_iter = (record.api_get(fields=self.fields, pending=self.use_batch) for record in records_iter)
107+
if self.use_batch:
108+
loaded_records_iter = self.execute_in_batch(loaded_records_iter)
109+
110+
for record in loaded_records_iter:
111+
if isinstance(record, AbstractObject):
112+
yield record.export_all_data()
113+
else:
114+
yield record
107115

108116
def _read_records(self, params: Mapping[str, Any]) -> Iterable:
109117
"""Wrapper around query to backoff errors.
110118
We have a default implementation because read_records can still be overridden, so this method is not mandatory.
111119
"""
112120
return []
113121

114-
def _extend_record(self, obj: Any, **kwargs):
115-
"""Wrapper around api_get to backoff errors"""
116-
return obj.api_get(**kwargs).export_all_data()
117-
118122
def request_params(self, **kwargs) -> MutableMapping[str, Any]:
119123
"""Parameters that should be passed to query_records method"""
120124
params = {"limit": self.page_size}
@@ -205,39 +209,30 @@ class AdCreatives(FBMarketingStream):
205209
"""
206210

207211
entity_prefix = "adcreative"
208-
batch_size = 50
212+
use_batch = True
209213

210214
def __init__(self, fetch_thumbnail_images: bool = False, **kwargs):
211215
super().__init__(**kwargs)
212216
self._fetch_thumbnail_images = fetch_thumbnail_images
213217

218+
@cached_property
219+
def fields(self) -> List[str]:
220+
""" Remove "thumbnail_data_url" field because it is computed field and it's not a field that we can request from Facebook
221+
"""
222+
return [f for f in super().fields if f != "thumbnail_data_url"]
223+
214224
def read_records(
215225
self,
216226
sync_mode: SyncMode,
217227
cursor_field: List[str] = None,
218228
stream_slice: Mapping[str, Any] = None,
219229
stream_state: Mapping[str, Any] = None,
220230
) -> Iterable[Mapping[str, Any]]:
221-
"""Read records using batch API"""
222-
records = self._read_records(params=self.request_params(stream_state=stream_state))
223-
# "thumbnail_data_url" is a field in our stream's schema because we
224-
# output it (see fix_thumbnail_urls below), but it's not a field that
225-
# we can request from Facebook
226-
request_fields = [f for f in self.fields if f != "thumbnail_data_url"]
227-
requests = [record.api_get(fields=request_fields, pending=True) for record in records]
228-
for requests_batch in batch(requests, size=self.batch_size):
229-
for record in self.execute_in_batch(requests_batch):
230-
yield self.fix_thumbnail_urls(record)
231-
232-
def fix_thumbnail_urls(self, record: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
233-
"""Cleans and, if enabled, fetches thumbnail URLs for each creative."""
234-
# The thumbnail_url contains some extra query parameters that don't affect the validity of the URL, but break SAT
235-
thumbnail_url = record.get("thumbnail_url")
236-
if thumbnail_url:
237-
record["thumbnail_url"] = remove_params_from_url(thumbnail_url, ["_nc_hash", "d"])
231+
"""Read with super method and append thumbnail_data_url if enabled"""
232+
for record in super().read_records(sync_mode, cursor_field, stream_slice, stream_state):
238233
if self._fetch_thumbnail_images:
239-
record["thumbnail_data_url"] = fetch_thumbnail_data_url(thumbnail_url)
240-
return record
234+
record["thumbnail_data_url"] = fetch_thumbnail_data_url(record.get("thumbnail_url"))
235+
yield record
241236

242237
def _read_records(self, params: Mapping[str, Any]) -> Iterator:
243238
return self._api.account.get_ad_creatives(params=params)

0 commit comments

Comments
 (0)