Skip to content

Commit ae503c3

Browse files
committed
format
1 parent 06a75ef commit ae503c3

File tree

6 files changed

+76
-70
lines changed

6 files changed

+76
-70
lines changed

airbyte-integrations/connectors/source-facebook-marketing/integration_tests/spec.json

+7-25
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,7 @@
1010
"description": "The date from which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated after this date will be replicated.",
1111
"order": 0,
1212
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
13-
"examples": [
14-
"2017-01-25T00:00:00Z"
15-
],
13+
"examples": ["2017-01-25T00:00:00Z"],
1614
"type": "string",
1715
"format": "date-time"
1816
},
@@ -21,19 +19,15 @@
2119
"description": "The date until which you'd like to replicate data for AdCreatives and AdInsights APIs, in the format YYYY-MM-DDT00:00:00Z. All data generated between start_date and this date will be replicated. Not setting this option will result in always syncing the latest data.",
2220
"order": 1,
2321
"pattern": "^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}Z$",
24-
"examples": [
25-
"2017-01-26T00:00:00Z"
26-
],
22+
"examples": ["2017-01-26T00:00:00Z"],
2723
"type": "string",
2824
"format": "date-time"
2925
},
3026
"account_id": {
3127
"title": "Account ID",
3228
"description": "The Facebook Ad account ID to use when pulling data from the Facebook Marketing API.",
3329
"order": 2,
34-
"examples": [
35-
"111111111111111"
36-
],
30+
"examples": ["111111111111111"],
3731
"type": "string"
3832
},
3933
"access_token": {
@@ -99,32 +93,20 @@
9993
}
10094
}
10195
},
102-
"required": [
103-
"name"
104-
]
96+
"required": ["name"]
10597
}
10698
}
10799
},
108-
"required": [
109-
"start_date",
110-
"account_id",
111-
"access_token"
112-
]
100+
"required": ["start_date", "account_id", "access_token"]
113101
},
114102
"supportsIncremental": true,
115-
"supported_destination_sync_modes": [
116-
"append"
117-
],
103+
"supported_destination_sync_modes": ["append"],
118104
"authSpecification": {
119105
"auth_type": "oauth2.0",
120106
"oauth2Specification": {
121107
"rootObject": [],
122108
"oauthFlowInitParameters": [],
123-
"oauthFlowOutputParameters": [
124-
[
125-
"access_token"
126-
]
127-
]
109+
"oauthFlowOutputParameters": [["access_token"]]
128110
}
129111
}
130112
}

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/source.py

+11-5
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,29 @@
44

55
import logging
66
from datetime import datetime
7-
from typing import Any, List, Mapping, Optional, Tuple, Type, Iterator, MutableMapping
7+
from typing import Any, Iterator, List, Mapping, MutableMapping, Optional, Tuple, Type
88

99
import pendulum
1010
from airbyte_cdk.logger import AirbyteLogger
1111
from airbyte_cdk.models import (
1212
AirbyteConnectionStatus,
13+
AirbyteMessage,
1314
AuthSpecification,
15+
ConfiguredAirbyteStream,
1416
ConnectorSpecification,
1517
DestinationSyncMode,
1618
OAuth2Specification,
17-
Status, AirbyteMessage, ConfiguredAirbyteStream, SyncMode,
19+
Status,
20+
SyncMode,
1821
)
1922
from airbyte_cdk.sources import AbstractSource
2023
from airbyte_cdk.sources.config import BaseConfig
2124
from airbyte_cdk.sources.streams import Stream
2225
from airbyte_cdk.sources.streams.core import package_name_from_class
23-
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader, InternalConfig
26+
from airbyte_cdk.sources.utils.schema_helpers import (
27+
InternalConfig,
28+
ResourceSchemaLoader,
29+
)
2430
from pydantic import BaseModel, Field
2531
from source_facebook_marketing.api import API
2632
from source_facebook_marketing.streams import (
@@ -104,7 +110,7 @@ class Config:
104110
custom_insights: Optional[List[InsightConfig]] = Field(
105111
title="Custom Insights",
106112
order=6,
107-
description="A list which contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)"
113+
description="A list which contains insights entries, each entry must have a name and can contains fields, breakdowns or action_breakdowns)",
108114
)
109115

110116

@@ -255,7 +261,7 @@ def _read_incremental(
255261
connector_state: MutableMapping[str, Any],
256262
internal_config: InternalConfig,
257263
) -> Iterator[AirbyteMessage]:
258-
""" We override this method because we need to inject new state handling.
264+
"""We override this method because we need to inject new state handling.
259265
Old way:
260266
pass stream_state in read_records and other methods
261267
call stream_state = stream_instance.get_updated_state(stream_state, record_data) for each record

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job.py

+13-11
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,21 @@
55
import logging
66
from abc import ABC, abstractmethod
77
from enum import Enum
8-
from typing import Any, Mapping, Optional, List
8+
from typing import Any, List, Mapping, Optional
99

1010
import pendulum
1111
from facebook_business.adobjects.adreportrun import AdReportRun
1212
from facebook_business.adobjects.campaign import Campaign
1313
from facebook_business.adobjects.objectparser import ObjectParser
14-
from facebook_business.api import FacebookResponse, FacebookAdsApiBatch
15-
14+
from facebook_business.api import FacebookAdsApiBatch, FacebookResponse
1615

1716
logger = logging.getLogger("airbyte")
1817

1918

2019
def chunks(data, n):
2120
"""Yield successive n-sized chunks from lst."""
2221
for i in range(0, len(data), n):
23-
yield data[i: i + n]
22+
yield data[i : i + n]
2423

2524

2625
class Status(str, Enum):
@@ -61,7 +60,7 @@ def failed(self) -> bool:
6160
"""Tell if the job previously failed"""
6261

6362
@abstractmethod
64-
def update_job(self, batch = None):
63+
def update_job(self, batch=None):
6564
"""Method to retrieve job's status, separated because of retry handler"""
6665

6766
@abstractmethod
@@ -70,8 +69,8 @@ def get_result(self) -> Any:
7069

7170

7271
class ParentAsyncJob(AsyncJob):
73-
""" Group of async jobs
74-
"""
72+
"""Group of async jobs"""
73+
7574
def __init__(self, api, jobs: List[AsyncJob]):
7675
self._api = api
7776
self._jobs = jobs
@@ -129,7 +128,7 @@ def get_result(self) -> Any:
129128
for job in self._jobs:
130129
yield from job.get_result()
131130

132-
def split_job(self) -> 'AsyncJob':
131+
def split_job(self) -> "AsyncJob":
133132
"""Split existing job in few smaller ones grouped by ParentAsyncJob class"""
134133
raise RuntimeError("Splitting of ParentAsyncJob is not allowed.")
135134

@@ -156,7 +155,7 @@ def __init__(self, api, edge_object: Any, params: Mapping[str, Any], key: Option
156155

157156
def split_job(self) -> ParentAsyncJob:
158157
"""Split existing job in few smaller ones grouped by ParentAsyncJob class.
159-
TODO: use some cache to avoid expensive queries across different streams.
158+
TODO: use some cache to avoid expensive queries across different streams.
160159
"""
161160
campaign_params = dict(copy.deepcopy(self._params))
162161
# get campaigns from attribution window as well (28 day + 1 current day)
@@ -177,8 +176,11 @@ def start(self, batch=None):
177176

178177
if batch is not None:
179178
self._edge_object.get_insights(
180-
params=self._params, is_async=True, batch=batch,
181-
success=self._batch_success_handler, failure=self._batch_failure_handler,
179+
params=self._params,
180+
is_async=True,
181+
batch=batch,
182+
success=self._batch_success_handler,
183+
failure=self._batch_failure_handler,
182184
)
183185
else:
184186
self._job = self._edge_object.get_insights(params=self._params, is_async=True)

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/async_job_manager.py

+7-9
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44

55
import logging
66
import time
7-
from typing import Tuple, Iterator, List, TYPE_CHECKING
7+
from typing import TYPE_CHECKING, Iterator, List, Tuple
88

99
from facebook_business.api import FacebookAdsApiBatch
10+
1011
from .async_job import AsyncJob
1112

1213
if TYPE_CHECKING:
@@ -34,8 +35,8 @@ class InsightAsyncJobManager:
3435
MAX_JOBS_IN_QUEUE = 100
3536
MAX_JOBS_TO_CHECK = 50
3637

37-
def __init__(self, api: 'API', jobs: Iterator[AsyncJob]):
38-
""" Init
38+
def __init__(self, api: "API", jobs: Iterator[AsyncJob]):
39+
"""Init
3940
4041
:param api:
4142
:param jobs:
@@ -50,10 +51,7 @@ def _start_jobs(self):
5051
self._update_api_throttle_limit()
5152
self._wait_throttle_limit_down()
5253
prev_jobs_count = len(self._running_jobs)
53-
while (
54-
self._get_current_throttle_value() < self.THROTTLE_LIMIT
55-
and len(self._running_jobs) < self.MAX_JOBS_IN_QUEUE
56-
):
54+
while self._get_current_throttle_value() < self.THROTTLE_LIMIT and len(self._running_jobs) < self.MAX_JOBS_IN_QUEUE:
5755
job = next(iter(self._jobs), None)
5856
if not job:
5957
self._empty = True
@@ -68,7 +66,7 @@ def _start_jobs(self):
6866
)
6967

7068
def completed_jobs(self) -> Iterator[AsyncJob]:
71-
""" Wait until job is ready and return it. If job
69+
"""Wait until job is ready and return it. If job
7270
failed try to restart it for FAILED_JOBS_RESTART_COUNT times. After job
7371
is completed new jobs added according to current throttling limit.
7472
@@ -87,7 +85,7 @@ def completed_jobs(self) -> Iterator[AsyncJob]:
8785
self._start_jobs()
8886

8987
def _check_jobs_status_and_restart(self) -> List[AsyncJob]:
90-
""" Checks jobs status in advance and restart if some failed.
88+
"""Checks jobs status in advance and restart if some failed.
9189
9290
:return: list of completed jobs
9391
"""

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/insights_streams.py

+25-16
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,24 @@
44

55
import copy
66
import logging
7-
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Iterator, Union
7+
from typing import (
8+
Any,
9+
Iterable,
10+
Iterator,
11+
List,
12+
Mapping,
13+
MutableMapping,
14+
Optional,
15+
Union,
16+
)
817

918
import airbyte_cdk.sources.utils.casing as casing
1019
import pendulum
1120
from airbyte_cdk.models import SyncMode
1221
from airbyte_cdk.sources.streams.core import package_name_from_class
1322
from airbyte_cdk.sources.utils.schema_helpers import ResourceSchemaLoader
1423
from cached_property import cached_property
15-
from source_facebook_marketing.streams.async_job import InsightAsyncJob, AsyncJob
24+
from source_facebook_marketing.streams.async_job import AsyncJob, InsightAsyncJob
1625
from source_facebook_marketing.streams.async_job_manager import InsightAsyncJobManager
1726

1827
from .streams import FBMarketingIncrementalStream
@@ -57,12 +66,12 @@ class AdsInsights(FBMarketingIncrementalStream):
5766
breakdowns = []
5867

5968
def __init__(
60-
self,
61-
name: str = None,
62-
fields: List[str] = None,
63-
breakdowns: List[str] = None,
64-
action_breakdowns: List[str] = None,
65-
**kwargs,
69+
self,
70+
name: str = None,
71+
fields: List[str] = None,
72+
breakdowns: List[str] = None,
73+
action_breakdowns: List[str] = None,
74+
**kwargs,
6675
):
6776
super().__init__(**kwargs)
6877
self._start_date = self._start_date.date()
@@ -79,7 +88,7 @@ def __init__(
7988

8089
@property
8190
def name(self) -> str:
82-
""" We override stream name to let the user change it via configuration."""
91+
"""We override stream name to let the user change it via configuration."""
8392
name = self._new_class_name or self.__class__.__name__
8493
return casing.camel_to_snake(name)
8594

@@ -95,11 +104,11 @@ def _get_campaign_ids(self, params) -> List[str]:
95104
return list(set(row["campaign_id"] for row in result))
96105

97106
def read_records(
98-
self,
99-
sync_mode: SyncMode,
100-
cursor_field: List[str] = None,
101-
stream_slice: Mapping[str, Any] = None,
102-
stream_state: Mapping[str, Any] = None,
107+
self,
108+
sync_mode: SyncMode,
109+
cursor_field: List[str] = None,
110+
stream_slice: Mapping[str, Any] = None,
111+
stream_state: Mapping[str, Any] = None,
103112
) -> Iterable[Mapping[str, Any]]:
104113
"""Waits for current job to finish (slice) and yield its result"""
105114
job = stream_slice["insight_job"]
@@ -147,7 +156,7 @@ def _advance_cursor(self):
147156
self._cursor_value = ts_start
148157

149158
def _generate_async_jobs(self, params: Mapping) -> Iterator[AsyncJob]:
150-
""" Generator of async jobs
159+
"""Generator of async jobs
151160
152161
:param params:
153162
:return:
@@ -184,7 +193,7 @@ def stream_slices(self, **kwargs) -> Iterable[Optional[Mapping[str, Any]]]:
184193
yield {"insight_job": job}
185194

186195
def _get_start_date(self) -> pendulum.Date:
187-
""" Get start date to begin sync with. It is not that trivial as it might seem.
196+
"""Get start date to begin sync with. It is not that trivial as it might seem.
188197
There are few rules:
189198
- don't read data older than start_date
190199
- re-read data within last 28 days

airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/streams/streams.py

+13-4
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,16 @@
66
import logging
77
from abc import ABC
88
from datetime import datetime
9-
from typing import Any, Iterable, Iterator, List, Mapping, MutableMapping, TYPE_CHECKING, Optional
9+
from typing import (
10+
TYPE_CHECKING,
11+
Any,
12+
Iterable,
13+
Iterator,
14+
List,
15+
Mapping,
16+
MutableMapping,
17+
Optional,
18+
)
1019

1120
import pendulum
1221
import requests
@@ -16,6 +25,7 @@
1625
from cached_property import cached_property
1726
from facebook_business.adobjects.abstractobject import AbstractObject
1827
from facebook_business.api import FacebookAdsApiBatch, FacebookRequest, FacebookResponse
28+
1929
from .common import deep_merge
2030

2131
if TYPE_CHECKING:
@@ -53,7 +63,7 @@ class FBMarketingStream(Stream, ABC):
5363

5464
MAX_BATCH_SIZE = 50
5565

56-
def __init__(self, api: 'API', include_deleted: bool = False, **kwargs):
66+
def __init__(self, api: "API", include_deleted: bool = False, **kwargs):
5767
super().__init__(**kwargs)
5868
self._api = api
5969
self._include_deleted = include_deleted if self.enable_deleted else False
@@ -216,8 +226,7 @@ def __init__(self, fetch_thumbnail_images: bool = False, **kwargs):
216226

217227
@cached_property
218228
def fields(self) -> List[str]:
219-
""" Remove "thumbnail_data_url" field because it is computed field and it's not a field that we can request from Facebook
220-
"""
229+
"""Remove "thumbnail_data_url" field because it is computed field and it's not a field that we can request from Facebook"""
221230
return [f for f in super().fields if f != "thumbnail_data_url"]
222231

223232
def read_records(

0 commit comments

Comments
 (0)