Skip to content

Commit 7f281af

Browse files
edgaooctavia-squidington-iii
authored andcommitted
Destination Bigquery / Bigquery-denormalized: Only override dataset ID if stream namespace is null/empty (airbytehq#17054)
* stop overriding namespace? * set namespace if needed * also check for empty namespace * version bump + changelog * auto-bump connector version [ci skip] * sanitize dataset id * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
1 parent 61873e2 commit 7f281af

File tree

9 files changed

+28
-16
lines changed

9 files changed

+28
-16
lines changed

airbyte-config/init/src/main/resources/seed/destination_definitions.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
- name: BigQuery
2828
destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
2929
dockerRepository: airbyte/destination-bigquery
30-
dockerImageTag: 1.2.1
30+
dockerImageTag: 1.2.3
3131
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
3232
icon: bigquery.svg
3333
resourceRequirements:
@@ -40,7 +40,7 @@
4040
- name: BigQuery (denormalized typed struct)
4141
destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
4242
dockerRepository: airbyte/destination-bigquery-denormalized
43-
dockerImageTag: 1.2.2
43+
dockerImageTag: 1.2.3
4444
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
4545
icon: bigquery.svg
4646
resourceRequirements:

airbyte-config/init/src/main/resources/seed/destination_specs.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@
285285
supported_destination_sync_modes:
286286
- "overwrite"
287287
- "append"
288-
- dockerImage: "airbyte/destination-bigquery:1.2.1"
288+
- dockerImage: "airbyte/destination-bigquery:1.2.3"
289289
spec:
290290
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
291291
connectionSpecification:
@@ -495,7 +495,7 @@
495495
- "overwrite"
496496
- "append"
497497
- "append_dedup"
498-
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.2"
498+
- dockerImage: "airbyte/destination-bigquery-denormalized:1.2.3"
499499
spec:
500500
documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
501501
connectionSpecification:

airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true
1717

1818
COPY --from=build /airbyte /airbyte
1919

20-
LABEL io.airbyte.version=1.2.2
20+
LABEL io.airbyte.version=1.2.3
2121
LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized

airbyte-integrations/connectors/destination-bigquery-denormalized/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestination.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ protected void putStreamIntoUploaderMap(AirbyteStream stream,
8787
UploaderConfig uploaderConfig,
8888
Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap)
8989
throws IOException {
90-
Table existingTable =
91-
uploaderConfig.getBigQuery().getTable(uploaderConfig.getConfigStream().getStream().getNamespace(), uploaderConfig.getTargetTableName());
90+
String datasetId = BigQueryUtils.sanitizeDatasetId(uploaderConfig.getConfigStream().getStream().getNamespace());
91+
Table existingTable = uploaderConfig.getBigQuery().getTable(datasetId, uploaderConfig.getTargetTableName());
9292
BigQueryRecordFormatter formatter = uploaderConfig.getFormatter();
9393

9494
if (existingTable != null) {

airbyte-integrations/connectors/destination-bigquery/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,5 @@ ENV ENABLE_SENTRY true
1717

1818
COPY --from=build /airbyte /airbyte
1919

20-
LABEL io.airbyte.version=1.2.1
20+
LABEL io.airbyte.version=1.2.3
2121
LABEL io.airbyte.name=airbyte/destination-bigquery

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import java.util.function.Consumer;
5656
import java.util.function.Function;
5757
import org.apache.avro.Schema;
58+
import org.apache.commons.lang3.StringUtils;
5859
import org.apache.commons.lang3.tuple.ImmutablePair;
5960
import org.joda.time.DateTime;
6061
import org.joda.time.DateTimeZone;
@@ -209,7 +210,9 @@ protected Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> getUp
209210
final Map<AirbyteStreamNameNamespacePair, AbstractBigQueryUploader<?>> uploaderMap = new HashMap<>();
210211
for (final ConfiguredAirbyteStream configStream : catalog.getStreams()) {
211212
final AirbyteStream stream = configStream.getStream();
212-
stream.setNamespace(BigQueryUtils.getDatasetId(config));
213+
if (StringUtils.isEmpty(stream.getNamespace())) {
214+
stream.setNamespace(BigQueryUtils.getDatasetId(config));
215+
}
213216
final String streamName = stream.getName();
214217
final UploaderConfig uploaderConfig = UploaderConfig
215218
.builder()

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryRecordConsumer.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import java.util.List;
1616
import java.util.Map;
1717
import java.util.function.Consumer;
18+
import org.apache.commons.lang3.StringUtils;
1819
import org.slf4j.Logger;
1920
import org.slf4j.LoggerFactory;
2021

@@ -46,7 +47,9 @@ public void acceptTracked(final AirbyteMessage message) {
4647
lastStateMessage = message;
4748
outputRecordCollector.accept(message);
4849
} else if (message.getType() == Type.RECORD) {
49-
message.getRecord().setNamespace(datasetId);
50+
if (StringUtils.isEmpty(message.getRecord().getNamespace())) {
51+
message.getRecord().setNamespace(datasetId);
52+
}
5053
processRecord(message);
5154
} else {
5255
LOGGER.warn("Unexpected message: {}", message.getType());

airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryUtils.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,11 @@ private static String getFormattedBigQueryDateTime(final String dateTimeValue) {
263263
public static String getSchema(final JsonNode config, final ConfiguredAirbyteStream stream) {
264264
final String srcNamespace = stream.getStream().getNamespace();
265265
final String schemaName = srcNamespace == null ? getDatasetId(config) : srcNamespace;
266-
return NAME_TRANSFORMER.getNamespace(schemaName);
266+
return sanitizeDatasetId(schemaName);
267+
}
268+
269+
public static String sanitizeDatasetId(String datasetId) {
270+
return NAME_TRANSFORMER.getNamespace(datasetId);
267271
}
268272

269273
public static JobInfo.WriteDisposition getWriteDisposition(final DestinationSyncMode syncMode) {

docs/integrations/destinations/bigquery.md

+7-5
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,9 @@ Now that you have set up the BigQuery destination connector, check out the follo
134134

135135
| Version | Date | Pull Request | Subject |
136136
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
137-
| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
138-
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested |
137+
| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace |
138+
| 1.2.1 | 2022-09-14 | [#15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage |
139+
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested |
139140
| 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
140141
| 1.1.15 | 2022-08-22 | [15787](https://github.com/airbytehq/airbyte/pull/15787) | Throw exception if job failed |
141142
| 1.1.14 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials |
@@ -182,9 +183,10 @@ Now that you have set up the BigQuery destination connector, check out the follo
182183

183184
| Version | Date | Pull Request | Subject |
184185
|:--------|:-----------|:----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------|
185-
| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | Wrap logs in AirbyteLogMessage |
186-
| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | Wrapping string objects with TextNode |
187-
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | Cover arrays only if they are nested |
186+
| 1.2.3 | 2022-09-22 | [#17054](https://github.com/airbytehq/airbyte/pull/17054) | Respect stream namespace |
187+
| 1.2.2 | 2022-09-14 | [15668](https://github.com/airbytehq/airbyte/pull/15668) | (bugged, do not use) Wrap logs in AirbyteLogMessage |
188+
| 1.2.1 | 2022-09-10 | [16401](https://github.com/airbytehq/airbyte/pull/16401) | (bugged, do not use) Wrapping string objects with TextNode |
189+
| 1.2.0 | 2022-09-09 | [#14023](https://github.com/airbytehq/airbyte/pull/14023) | (bugged, do not use) Cover arrays only if they are nested |
188190
| 1.1.16 | 2022-09-01 | [#16243](https://github.com/airbytehq/airbyte/pull/16243) | Fix Json to Avro conversion when there is field name clash from combined restrictions (`anyOf`, `oneOf`, `allOf` fields) |
189191
| 1.1.15 | 2022-08-03 | [14784](https://github.com/airbytehq/airbyte/pull/14784) | Enabling Application Default Credentials |
190192
| 1.1.14 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |

0 commit comments

Comments
 (0)