Skip to content

Commit 48d8250

Browse files
🐛 Destination BigQuery-Denormalized: Fix JSON with $ref Definition keys (#7736)
* BUG-6638: Fix fields with $ref definition. * BUG-6638: Added integration tests. * BUG-6638: Added integration tests. * BUG-6638: Added integration tests. * BUG-6638: Added doc and pumped Dockerfile version. * BUG-6638: Added doc and pumped Dockerfile version. * BUG-6638: replaced for with forEach * BUG-6638: Bumped specification
1 parent 2d2965b commit 48d8250

File tree

9 files changed

+148
-7
lines changed

9 files changed

+148
-7
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
- name: BigQuery (denormalized typed struct)
1212
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
1313
dockerRepository: airbyte/destination-bigquery-denormalized
14-
dockerImageTag: 0.1.8
14+
dockerImageTag: 0.1.9
1515
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
1616
- name: Cassandra
1717
destinationDefinitionId: 707456df-6f4f-4ced-b5c6-03f73bcad1c5

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

+1-1
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@
254254
- "overwrite"
255255
- "append"
256256
- "append_dedup"
257-
- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.8"
257+
- dockerImage: "airbyte/destination-bigquery-denormalized:0.1.9"
258258
spec:
259259
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
260260
connectionSpecification:

airbyte-config/init/src/main/resources/seed/source_specs.yaml

+47
Original file line numberDiff line numberDiff line change
@@ -4100,6 +4100,53 @@
41004100
supportsNormalization: false
41014101
supportsDBT: false
41024102
supported_destination_sync_modes: []
4103+
- dockerImage: "airbyte/source-pinterest:0.1.0"
4104+
spec:
4105+
documentationUrl: "https://docs.airbyte.io/integrations/sources/pinterest"
4106+
connectionSpecification:
4107+
$schema: "http://json-schema.org/draft-07/schema#"
4108+
title: "Pinterest Spec"
4109+
type: "object"
4110+
required:
4111+
- "client_id"
4112+
- "client_secret"
4113+
- "refresh_token"
4114+
additionalProperties: true
4115+
properties:
4116+
client_id:
4117+
type: "string"
4118+
title: "Client id"
4119+
description: "Your Pinterest client id. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
4120+
>docs</a> for instructions on how to generate it."
4121+
airbyte_secret: true
4122+
client_secret:
4123+
type: "string"
4124+
title: "Client secret"
4125+
description: "Your Pinterest client secret. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
4126+
>docs</a> for instructions on how to generate it."
4127+
airbyte_secret: true
4128+
refresh_token:
4129+
type: "string"
4130+
title: "Refresh token"
4131+
description: "Your Pinterest refresh token. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
4132+
>docs</a> for instructions on how to generate it."
4133+
airbyte_secret: true
4134+
access_token:
4135+
type: "string"
4136+
title: "Access token"
4137+
description: "Your Pinterest access token. See the <a href=\"https://developers.pinterest.com/docs/api/v5/#tag/Authentication\"\
4138+
>docs</a> for instructions on how to generate it."
4139+
airbyte_secret: true
4140+
start_date:
4141+
type: "string"
4142+
title: "Start date"
4143+
description: "A date in the format YYYY-MM-DD. If you have not set a date,\
4144+
\ it will be 2020-07-28 by default."
4145+
examples:
4146+
- "2020-07-28"
4147+
supportsNormalization: false
4148+
supportsDBT: false
4149+
supported_destination_sync_modes: []
41034150
- dockerImage: "airbyte/source-pipedrive:0.1.6"
41044151
spec:
41054152
documentationUrl: "https://docs.airbyte.io/integrations/sources/pipedrive"

airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
88

99
RUN tar xf ${APPLICATION}.tar --strip-components=1
1010

11-
LABEL io.airbyte.version=0.1.8
11+
LABEL io.airbyte.version=0.1.9
1212
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized

airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java

+32-3
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
2626
import java.util.Collections;
2727
import java.util.Comparator;
28+
import java.util.HashSet;
2829
import java.util.List;
2930
import java.util.Map;
31+
import java.util.Set;
3032
import java.util.function.Consumer;
3133
import java.util.stream.Collectors;
3234
import org.slf4j.Logger;
@@ -40,6 +42,8 @@ public class BigQueryDenormalizedDestination extends BigQueryDestination {
4042
protected static final String NESTED_ARRAY_FIELD = "value";
4143
private static final String TYPE_FIELD = "type";
4244
private static final String FORMAT_FIELD = "format";
45+
private static final String REF_DEFINITION_KEY = "$ref";
46+
private static final Set<String> fieldsContainRefDefinitionValue = new HashSet<>();
4347

4448
@Override
4549
protected String getTargetTableName(final String streamName) {
@@ -48,14 +52,13 @@ protected String getTargetTableName(final String streamName) {
4852
return getNamingResolver().getIdentifier(streamName);
4953
}
5054

51-
@Override
5255
protected AirbyteMessageConsumer getRecordConsumer(final BigQuery bigquery,
5356
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
5457
final ConfiguredAirbyteCatalog catalog,
5558
final Consumer<AirbyteMessage> outputRecordCollector,
5659
final boolean isGcsUploadingMode,
5760
final boolean isKeepFilesInGcs) {
58-
return new BigQueryDenormalizedRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector, getNamingResolver());
61+
return new BigQueryDenormalizedRecordConsumer(bigquery, writeConfigs, catalog, outputRecordCollector, getNamingResolver(), fieldsContainRefDefinitionValue);
5962
}
6063

6164
@Override
@@ -73,10 +76,36 @@ protected Schema getBigQuerySchema(final JsonNode jsonSchema) {
7376
private static List<Field> getSchemaFields(final BigQuerySQLNameTransformer namingResolver, final JsonNode jsonSchema) {
7477
Preconditions.checkArgument(jsonSchema.isObject() && jsonSchema.has(PROPERTIES_FIELD));
7578
final ObjectNode properties = (ObjectNode) jsonSchema.get(PROPERTIES_FIELD);
76-
return Jsons.keys(properties).stream().map(key -> getField(namingResolver, key, properties.get(key)).build()).collect(Collectors.toList());
79+
List<Field> tmpFields = Jsons.keys(properties).stream()
80+
.peek(addToRefList(properties))
81+
.map(key -> getField(namingResolver, key, properties.get(key))
82+
.build())
83+
.collect(Collectors.toList());
84+
if (!fieldsContainRefDefinitionValue.isEmpty()) {
85+
LOGGER.warn("Next fields contain \"$ref\" as Definition: {}. They are going to be saved as String Type column", fieldsContainRefDefinitionValue);
86+
}
87+
return tmpFields;
88+
}
89+
90+
/**
91+
* @param properties - JSON schema with properties
92+
*
93+
* The method is responsible for population of fieldsContainRefDefinitionValue set with keys
94+
* contain $ref definition
95+
*
96+
* Currently, AirByte doesn't support parsing value by $ref key definition.
97+
* The issue to track this <a href="https://github.com/airbytehq/airbyte/issues/7725">7725</a>
98+
*/
99+
private static Consumer<String> addToRefList(ObjectNode properties) {
100+
return key -> {
101+
if (properties.get(key).has(REF_DEFINITION_KEY)) {
102+
fieldsContainRefDefinitionValue.add(key);
103+
}
104+
};
77105
}
78106

79107
private static Builder getField(final BigQuerySQLNameTransformer namingResolver, final String key, final JsonNode fieldDefinition) {
108+
80109
final String fieldName = namingResolver.getIdentifier(key);
81110
final Builder builder = Field.newBuilder(fieldName, StandardSQLTypeName.STRING);
82111
final List<JsonSchemaType> fieldTypes = getTypes(fieldName, fieldDefinition.get(TYPE_FIELD));

airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedRecordConsumer.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -38,13 +38,16 @@ public class BigQueryDenormalizedRecordConsumer extends BigQueryRecordConsumer {
3838

3939
private final StandardNameTransformer namingResolver;
4040
private final Set<String> invalidKeys;
41+
private final Set<String> fieldsWithRefDefinition;
4142

4243
public BigQueryDenormalizedRecordConsumer(final BigQuery bigquery,
4344
final Map<AirbyteStreamNameNamespacePair, BigQueryWriteConfig> writeConfigs,
4445
final ConfiguredAirbyteCatalog catalog,
4546
final Consumer<AirbyteMessage> outputRecordCollector,
46-
final StandardNameTransformer namingResolver) {
47+
final StandardNameTransformer namingResolver,
48+
final Set<String> fieldsWithRefDefinition) {
4749
super(bigquery, writeConfigs, catalog, outputRecordCollector, false, false);
50+
this.fieldsWithRefDefinition = fieldsWithRefDefinition;
4851
this.namingResolver = namingResolver;
4952
invalidKeys = new HashSet<>();
5053
}
@@ -57,12 +60,23 @@ protected JsonNode formatRecord(final Schema schema, final AirbyteRecordMessage
5760
final String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
5861
Preconditions.checkArgument(recordMessage.getData().isObject());
5962
final ObjectNode data = (ObjectNode) formatData(schema.getFields(), recordMessage.getData());
63+
// replace ObjectNode with TextNode for fields with $ref definition key
64+
// Do not need to iterate through all JSON Object nodes, only first nesting object.
65+
if (!fieldsWithRefDefinition.isEmpty()) {
66+
fieldsWithRefDefinition.forEach(key -> data.put(key, data.get(key).toString()));
67+
}
6068
data.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
6169
data.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, formattedEmittedAt);
6270

6371
return data;
6472
}
6573

74+
@Override
75+
public void close(boolean hasFailed) {
76+
fieldsWithRefDefinition.clear();
77+
super.close(hasFailed);
78+
}
79+
6680
protected JsonNode formatData(final FieldList fields, final JsonNode root) {
6781
// handles empty objects and arrays
6882
if (fields == null) {

airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationTest.java

+24
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,11 @@ class BigQueryDenormalizedDestinationTest {
7979
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
8080
.withData(getDataWithJSONDateTimeFormats())
8181
.withEmittedAt(NOW.toEpochMilli()));
82+
private static final AirbyteMessage MESSAGE_USERS5 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
83+
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
84+
.withData(getDataWithJSONWithReference())
85+
.withEmittedAt(NOW.toEpochMilli()));
86+
8287

8388
private JsonNode config;
8489

@@ -116,6 +121,7 @@ void setup(final TestInfo info) throws IOException {
116121
MESSAGE_USERS2.getRecord().setNamespace(datasetId);
117122
MESSAGE_USERS3.getRecord().setNamespace(datasetId);
118123
MESSAGE_USERS4.getRecord().setNamespace(datasetId);
124+
MESSAGE_USERS5.getRecord().setNamespace(datasetId);
119125

120126
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).setLocation(datasetLocation).build();
121127
dataset = bigquery.create(datasetInfo);
@@ -242,6 +248,24 @@ void testIfJSONDateTimeWasConvertedToBigQueryFormat() throws Exception {
242248
extractJsonValues(resultJson.get("items"), "nested_datetime"));
243249
}
244250

251+
@Test
252+
void testJsonReferenceDefinition() throws Exception {
253+
catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(new ConfiguredAirbyteStream()
254+
.withStream(new AirbyteStream().withName(USERS_STREAM_NAME).withNamespace(datasetId).withJsonSchema(getSchemaWithReferenceDefinition()))
255+
.withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE)));
256+
257+
final BigQueryDestination destination = new BigQueryDenormalizedDestination();
258+
final AirbyteMessageConsumer consumer = destination.getConsumer(config, catalog, Destination::defaultOutputRecordCollector);
259+
260+
consumer.accept(MESSAGE_USERS5);
261+
consumer.close();
262+
263+
final List<JsonNode> usersActual = retrieveRecordsAsJson(USERS_STREAM_NAME);
264+
final JsonNode resultJson = usersActual.get(0);
265+
assertEquals(usersActual.size(), 1);
266+
assertEquals(extractJsonValues(resultJson, "users"), Set.of("{\"name\":\"John\",\"surname\":\"Adams\"}"));
267+
}
268+
245269
private Set<String> extractJsonValues(final JsonNode node, final String attributeName) {
246270
final List<JsonNode> valuesNode = node.findValues(attributeName);
247271
final Set<String> resultSet = new HashSet<>();

airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java

+26
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,32 @@ public static JsonNode getDataWithJSONDateTimeFormats() {
202202
+ "}");
203203
}
204204

205+
public static JsonNode getDataWithJSONWithReference() {
206+
return Jsons.deserialize(
207+
"{\n"
208+
+ " \"users\" :{\n"
209+
+ " \"name\": \"John\",\n"
210+
+ " \"surname\": \"Adams"
211+
+"\"\n"
212+
+ " }\n"
213+
+ "}");
214+
}
215+
216+
public static JsonNode getSchemaWithReferenceDefinition() {
217+
return Jsons.deserialize(
218+
"{ \n"
219+
+ " \"type\" : [ \"null\", \"object\" ],\n"
220+
+ " \"properties\" : {\n"
221+
+" \"users\": {\n"
222+
+ " \"$ref\": \"#/definitions/users_\"\n"
223+
+
224+
" }\n"
225+
+ " }\n"
226+
+
227+
"}\n"
228+
+ " ");
229+
}
230+
205231
public static JsonNode getDataWithEmptyObjectAndArray() {
206232
return Jsons.deserialize(
207233
"{\n"

docs/integrations/destinations/bigquery.md

+1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ Therefore, Airbyte BigQuery destination will convert any invalid characters into
169169

170170
| Version | Date | Pull Request | Subject |
171171
| :--- | :--- | :--- | :--- |
172+
| 0.1.9 | 2021-11-08 | [\#7736](https://github.com/airbytehq/airbyte/issues/7736) | Fixed the handling of ObjectNodes with $ref definition key |
172173
| 0.1.8 | 2021-10-27 | [\#7413](https://github.com/airbytehq/airbyte/issues/7413) | Fixed DATETIME conversion for BigQuery |
173174
| 0.1.7 | 2021-10-26 | [\#7240](https://github.com/airbytehq/airbyte/issues/7240) | Output partitioned/clustered tables |
174175
| 0.1.6 | 2021-09-16 | [\#6145](https://github.com/airbytehq/airbyte/pull/6145) | BigQuery Denormalized support for date, datetime & timestamp types through the json "format" key |

0 commit comments

Comments
 (0)