-
Notifications
You must be signed in to change notification settings - Fork 4.5k
/
Copy pathsource.py
111 lines (97 loc) · 4.81 KB
/
source.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#
# Copyright (c) 2021 Airbyte, Inc., all rights reserved.
#
from typing import Any, List, Mapping, Tuple
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (AdvancedAuth, AuthFlowType,
ConnectorSpecification,
OAuthConfigSpecification, SyncMode)
from airbyte_cdk.models.airbyte_protocol import DestinationSyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.auth import TokenAuthenticator
from .spec import (CompleteOauthOutputSpecification,
CompleteOauthServerInputSpecification,
CompleteOauthServerOutputSpecification,
SourceTiktokMarketingSpec)
from .streams import (DEFAULT_START_DATE, AdGroups, AdGroupsReports, Ads,
AdsReports, Advertisers, AdvertisersReports, Campaigns,
CampaignsReports, ReportGranularity)
DOCUMENTATION_URL = "https://docs.airbyte.io/integrations/sources/tiktok-marketing"
class TiktokTokenAuthenticator(TokenAuthenticator):
"""
Docs: https://business-api.tiktok.com/marketing_api/docs?rid=sta6fe2yww&id=1701890922708994
"""
def __init__(self, token: str, **kwargs):
super().__init__(token, **kwargs)
self.token = token
def get_auth_header(self) -> Mapping[str, Any]:
return {"Access-Token": self.token}
class SourceTiktokMarketing(AbstractSource):
def spec(self, *args, **kwargs) -> ConnectorSpecification:
"""Returns the spec for this integration."""
return ConnectorSpecification(
documentationUrl=DOCUMENTATION_URL,
changelogUrl=DOCUMENTATION_URL,
supportsIncremental=True,
supported_destination_sync_modes=[DestinationSyncMode.overwrite, DestinationSyncMode.append, DestinationSyncMode.append_dedup],
connectionSpecification=SourceTiktokMarketingSpec.schema(),
additionalProperties=True,
advanced_auth=AdvancedAuth(
auth_flow_type=AuthFlowType.oauth2_0,
predicate_key=["credentials", "auth_type"],
predicate_value="oauth2.0",
oauth_config_specification=OAuthConfigSpecification(
complete_oauth_output_specification=CompleteOauthOutputSpecification.schema(),
complete_oauth_server_input_specification=CompleteOauthServerInputSpecification.schema(),
complete_oauth_server_output_specification=CompleteOauthServerOutputSpecification.schema(),
),
),
)
@staticmethod
def _prepare_stream_args(config: Mapping[str, Any]) -> Mapping[str, Any]:
"""Converts an input configure to stream arguments"""
credentials = config.get("credentials")
if credentials:
# used for new config format
access_token = credentials["access_token"]
secret = credentials.get("secret")
app_id = int(credentials.get("app_id", 0))
advertiser_id = int(credentials.get("advertiser_id", 0))
else:
access_token = config["access_token"]
secret = config.get("environment", {}).get("secret")
app_id = int(config.get("environment", {}).get("app_id", 0))
advertiser_id = int(config.get("environment", {}).get("advertiser_id", 0))
return {
"authenticator": TiktokTokenAuthenticator(access_token),
"start_date": config.get("start_date") or DEFAULT_START_DATE,
"advertiser_id": advertiser_id,
"app_id": app_id,
"secret": secret,
}
def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, any]:
"""
Tests if the input configuration can be used to successfully connect to the integration
"""
try:
next(Advertisers(**self._prepare_stream_args(config)).read_records(SyncMode.full_refresh))
except Exception as err:
return False, err
return True, None
def streams(self, config: Mapping[str, Any]) -> List[Stream]:
args = self._prepare_stream_args(config)
report_granularity = config.get("report_granularity") or ReportGranularity.default()
report_args = dict(report_granularity=report_granularity, **args)
advertisers_reports = AdvertisersReports(**report_args)
streams = [
Ads(**args),
AdsReports(**report_args),
Advertisers(**args),
advertisers_reports if not advertisers_reports.is_sandbox else None,
AdGroups(**args),
AdGroupsReports(**report_args),
Campaigns(**args),
CampaignsReports(**report_args),
]
return [stream for stream in streams if stream]