Skip to content

Commit 25c89a5

Browse files
authored
Source Facebook Marketing: Fix fail when async job takes too long (#7744)
* Update AdsInsights wait_for_job retry logics * Update wait_for_job restart mapping * Fix review comments * Bump docker version
1 parent 6b0e9da commit 25c89a5

File tree

6 files changed

+49
-23
lines changed

6 files changed

+49
-23
lines changed

airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e7778cfc-e97c-4458-9ecb-b4f2bba8946c.json

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
"sourceDefinitionId": "e7778cfc-e97c-4458-9ecb-b4f2bba8946c",
33
"name": "Facebook Marketing",
44
"dockerRepository": "airbyte/source-facebook-marketing",
5-
"dockerImageTag": "0.2.22",
5+
"dockerImageTag": "0.2.24",
66
"documentationUrl": "https://docs.airbyte.io/integrations/sources/facebook-marketing",
77
"icon": "facebook.svg"
88
}

airbyte-config/init/src/main/resources/seed/source_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@
137137
- name: Facebook Marketing
138138
sourceDefinitionId: e7778cfc-e97c-4458-9ecb-b4f2bba8946c
139139
dockerRepository: airbyte/source-facebook-marketing
140-
dockerImageTag: 0.2.22
140+
dockerImageTag: 0.2.24
141141
documentationUrl: https://docs.airbyte.io/integrations/sources/facebook-marketing
142142
icon: facebook.svg
143143
sourceType: api

airbyte-integrations/connectors/source-facebook-marketing/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,5 @@ RUN pip install .
1212
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
1313
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1414

15-
LABEL io.airbyte.version=0.2.23
15+
LABEL io.airbyte.version=0.2.24
1616
LABEL io.airbyte.name=airbyte/source-facebook-marketing

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/common.py

+4
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ class FacebookAPIException(Exception):
2323
"""General class for all API errors"""
2424

2525

26+
class JobException(Exception):
27+
"""Job failed after FB exception"""
28+
29+
2630
class JobTimeoutException(Exception):
2731
"""Scheduled job timed out"""
2832

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams.py

+41-20
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import time
66
import urllib.parse as urlparse
77
from abc import ABC
8-
from collections import deque
8+
from collections import defaultdict, deque
99
from datetime import datetime
1010
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Sequence
1111

@@ -23,9 +23,10 @@
2323
from facebook_business.exceptions import FacebookRequestError
2424
from source_facebook_marketing.api import API
2525

26-
from .common import FacebookAPIException, JobTimeoutException, batch, deep_merge, retry_pattern
26+
from .common import FacebookAPIException, JobException, JobTimeoutException, batch, deep_merge, retry_pattern
2727

2828
backoff_policy = retry_pattern(backoff.expo, FacebookRequestError, max_tries=5, factor=5)
29+
MAX_JOB_RESTART_TIMES = 6
2930

3031

3132
def remove_params_from_url(url: str, params: List[str]) -> str:
@@ -291,7 +292,7 @@ class AdsInsights(FBMarketingIncrementalStream):
291292
time_increment = 1
292293

293294
running_jobs = deque()
294-
times_job_restarted = {}
295+
times_job_restarted = defaultdict(int)
295296

296297
breakdowns = []
297298

@@ -369,31 +370,51 @@ def wait_for_job(self, job, stream_state: Mapping[str, Any] = None) -> AdReportR
369370

370371
if job["async_status"] == "Job Completed":
371372
return job
372-
elif job["async_status"] in ["Job Failed", "Job Skipped"]:
373+
else:
373374
time_range = (job["date_start"], job["date_stop"])
374-
if self.times_job_restarted.get(time_range, 0) < 6:
375+
if self.times_job_restarted[time_range] < MAX_JOB_RESTART_TIMES:
375376
params = deep_merge(
376377
{"time_range": {"since": job["date_start"], "until": job["date_stop"]}},
377378
self.request_params(stream_state=stream_state),
378379
)
379380
restart_job = self._create_insights_job(params)
380381
self.running_jobs.append(restart_job)
381382
self.times_job_restarted[time_range] += 1
382-
elif job["async_status"] == "Job Failed":
383-
raise JobTimeoutException(f"AdReportRun {job} failed after {runtime.in_seconds()} seconds.")
384-
elif job["async_status"] == "Job Skipped":
385-
raise JobTimeoutException(f"AdReportRun {job} skipped after {runtime.in_seconds()} seconds.")
386-
387-
if runtime > self.MAX_WAIT_TO_START and job_progress_pct == 0:
388-
raise JobTimeoutException(
389-
f"AdReportRun {job} did not start after {runtime.in_seconds()} seconds."
390-
f" This is an intermittent error which may be fixed by retrying the job. Aborting."
391-
)
392-
elif runtime > self.MAX_WAIT_TO_FINISH:
393-
raise JobTimeoutException(
394-
f"AdReportRun {job} did not finish after {runtime.in_seconds()} seconds."
395-
f" This is an intermittent error which may be fixed by retrying the job. Aborting."
396-
)
383+
else:
384+
error_msg = """
385+
AdReportRun job with id={id} {error_type}.
386+
Details: report_run_id={report_run_id},
387+
date_start={date_start}, date_stop={date_stop}
388+
"""
389+
if job["async_status"] == "Job Failed":
390+
raise JobException(
391+
error_msg.format(
392+
error_type="failed",
393+
report_run_id=job["report_run_id"],
394+
date_start=job["date_start"],
395+
date_stop=job["date_stop"],
396+
)
397+
)
398+
elif job["async_status"] == "Job Skipped":
399+
raise JobException(
400+
error_msg.format(
401+
error_type="failed",
402+
report_run_id=job["report_run_id"],
403+
date_start=job["date_start"],
404+
date_stop=job["date_stop"],
405+
)
406+
)
407+
408+
if runtime > self.MAX_WAIT_TO_START and job_progress_pct == 0:
409+
raise JobTimeoutException(
410+
f"AdReportRun {job} did not start after {runtime.in_seconds()} seconds."
411+
f" This is an intermittent error which may be fixed by retrying the job. Aborting."
412+
)
413+
elif runtime > self.MAX_WAIT_TO_FINISH:
414+
raise JobTimeoutException(
415+
f"AdReportRun {job} did not finish after {runtime.in_seconds()} seconds."
416+
f" This is an intermittent error which may be fixed by retrying the job. Aborting."
417+
)
397418
self.logger.info(f"Sleeping {sleep_seconds} seconds while waiting for AdReportRun: {job_id} to complete")
398419
time.sleep(sleep_seconds)
399420
if sleep_seconds < self.MAX_ASYNC_SLEEP.in_seconds():

docs/integrations/sources/facebook-marketing.md

+1
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ As a summary, custom insights allows to replicate only some fields, resulting in
9696

9797
| Version | Date | Pull Request | Subject |
9898
| :--- | :--- | :--- | :--- |
99+
| 0.2.24 | 2021-11-09 | [7744](https://github.com/airbytehq/airbyte/pull/7744) | Fix fail when async job takes too long |
99100
| 0.2.23 | 2021-11-08 | [7734](https://github.com/airbytehq/airbyte/pull/7734) | Resolve $ref field for discover schema |
100101
| 0.2.22 | 2021-11-05 | [7605](https://github.com/airbytehq/airbyte/pull/7605) | Add job retry logics to AdsInsights stream |
101102
| 0.2.21 | 2021-10-05 | [4864](https://github.com/airbytehq/airbyte/pull/4864) | Update insights streams with custom entries for fields, breakdowns and action_breakdowns |

0 commit comments

Comments
 (0)