Skip to content

Commit aa9786d

Browse files
🎉 Google Ads improvement: Support user-specified queries (#5302)
* Add Google Ads custom queries stream
* Display a link to the Gradle scan on the PR comment if the test build failed
1 parent 5f697ac commit aa9786d

16 files changed

+628
-18
lines changed

.github/workflows/test-command.yml

+1
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,7 @@ jobs:
212212
comment-id: ${{ github.event.inputs.comment-id }}
213213
body: |
214214
> :x: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}
215+
> :bug: ${{env.GRADLE_SCAN_LINK}}
215216
# In case of self-hosted EC2 errors, remove this block.
216217
stop-test-runner:
217218
name: Stop Build EC2 Runner

airbyte-integrations/connectors/source-google-ads/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,5 +13,5 @@ RUN pip install .
1313

1414
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
1515

16-
LABEL io.airbyte.version=0.1.8
16+
LABEL io.airbyte.version=0.1.9
1717
LABEL io.airbyte.name=airbyte/source-google-ads

airbyte-integrations/connectors/source-google-ads/acceptance-test-config.yml

+11-7
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@ tests:
1414
basic_read:
1515
- config_path: "secrets/config.json"
1616
configured_catalog_path: "integration_tests/configured_catalog_without_empty_streams.json"
17-
# TODO incremental test is disabled because records output from the report streams can be up to 14 days older than the input state
18-
# incremental:
19-
# - config_path: "secrets/config.json"
20-
# configured_catalog_path: "integration_tests/configured_catalog.json"
21-
# future_state_path: "integration_tests/abnormal_state.json"
22-
# cursor_paths:
23-
# ad_group_ad_report: ["segments.date"]
17+
- config_path: "secrets/config.json"
18+
configured_catalog_path: "integration_tests/configured_catalog_protobuf_msg.json"
19+
expect_records:
20+
path: "integration_tests/expected_records_msg.txt"
21+
# TODO incremental test is disabled because records output from the report streams can be up to 14 days older than the input state
22+
# incremental:
23+
# - config_path: "secrets/config.json"
24+
# configured_catalog_path: "integration_tests/configured_catalog.json"
25+
# future_state_path: "integration_tests/abnormal_state.json"
26+
# cursor_paths:
27+
# ad_group_ad_report: ["segments.date"]
2428
full_refresh:
2529
- config_path: "secrets/config.json"
2630
configured_catalog_path: "integration_tests/configured_catalog.json"

airbyte-integrations/connectors/source-google-ads/integration_tests/configured_catalog.json

+18
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,24 @@
9999
},
100100
"sync_mode": "full_refresh",
101101
"destination_sync_mode": "overwrite"
102+
},
103+
{
104+
"stream": {
105+
"name": "happytable",
106+
"json_schema": {},
107+
"supported_sync_modes": ["full_refresh"]
108+
},
109+
"sync_mode": "full_refresh",
110+
"destination_sync_mode": "overwrite"
111+
},
112+
{
113+
"stream": {
114+
"name": "unhappytable",
115+
"json_schema": {},
116+
"supported_sync_modes": ["full_refresh"]
117+
},
118+
"sync_mode": "full_refresh",
119+
"destination_sync_mode": "overwrite"
102120
}
103121
]
104122
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{
2+
"streams": [
3+
{
4+
"stream": {
5+
"name": "ad_group_custom",
6+
"json_schema": {},
7+
"supported_sync_modes": ["full_refresh", "incremental"],
8+
"source_defined_cursor": true,
9+
"default_cursor_field": ["segments.date"]
10+
},
11+
"sync_mode": "incremental",
12+
"destination_sync_mode": "overwrite",
13+
"cursor_field": ["segments.date"]
14+
}
15+
]
16+
}

airbyte-integrations/connectors/source-google-ads/integration_tests/configured_catalog_without_empty_streams.json

+22
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,28 @@
6363
},
6464
"sync_mode": "full_refresh",
6565
"destination_sync_mode": "overwrite"
66+
},
67+
{
68+
"stream": {
69+
"name": "happytable",
70+
"json_schema": {},
71+
"supported_sync_modes": ["full_refresh", "incremental"],
72+
"source_defined_cursor": true,
73+
"default_cursor_field": ["campaign.start_date"]
74+
},
75+
"sync_mode": "incremental",
76+
"destination_sync_mode": "append",
77+
"cursor_field": ["campaign.start_date"]
78+
},
79+
{
80+
"stream": {
81+
"name": "unhappytable",
82+
"json_schema": {},
83+
"supported_sync_modes": ["full_refresh"],
84+
"source_defined_primary_key": [["customer.id"]]
85+
},
86+
"sync_mode": "full_refresh",
87+
"destination_sync_mode": "overwrite"
6688
}
6789
]
6890
}

airbyte-integrations/connectors/source-google-ads/integration_tests/expected_records_msg.txt

+104
Large diffs are not rendered by default.

airbyte-integrations/connectors/source-google-ads/setup.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525

2626
from setuptools import find_packages, setup
2727

28-
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-ads", "pendulum"]
28+
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "google-ads==13.0.0", "pendulum"]
2929

30-
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock", "pendulum"]
30+
TEST_REQUIREMENTS = ["pytest~=6.1", "pytest-mock"]
3131

3232
setup(
3333
name="source_google_ads",
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
#
2+
# MIT License
3+
#
4+
# Copyright (c) 2020 Airbyte
5+
#
6+
# Permission is hereby granted, free of charge, to any person obtaining a copy
7+
# of this software and associated documentation files (the "Software"), to deal
8+
# in the Software without restriction, including without limitation the rights
9+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
# copies of the Software, and to permit persons to whom the Software is
11+
# furnished to do so, subject to the following conditions:
12+
#
13+
# The above copyright notice and this permission notice shall be included in all
14+
# copies or substantial portions of the Software.
15+
#
16+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
# SOFTWARE.
23+
#
24+
25+
import re
26+
from functools import lru_cache
27+
from typing import Any, Dict, List, Mapping
28+
29+
from .streams import IncrementalGoogleAdsStream
30+
31+
32+
class CustomQuery(IncrementalGoogleAdsStream):
    """
    Incremental stream driven by a user-supplied GAQL query.

    The connector config provides a raw GAQL query and a table name; this
    stream injects a ``segments.date`` window into the query so it can be
    sliced for incremental syncs, and derives the JSON schema for the
    selected columns from the Google Ads field-metadata service.
    """

    # Regexp flags for parsing GAQL query
    RE_FLAGS = re.DOTALL | re.MULTILINE | re.IGNORECASE
    # Regexp for getting query columns
    SELECT_EXPR = re.compile("select(.*)from", flags=RE_FLAGS)
    WHERE_EXPR = re.compile("where.*", flags=RE_FLAGS)
    # list of keywords that can come after WHERE clause,
    # according to https://developers.google.com/google-ads/api/docs/query/grammar
    KEYWORDS_EXPR = re.compile("(order by|limit|parameters)", flags=RE_FLAGS)

    def __init__(self, custom_query_config, **kwargs):
        """
        :param custom_query_config: dict from the connector config; must contain
            "query" (raw GAQL string) and "table_name" (the stream's name).
        """
        self.custom_query_config = custom_query_config
        self.user_defined_query = custom_query_config["query"]
        super().__init__(**kwargs)

    @property
    def primary_key(self) -> str:
        """
        The primary_key option is disabled. Config should not provide the primary key.
        It will be ignored if provided.
        If you need to enable it, uncomment the next line instead of `return None` and modify your config
        """
        # return self.custom_query_config.get("primary_key") or None
        return None

    @property
    def name(self):
        # Stream name comes from the user config, not the class name.
        return self.custom_query_config["table_name"]

    def get_query(self, stream_slice: Mapping[str, Any] = None) -> str:
        """Return the user query constrained to the date window of the given slice."""
        start_date, end_date = self.get_date_params(stream_slice, self.cursor_field)
        return self.insert_segments_date_expr(self.user_defined_query, start_date, end_date)

    # IncrementalGoogleAdsStream uses get_json_schema a lot while parsing
    # responses, so caching plays a crucial role for performance here.
    @lru_cache()
    def get_json_schema(self) -> Dict[str, Any]:
        """
        Compose json schema based on user defined query.
        :return Dict object representing jsonschema
        """

        local_json_schema = {
            "$schema": "http://json-schema.org/draft-07/schema#",
            "type": "object",
            "properties": {},
            "additionalProperties": True,
        }
        # full list {'ENUM', 'STRING', 'DATE', 'DOUBLE', 'RESOURCE_NAME', 'INT32', 'INT64', 'BOOLEAN', 'MESSAGE'}
        google_datatype_mapping = {
            "INT64": "integer",
            "INT32": "integer",
            "DOUBLE": "number",
            "STRING": "string",
            "BOOLEAN": "boolean",
            "DATE": "string",
        }
        fields = CustomQuery.get_query_fields(self.user_defined_query)
        # Only add the cursor column if the user query does not already select
        # it, otherwise the field would be requested and declared twice.
        if self.cursor_field not in fields:
            fields.append(self.cursor_field)
        google_schema = self.google_ads_client.get_fields_metadata(fields)

        for field in fields:
            node = google_schema.get(field)
            # Data type return in enum format: "GoogleAdsFieldDataType.<data_type>"
            google_data_type = str(node.data_type).replace("GoogleAdsFieldDataType.", "")
            if google_data_type == "ENUM":
                field_value = {"type": "string", "enum": list(node.enum_values)}
            elif google_data_type == "MESSAGE":
                # Represents protobuf message and could be anything, set custom
                # attribute "protobuf_message" to convert it to a string (or
                # array of strings) later.
                # https://developers.google.com/google-ads/api/reference/rpc/v8/GoogleAdsFieldDataTypeEnum.GoogleAdsFieldDataType?hl=en#message
                if node.is_repeated:
                    output_type = ["array", "null"]
                else:
                    output_type = ["string", "null"]
                field_value = {"type": output_type, "protobuf_message": True}
            else:
                output_type = [google_datatype_mapping.get(google_data_type, "string"), "null"]
                field_value = {"type": output_type}
            local_json_schema["properties"][field] = field_value

        return local_json_schema

    @staticmethod
    def get_query_fields(query: str) -> List[str]:
        """Extract the list of selected column names from a GAQL query."""
        fields = CustomQuery.SELECT_EXPR.search(query)
        if not fields:
            return []
        fields = fields.group(1)
        return [f.strip() for f in fields.split(",")]

    @staticmethod
    def insert_segments_date_expr(query: str, start_date: str, end_date: str) -> str:
        """
        Insert segments.date condition to break query into slices for incremental stream.
        :param query Origin user defined query
        :param start_date start date for metric (inclusive)
        :param end_date end date for metric (inclusive)
        :return Modified query with date window condition included
        """
        # insert segments.date field
        columns = CustomQuery.SELECT_EXPR.search(query)
        if not columns:
            raise Exception("Not valid GAQL expression")
        columns = columns.group(1)
        new_columns = columns + ", segments.date\n"
        result_query = query.replace(columns, new_columns)

        # Modify/insert where condition
        where_cond = CustomQuery.WHERE_EXPR.search(result_query)
        if not where_cond:
            # There is no where condition, insert new one
            where_location = len(result_query)
            keywords = CustomQuery.KEYWORDS_EXPR.search(result_query)
            if keywords:
                # where condition is not at the end of expression, insert new condition before keyword begins.
                where_location = keywords.start()
            result_query = (
                result_query[0:where_location]
                + f"\nWHERE segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
                + result_query[where_location:]
            )
            return result_query
        # There is already where condition, add segments.date expression
        where_cond = where_cond.group(0)
        keywords = CustomQuery.KEYWORDS_EXPR.search(where_cond)
        if keywords:
            # There is some keywords after WHERE condition
            where_cond = where_cond[0 : keywords.start()]
        new_where_cond = where_cond + f" AND segments.date BETWEEN '{start_date}' AND '{end_date}'\n"
        result_query = result_query.replace(where_cond, new_where_cond)
        return result_query

airbyte-integrations/connectors/source-google-ads/source_google_ads/google_ads.py

+36-4
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
# SOFTWARE.
2323
#
2424

25+
2526
from enum import Enum
2627
from typing import Any, List, Mapping
2728

@@ -60,10 +61,32 @@ def send_request(self, query: str) -> SearchGoogleAdsResponse:
6061

6162
return self.ga_service.search(search_request)
6263

64+
def get_fields_metadata(self, fields: List[str]) -> Mapping[str, Any]:
    """
    Issue Google API request to get detailed information on data type for custom query columns.
    :params fields list of columns for user defined query.
    :return dict of fields type info.
    """
    field_service = self.client.get_service("GoogleAdsFieldService")
    search_request = self.client.get_type("SearchGoogleAdsFieldsRequest")
    search_request.page_size = len(fields)
    quoted_fields = ",".join(f"'{field}'" for field in fields)
    search_request.query = f"""
    SELECT
      name,
      data_type,
      enum_values,
      is_repeated
    WHERE name in ({quoted_fields})
    """
    field_metadata = field_service.search_google_ads_fields(request=search_request)
    return {meta.name: meta for meta in field_metadata}
85+
6386
@staticmethod
6487
def get_fields_from_schema(schema: Mapping[str, Any]) -> List[str]:
6588
properties = schema.get("properties")
66-
return [*properties]
89+
return list(properties.keys())
6790

6891
@staticmethod
6992
def convert_schema_into_query(
@@ -82,7 +105,7 @@ def convert_schema_into_query(
82105
return query_template
83106

84107
@staticmethod
85-
def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
108+
def get_field_value(field_value: GoogleAdsRow, field: str, schema_type: Mapping[str, Any]) -> str:
86109
field_name = field.split(".")
87110
for level_attr in field_name:
88111
"""
@@ -130,7 +153,6 @@ def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
130153
# In GoogleAdsRow there are attributes that add an underscore at the end in their name.
131154
# For example, 'ad_group_ad.ad.type' is replaced by 'ad_group_ad.ad.type_'.
132155
field_value = getattr(field_value, level_attr + "_", None)
133-
134156
if isinstance(field_value, Enum):
135157
field_value = field_value.name
136158
elif isinstance(field_value, (Repeated, RepeatedComposite)):
@@ -144,13 +166,23 @@ def get_field_value(field_value: GoogleAdsRow, field: str) -> str:
144166
# For example:
145167
# 1. ad_group_ad.ad.responsive_display_ad.long_headline - type AdTextAsset (https://developers.google.com/google-ads/api/reference/rpc/v6/AdTextAsset?hl=en).
146168
# 2. ad_group_ad.ad.legacy_app_install_ad - type LegacyAppInstallAdInfo (https://developers.google.com/google-ads/api/reference/rpc/v7/LegacyAppInstallAdInfo?hl=en).
169+
#
147170
if not (isinstance(field_value, (list, int, float, str, bool, dict)) or field_value is None):
148171
field_value = str(field_value)
172+
# In case of custom query field has MESSAGE type it represents protobuf
173+
# message and could be anything, convert it to a string or array of
174+
# string if it has "repeated" flag on metadata
175+
if schema_type.get("protobuf_message"):
176+
if "array" in schema_type.get("type"):
177+
field_value = [str(field) for field in field_value]
178+
else:
179+
field_value = str(field_value)
149180

150181
return field_value
151182

152183
@staticmethod
def parse_single_result(schema: Mapping[str, Any], result: GoogleAdsRow):
    """Flatten one GoogleAdsRow into a record dict keyed by the schema's field names."""
    props = schema.get("properties")
    single_record = {}
    for field in GoogleAds.get_fields_from_schema(schema):
        # Pass each field's schema entry along so value conversion can honor
        # custom markers such as "protobuf_message".
        single_record[field] = GoogleAds.get_field_value(result, field, props.get(field))
    return single_record

0 commit comments

Comments
 (0)