diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
index f88a92fd6917b..fd7b5772272cc 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml
@@ -13,13 +13,13 @@
 - name: BigQuery
   destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
   dockerRepository: airbyte/destination-bigquery
-  dockerImageTag: 0.6.9
+  dockerImageTag: 0.6.10
   documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
   icon: bigquery.svg
 - name: BigQuery (denormalized typed struct)
   destinationDefinitionId: 079d5540-f236-4294-ba7c-ade8fd918496
   dockerRepository: airbyte/destination-bigquery-denormalized
-  dockerImageTag: 0.2.8
+  dockerImageTag: 0.2.9
   documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
   icon: bigquery.svg
 - name: Cassandra
@@ -60,7 +60,7 @@
 - name: Google Cloud Storage (GCS)
   destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a
   dockerRepository: airbyte/destination-gcs
-  dockerImageTag: 0.1.23
+  dockerImageTag: 0.1.24
   documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs
   icon: googlecloudstorage.svg
 - name: Google Firestore
@@ -162,7 +162,7 @@
 - name: Redshift
   destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
   dockerRepository: airbyte/destination-redshift
-  dockerImageTag: 0.3.25
+  dockerImageTag: 0.3.26
   documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
   icon: redshift.svg
 - name: Rockset
@@ -173,7 +173,7 @@
 - name: S3
   destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
   dockerRepository: airbyte/destination-s3
-  dockerImageTag: 0.2.8
+  dockerImageTag: 0.2.9
   documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
   icon: s3.svg
 - name: SFTP-JSON
@@ -185,7 +185,7 @@
 - name: Snowflake
   destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
   dockerRepository: airbyte/destination-snowflake
-  dockerImageTag: 0.4.15
+  dockerImageTag: 0.4.16
   documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
   icon: snowflake.svg
   resourceRequirements:
diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
index d67d7f86fa4fe..091d757ed3a3b 100644
--- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
+++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -188,7 +188,7 @@
     supportsDBT: false
     supported_destination_sync_modes:
     - "append"
-- dockerImage: "airbyte/destination-bigquery:0.6.9"
+- dockerImage: "airbyte/destination-bigquery:0.6.10"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
     connectionSpecification:
@@ -383,7 +383,7 @@
     - "overwrite"
    - "append"
     - "append_dedup"
-- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.8"
+- dockerImage: "airbyte/destination-bigquery-denormalized:0.2.9"
   spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/bigquery"
     connectionSpecification:
@@ -1162,7 +1162,7 @@
     - "overwrite"
     - "append"
     supportsNamespaces: true
-- dockerImage: "airbyte/destination-gcs:0.1.23"
+- dockerImage: "airbyte/destination-gcs:0.1.24"
  spec:
     documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs"
     connectionSpecification:
@@ -3272,7 +3272,7 @@
     supported_destination_sync_modes:
     - "overwrite"
     - "append"
-- dockerImage: "airbyte/destination-redshift:0.3.25"
"airbyte/destination-redshift:0.3.25" +- dockerImage: "airbyte/destination-redshift:0.3.26" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/redshift" connectionSpecification: @@ -3454,7 +3454,7 @@ supported_destination_sync_modes: - "append" - "overwrite" -- dockerImage: "airbyte/destination-s3:0.2.8" +- dockerImage: "airbyte/destination-s3:0.2.9" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3" connectionSpecification: @@ -3825,7 +3825,7 @@ supported_destination_sync_modes: - "overwrite" - "append" -- dockerImage: "airbyte/destination-snowflake:0.4.15" +- dockerImage: "airbyte/destination-snowflake:0.4.16" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/snowflake" connectionSpecification: diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile index 5195a2158e354..d1b6147fc93dc 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/Dockerfile @@ -13,10 +13,10 @@ FROM airbyte/integration-base-java:dev WORKDIR /airbyte ENV APPLICATION destination-bigquery-denormalized -ENV APPLICATION_VERSION 0.2.8 +ENV APPLICATION_VERSION 0.2.9 ENV ENABLE_SENTRY true COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.8 +LABEL io.airbyte.version=0.2.9 LABEL io.airbyte.name=airbyte/destination-bigquery-denormalized diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGscDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java similarity index 99% rename from airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGscDestinationAcceptanceTest.java rename to airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java index ffaddde9ef333..0301d6635f3cc 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGscDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationAcceptanceTest.java @@ -49,9 +49,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BigQueryDenormalizedGscDestinationAcceptanceTest extends DestinationAcceptanceTest { +public class BigQueryDenormalizedGcsDestinationAcceptanceTest extends DestinationAcceptanceTest { - private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedGscDestinationAcceptanceTest.class); + private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedGcsDestinationAcceptanceTest.class); private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json"); diff --git 
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGscDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java
similarity index 97%
rename from airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGscDestinationTest.java
rename to airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java
index 696e0cfffd3f8..11697e942af1f 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGscDestinationTest.java
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedGcsDestinationTest.java
@@ -16,6 +16,7 @@
 import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithInvalidArrayType;
 import static io.airbyte.integrations.destination.bigquery.util.BigQueryDenormalizedTestDataUtils.getSchemaWithReferenceDefinition;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.params.provider.Arguments.arguments;
 
 import com.amazonaws.services.s3.AmazonS3;
@@ -71,12 +72,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class BigQueryDenormalizedGscDestinationTest {
+class BigQueryDenormalizedGcsDestinationTest {
 
   private static final Path CREDENTIALS_PATH = Path.of("secrets/credentials.json");
   private static final Set<String> AIRBYTE_METADATA_FIELDS = Set.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, JavaBaseConstants.COLUMN_NAME_AB_ID);
-  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedGscDestinationTest.class);
+  private static final Logger LOGGER = LoggerFactory.getLogger(BigQueryDenormalizedGcsDestinationTest.class);
 
   private static final String BIG_QUERY_CLIENT_CHUNK_SIZE = "big_query_client_buffer_size_mb";
   private static final Instant NOW = Instant.now();
@@ -276,8 +277,11 @@ void testWriteWithFormat() throws Exception {
         Field.of("updated_at", StandardSQLTypeName.TIMESTAMP),
         Field.of(JavaBaseConstants.COLUMN_NAME_AB_ID, StandardSQLTypeName.STRING),
         Field.of(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, StandardSQLTypeName.TIMESTAMP));
+    final Schema actualSchema = BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema();
 
-    assertEquals(BigQueryUtils.getTableDefinition(bigquery, dataset.getDatasetId().getDataset(), USERS_STREAM_NAME).getSchema(), expectedSchema);
+    assertNotNull(actualSchema);
+    actualSchema.getFields().forEach(actualField -> assertEquals(expectedSchema.getFields().get(actualField.getName()),
+        Field.of(actualField.getName(), actualField.getType())));
   }
 
   @Test
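The rewritten assertion above compares the loaded table's schema field by field, by name and type, instead of comparing whole Schema objects, whose equality is also sensitive to field order and metadata such as modes. A minimal standalone sketch of the relaxed check, assuming the com.google.cloud.bigquery client the connector uses; the field names are illustrative, not the connector's test data:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;

class SchemaAssertionSketch {

  // Compare by field name and type only, so column order and metadata such as
  // modes no longer break the assertion.
  static void assertFieldsMatch(final Schema expectedSchema, final Schema actualSchema) {
    assertNotNull(actualSchema);
    actualSchema.getFields().forEach(actualField -> assertEquals(
        expectedSchema.getFields().get(actualField.getName()),
        Field.of(actualField.getName(), actualField.getType())));
  }

  public static void main(final String[] args) {
    final Schema expected = Schema.of(
        Field.of("name", StandardSQLTypeName.STRING),
        Field.of("updated_at", StandardSQLTypeName.TIMESTAMP));
    // Same fields in a different order: equals() on the schemas would fail,
    // but the per-field check passes.
    final Schema actual = Schema.of(
        Field.of("updated_at", StandardSQLTypeName.TIMESTAMP),
        Field.of("name", StandardSQLTypeName.STRING));
    assertFieldsMatch(expected, actual);
  }
}

Note that, like the test change above, this only verifies that every actual field appears in the expected schema with the right type; it does not fail on fields missing from the actual schema.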
diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java
index 83c18fa4d2f80..337e674555d47 100644
--- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java
+++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/util/BigQueryDenormalizedTestDataUtils.java
@@ -11,196 +11,201 @@ public class BigQueryDenormalizedTestDataUtils {
 
   public static JsonNode getSchema() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"type\": [\n"
-            + "    \"object\"\n"
-            + "  ],\n"
-            + "  \"properties\": {\n"
-            + "    \"accepts_marketing_updated_at\": {\n"
-            + "      \"type\": [\n"
-            + "        \"null\",\n"
-            + "        \"string\"\n"
-            + "      ],\n"
-            + "      \"format\": \"date-time\"\n"
-            + "    },\n"
-            + "    \"name\": {\n"
-            + "      \"type\": [\n"
-            + "        \"string\"\n"
-            + "      ]\n"
-            + "    },\n"
-            + "    \"permission-list\": {\n"
-            + "      \"type\": [\n"
-            + "        \"array\"\n"
-            + "      ],\n"
-            + "      \"items\": {\n"
-            + "        \"type\": [\n"
-            + "          \"object\"\n"
-            + "        ],\n"
-            + "        \"properties\": {\n"
-            + "          \"domain\": {\n"
-            + "            \"type\": [\n"
-            + "              \"string\"\n"
-            + "            ]\n"
-            + "          },\n"
-            + "          \"grants\": {\n"
-            + "            \"type\": [\n"
-            + "              \"array\"\n"
-            + "            ],\n"
-            + "            \"items\": {\n"
-            + "              \"type\": [\n"
-            + "                \"string\"\n"
-            + "              ]\n"
-            + "            }\n"
-            + "          }\n"
-            + "        }\n"
-            + "      }\n"
-            + "    }\n"
-            + "  }\n"
-            + "}");
-
+    return Jsons.deserialize("""
+        {
+          "type": [
+            "object"
+          ],
+          "properties": {
+            "accepts_marketing_updated_at": {
+              "type": [
+                "null",
+                "string"
+              ],
+              "format": "date-time"
+            },
+            "name": {
+              "type": [
+                "string"
+              ]
+            },
+            "permission-list": {
+              "type": [
+                "array"
+              ],
+              "items": {
+                "type": [
+                  "object"
+                ],
+                "properties": {
+                  "domain": {
+                    "type": [
+                      "string"
+                    ]
+                  },
+                  "grants": {
+                    "type": [
+                      "array"
+                    ],
+                    "items": {
+                      "type": [
+                        "string"
+                      ]
+                    }
+                  }
+                }
+              }
+            }
+          }
+        }
+        """);
   }
 
   public static JsonNode getSchemaWithFormats() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"type\": [\n"
-            + "    \"object\"\n"
-            + "  ],\n"
-            + "  \"properties\": {\n"
-            + "    \"name\": {\n"
-            + "      \"type\": [\n"
-            + "        \"string\"\n"
-            + "      ]\n"
-            + "    },\n"
-            + "    \"date_of_birth\": {\n"
-            + "      \"type\": [\n"
-            + "        \"string\"\n"
-            + "      ],\n"
-            + "      \"format\": \"date\"\n"
-            + "    },\n"
-            + "    \"updated_at\": {\n"
-            + "      \"type\": [\n"
-            + "        \"string\"\n"
-            + "      ],\n"
-            + "      \"format\": \"date-time\"\n"
-            + "    }\n"
-            + "  }\n"
-            + "}");
+    return Jsons.deserialize("""
+        {
+          "type": [
+            "object"
+          ],
+          "properties": {
+            "name": {
+              "type": [
+                "string"
+              ]
+            },
+            "date_of_birth": {
+              "type": [
+                "string"
+              ],
+              "format": "date"
+            },
+            "updated_at": {
+              "type": [
+                "string"
+              ],
+              "format": "date-time"
+            }
+          }
+        }
+        """);
   }
 
   public static JsonNode getSchemaWithDateTime() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"type\": [\n"
-            + "    \"object\"\n"
-            + "  ],\n"
-            + "  \"properties\": {\n"
-            + "    "
-            +
-            "\"updated_at\": {\n"
-            + "      \"type\": [\n"
-            + "        \"string\"\n"
-            + "      ],\n"
-            + "      \"format\": \"date-time\"\n"
-            + "    },\n"
-            + "    \"items\": {\n"
-            + "      \"type\": [\n"
-            + "        \"object\"\n"
-            + "      ],\n"
-            + "      \"properties\": {\n"
-            + "        \"nested_datetime\": {\n"
-            + "          \"type\": [\n"
-            + "            \"string\"\n"
-            + "          ],\n"
-            + "          \"format\": \"date-time\"\n"
-            + "        }\n"
-            +
-            "        "
-            + "}\n"
-            + "      }\n"
-            + "    }\n"
-            + "}");
+    return Jsons.deserialize("""
+        {
+          "type": [
+            "object"
+          ],
+          "properties": {
+            "updated_at": {
+              "type": [
+                "string"
+              ],
+              "format": "date-time"
+            },
+            "items": {
+              "type": [
+                "object"
+              ],
+              "properties": {
+                "nested_datetime": {
+                  "type": [
+                    "string"
+                  ],
+                  "format": "date-time"
+                }
+              }
+            }
+          }
+        }
+        """);
   }
 
   public static JsonNode getSchemaWithInvalidArrayType() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"type\": [\n"
-            + "    \"object\"\n"
-            + "  ],\n"
-            + "  \"properties\": {\n"
-            + "    \"name\": {\n"
-            + "      \"type\": [\n"
-            + "        \"string\"\n"
-            + "      ]\n"
-            + "    },\n"
-            + "    \"permission-list\": {\n"
-            + "      \"type\": [\n"
-            + "        \"array\"\n"
-            + "      ],\n"
-            + "      \"items\": {\n"
-            + "        \"type\": [\n"
-            + "          \"object\"\n"
-            + "        ],\n"
-            + "        \"properties\": {\n"
-            + "          \"domain\": {\n"
-            + "            \"type\": [\n"
-            + "              \"string\"\n"
-            + "            ]\n"
-            + "          },\n"
-            + "          \"grants\": {\n"
-            + "            \"type\": [\n"
-            + "              \"array\"\n" // missed "items" element
-            + "            ]\n"
-            + "          }\n"
-            + "        }\n"
-            + "      }\n"
-            + "    }\n"
-            + "  }\n"
-            + "}");
+    return Jsons.deserialize("""
+        {
+          "type": [
+            "object"
+          ],
+          "properties": {
+            "name": {
+              "type": [
+                "string"
+              ]
+            },
+            "permission-list": {
+              "type": [
+                "array"
+              ],
+              "items": {
+                "type": [
+                  "object"
+                ],
+                "properties": {
+                  "domain": {
+                    "type": [
+                      "string"
+                    ]
+                  },
+                  "grants": {
+                    "type": [
+                      "array"
+        """ + // missed "items" element
+            """
+                    ]
+                  }
+                }
+              }
+            }
+          }
+        }
+        """);
   }
 
   public static JsonNode getData() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"name\": \"Andrii\",\n"
-            + "  \"accepts_marketing_updated_at\": \"2021-10-11T06:36:53-07:00\",\n"
-            + "  \"permission-list\": [\n"
-            + "    {\n"
-            + "      \"domain\": \"abs\",\n"
-            + "      \"grants\": [\n"
-            + "        \"admin\"\n"
-            + "      ]\n"
-            + "    },\n"
-            + "    {\n"
-            + "      \"domain\": \"tools\",\n"
-            + "      \"grants\": [\n"
-            + "        \"read\", \"write\"\n"
-            + "      ]\n"
-            + "    }\n"
-            + "  ]\n"
-            + "}");
+    return Jsons.deserialize("""
+        {
+          "name": "Andrii",
+          "accepts_marketing_updated_at": "2021-10-11T06:36:53-07:00",
+          "permission-list": [
+            {
+              "domain": "abs",
+              "grants": [
+                "admin"
+              ]
+            },
+            {
+              "domain": "tools",
+              "grants": [
+                "read",
+                "write"
+              ]
+            }
+          ]
+        }
+        """);
   }
 
   public static JsonNode getDataWithFormats() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"name\": \"Andrii\",\n"
-            + "  \"date_of_birth\": \"1996-01-25\",\n"
-            + "  \"updated_at\": \"2021-10-11T06:36:53\"\n"
-            + "}");
+    return Jsons.deserialize("""
+        {
+          "name": "Andrii",
+          "date_of_birth": "1996-01-25",
+          "updated_at": "2021-10-11T06:36:53"
+        }
+        """);
   }
 
   public static JsonNode getDataWithJSONDateTimeFormats() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"updated_at\": \"2021-10-11T06:36:53+00:00\",\n"
-            + "  \"items\": {\n"
-            + "    \"nested_datetime\": \"2021-11-11T06:36:53+00:00\"\n"
-            + "  }\n"
-            + "}");
+    return Jsons.deserialize("""
+        {
+          "updated_at": "2021-10-11T06:36:53+00:00",
+          "items": {
+            "nested_datetime": "2021-11-11T06:36:53+00:00"
+          }
+        }
+        """);
   }
 
   public static JsonNode getDataWithJSONWithReference() {
@@ -211,86 +216,98 @@ public static JsonNode getDataWithJSONWithReference() {
   }
 
   public static JsonNode getSchemaWithReferenceDefinition() {
-    return Jsons.deserialize(
-        "{ \n"
-            + "  \"type\" : [ \"null\", \"object\" ],\n"
-            + "  \"properties\" : {\n"
-            + "    \"users\": {\n"
-            + "      \"$ref\": \"#/definitions/users_\"\n"
-            +
-            "    }\n"
-            +
-            "}\n"
-            + "  ");
+    return Jsons.deserialize("""
+        {
+          "type": [
+            "null",
+            "object"
+          ],
+          "properties": {
+            "users": {
+              "$ref": "#/definitions/users_"
+            }
+          }
+        }
+        """);
   }
 
   public static JsonNode getSchemaWithNestedDatetimeInsideNullObject() {
-    return Jsons.deserialize("{\n" +
-        "  \"type\": [\n" +
-        "    \"object\"\n" +
-        "  ],\n" +
-        "  \"properties\": {\n" +
-        "    \"name\": {\n" +
-        "      \"type\": [\n" +
-        "        \"null\",\n" +
-        "        \"string\"\n" +
-        "      ]\n" +
-        "    },\n" +
-        "    \"appointment\": {\n" +
-        "      \"type\": [\n" +
-        "        \"null\",\n" +
-        "        \"object\"\n" +
-        "      ],\n" +
-        "      \"properties\": {\n" +
-        "        \"street\": {\n" +
-        "          \"type\": [\n" +
-        "            \"null\",\n" +
-        "            \"string\"\n" +
-        "          ]\n" +
-        "        },\n" +
-        "        \"expTime\": {\n" +
-        "          \"type\": [\n" +
-        "            \"null\",\n" +
-        "            \"string\"\n" +
-        "          ],\n" +
-        "          \"format\": \"date-time\"\n" +
-        "        }\n" +
-        "      }\n" +
-        "    }\n" +
-        "  }\n" +
-        "}");
+    return Jsons.deserialize("""
+        {
+          "type": [
+            "object"
+          ],
+          "properties": {
+            "name": {
+              "type": [
+                "null",
+                "string"
+              ]
+            },
+            "appointment": {
+              "type": [
+                "null",
+                "object"
+              ],
+              "properties": {
+                "street": {
+                  "type": [
+                    "null",
+                    "string"
+                  ]
+                },
+                "expTime": {
+                  "type": [
+                    "null",
+                    "string"
+                  ],
+                  "format": "date-time"
+                }
+              }
+            }
+          }
+        }
+        """);
   }
 
   public static JsonNode getDataWithEmptyObjectAndArray() {
-    return Jsons.deserialize(
-        "{\n"
-            + "  \"name\": \"Andrii\",\n"
-            + "  \"permission-list\": [\n"
-            + "    {\n"
-            + "      \"domain\": \"abs\",\n"
-            + "      \"items\": {},\n" // empty object
-            + "      \"grants\": [\n"
-            + "        \"admin\"\n"
-            + "      ]\n"
-            + "    },\n"
-            + "    {\n"
-            + "      \"domain\": \"tools\",\n"
-            + "      \"grants\": [],\n" // empty array
-            + "      \"items\": {\n" // object with empty array and object
-            + "        \"object\": {},\n"
-            + "        \"array\": []\n"
-            + "      }\n"
-            + "    }\n"
-            + "  ]\n"
-            + "}");
+    return Jsons.deserialize("""
+        {
+          "name": "Andrii",
+          "permission-list": [
+            {
+              "domain": "abs",
+              "items": {},
+        """ + // empty object
+            """
+              "grants": [
+                "admin"
+              ]
+            },
+            {
+              "domain": "tools",
+              "grants": [],
+        """ + // empty array
+            """
+              "items": {
+        """ + // object with empty array and object
+            """
+                "object": {},
+                "array": []
+              }
+            }
+          ]
+        }
+        """);
   }
 
   public static JsonNode getDataWithNestedDatetimeInsideNullObject() {
-    return Jsons.deserialize("{\n" +
-        "  \"name\": \"Alice in Wonderland\",\n" +
-        "  \"appointment\": null\n" +
-        "}");
+    return Jsons.deserialize("""
+        {
+          "name": "Alice in Wonderland",
+          "appointment": null
+        }
+        """);
   }
diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
index c38040728a191..f632ef3ab8193 100644
--- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile
+++ b/airbyte-integrations/connectors/destination-bigquery/Dockerfile
@@ -13,10 +13,10 @@ FROM airbyte/integration-base-java:dev
 
 WORKDIR /airbyte
 ENV APPLICATION destination-bigquery
-ENV APPLICATION_VERSION 0.6.9
+ENV APPLICATION_VERSION 0.6.10
 ENV ENABLE_SENTRY true
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.6.9
+LABEL io.airbyte.version=0.6.10
 LABEL io.airbyte.name=airbyte/destination-bigquery
diff --git a/airbyte-integrations/connectors/destination-gcs/Dockerfile b/airbyte-integrations/connectors/destination-gcs/Dockerfile
index 23db6a4d5ed90..6e7207acf2726 100644
--- a/airbyte-integrations/connectors/destination-gcs/Dockerfile
+++ b/airbyte-integrations/connectors/destination-gcs/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.1.23
+LABEL io.airbyte.version=0.1.24
 LABEL io.airbyte.name=airbyte/destination-gcs
diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java
index 80604e940218f..8149228b5574c 100644
--- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java
+++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsConsumer.java
@@ -6,6 +6,7 @@
 
 import com.amazonaws.services.s3.AmazonS3;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.string.Strings;
 import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
 import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
 import io.airbyte.integrations.destination.gcs.writer.GcsWriterFactory;
@@ -17,13 +18,19 @@
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class GcsConsumer extends FailureTrackingAirbyteMessageConsumer {
 
+  protected static final Logger LOGGER = LoggerFactory.getLogger(GcsConsumer.class);
+
   private final GcsDestinationConfig gcsDestinationConfig;
   private final ConfiguredAirbyteCatalog configuredCatalog;
   private final GcsWriterFactory writerFactory;
@@ -87,8 +94,20 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Excepti
 
   @Override
   protected void close(final boolean hasFailed) throws Exception {
-    for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) {
-      handler.close(hasFailed);
+    LOGGER.debug("Closing consumer with writers = {}", streamNameAndNamespaceToWriters);
+    List<Exception> exceptionsThrown = new ArrayList<>();
+    for (var entry : streamNameAndNamespaceToWriters.entrySet()) {
+      final DestinationFileWriter handler = entry.getValue();
+      LOGGER.debug("Closing writer {}", entry.getKey());
+      try {
+        handler.close(hasFailed);
+      } catch (Exception e) {
+        exceptionsThrown.add(e);
+        LOGGER.error("Exception while closing writer {}", entry.getKey(), e);
+      }
+    }
+    if (!exceptionsThrown.isEmpty()) {
+      throw new RuntimeException(String.format("Exceptions thrown while closing consumer: %s", Strings.join(exceptionsThrown, "\n")));
     }
     // Gcs stream uploader is all or nothing if a failure happens in the destination.
     if (!hasFailed) {
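The close() rewrite above moves GcsConsumer from fail-fast to close-everything-then-fail: every writer is closed even when an earlier one throws, and the collected failures surface afterwards as a single RuntimeException (the same change is applied to S3Consumer further down). A standalone sketch of the pattern with illustrative names; java.io.Closeable stands in for DestinationFileWriter:

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

class CloseAllSketch {

  // Close every writer even if earlier ones fail, remembering each failure,
  // then surface all of them as one exception.
  static void closeAll(final Map<String, Closeable> writers) {
    final List<Exception> exceptionsThrown = new ArrayList<>();
    for (final var entry : writers.entrySet()) {
      try {
        entry.getValue().close();
      } catch (final Exception e) {
        exceptionsThrown.add(e); // keep closing the remaining writers
      }
    }
    if (!exceptionsThrown.isEmpty()) {
      throw new RuntimeException("Exceptions thrown while closing: " + exceptionsThrown);
    }
  }

  public static void main(final String[] args) {
    final Closeable healthy = () -> System.out.println("closed cleanly");
    final Closeable broken = () -> {
      throw new IOException("flush failed");
    };
    // Both writers get closed; the failure is rethrown only afterwards.
    closeAll(Map.of("stream_a", healthy, "stream_b", broken));
  }
}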
diff --git a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java
index 8ff12911446fe..a94cb81e3f781 100644
--- a/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java
+++ b/airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/writer/BaseGcsWriter.java
@@ -60,33 +60,39 @@ protected BaseGcsWriter(final GcsDestinationConfig config,
    *
    */
   @Override
-  public void initialize() {
-    final String bucket = config.getBucketName();
-    if (!gcsBucketExist(s3Client, bucket)) {
-      LOGGER.info("Bucket {} does not exist; creating...", bucket);
-      s3Client.createBucket(bucket);
-      LOGGER.info("Bucket {} has been created.", bucket);
-    }
-
-    if (syncMode == DestinationSyncMode.OVERWRITE) {
-      LOGGER.info("Overwrite mode");
-      final List<KeyVersion> keysToDelete = new LinkedList<>();
-      final List<S3ObjectSummary> objects = s3Client.listObjects(bucket, outputPrefix)
-          .getObjectSummaries();
-      for (final S3ObjectSummary object : objects) {
-        keysToDelete.add(new KeyVersion(object.getKey()));
+  public void initialize() throws IOException {
+    try {
+      final String bucket = config.getBucketName();
+      if (!gcsBucketExist(s3Client, bucket)) {
+        LOGGER.info("Bucket {} does not exist; creating...", bucket);
+        s3Client.createBucket(bucket);
+        LOGGER.info("Bucket {} has been created.", bucket);
       }
-      if (keysToDelete.size() > 0) {
-        LOGGER.info("Purging non-empty output path for stream '{}' under OVERWRITE mode...", stream.getName());
-        // Google Cloud Storage doesn't accept request to delete multiple objects
-        for (final KeyVersion keyToDelete : keysToDelete) {
-          s3Client.deleteObject(bucket, keyToDelete.getKey());
+      if (syncMode == DestinationSyncMode.OVERWRITE) {
+        LOGGER.info("Overwrite mode");
+        final List<KeyVersion> keysToDelete = new LinkedList<>();
+        final List<S3ObjectSummary> objects = s3Client.listObjects(bucket, outputPrefix)
+            .getObjectSummaries();
+        for (final S3ObjectSummary object : objects) {
+          keysToDelete.add(new KeyVersion(object.getKey()));
         }
-        LOGGER.info("Deleted {} file(s) for stream '{}'.", keysToDelete.size(),
-            stream.getName());
+
+        if (keysToDelete.size() > 0) {
+          LOGGER.info("Purging non-empty output path for stream '{}' under OVERWRITE mode...", stream.getName());
+          // Google Cloud Storage doesn't accept request to delete multiple objects
+          for (final KeyVersion keyToDelete : keysToDelete) {
+            s3Client.deleteObject(bucket, keyToDelete.getKey());
+          }
+          LOGGER.info("Deleted {} file(s) for stream '{}'.", keysToDelete.size(),
+              stream.getName());
+        }
+        LOGGER.info("Overwrite is finished");
       }
-      LOGGER.info("Overwrite is finished");
+    } catch (Exception e) {
+      LOGGER.error("Failed to initialize: ", e);
+      closeWhenFail();
+      throw e;
     }
   }
diff --git a/airbyte-integrations/connectors/destination-redshift/Dockerfile b/airbyte-integrations/connectors/destination-redshift/Dockerfile
index 42cacaf635f6c..d909bb1d72e68 100644
--- a/airbyte-integrations/connectors/destination-redshift/Dockerfile
+++ b/airbyte-integrations/connectors/destination-redshift/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-redshift
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.3.25
+LABEL io.airbyte.version=0.3.26
 LABEL io.airbyte.name=airbyte/destination-redshift
diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile
index 9a054d4bfbf23..7ad019a76a77b 100644
--- a/airbyte-integrations/connectors/destination-s3/Dockerfile
+++ b/airbyte-integrations/connectors/destination-s3/Dockerfile
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=0.2.8
+LABEL io.airbyte.version=0.2.9
 LABEL io.airbyte.name=airbyte/destination-s3
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java
index 651a0a7f8cfdb..4af812673a0df 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java
+++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3Consumer.java
@@ -6,6 +6,7 @@
 
 import com.amazonaws.services.s3.AmazonS3;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.string.Strings;
 import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
 import io.airbyte.integrations.base.FailureTrackingAirbyteMessageConsumer;
 import io.airbyte.integrations.destination.s3.writer.DestinationFileWriter;
@@ -17,13 +18,19 @@
 import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
 import io.airbyte.protocol.models.ConfiguredAirbyteStream;
 import java.sql.Timestamp;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.function.Consumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class S3Consumer extends FailureTrackingAirbyteMessageConsumer {
 
+  protected static final Logger LOGGER = LoggerFactory.getLogger(S3Consumer.class);
+
   private final S3DestinationConfig s3DestinationConfig;
   private final ConfiguredAirbyteCatalog configuredCatalog;
   private final S3WriterFactory writerFactory;
@@ -85,8 +92,20 @@ protected void acceptTracked(final AirbyteMessage airbyteMessage) throws Excepti
 
   @Override
   protected void close(final boolean hasFailed) throws Exception {
-    for (final DestinationFileWriter handler : streamNameAndNamespaceToWriters.values()) {
-      handler.close(hasFailed);
+    LOGGER.debug("Closing consumer with writers = {}", streamNameAndNamespaceToWriters);
+    List<Exception> exceptionsThrown = new ArrayList<>();
+    for (var entry : streamNameAndNamespaceToWriters.entrySet()) {
+      final DestinationFileWriter handler = entry.getValue();
+      LOGGER.debug("Closing writer {}", entry.getKey());
+      try {
+        handler.close(hasFailed);
+      } catch (Exception e) {
+        exceptionsThrown.add(e);
+        LOGGER.error("Exception while closing writer {}", entry.getKey(), e);
+      }
+    }
+    if (!exceptionsThrown.isEmpty()) {
+      throw new RuntimeException(String.format("Exceptions thrown while closing consumer: %s", Strings.join(exceptionsThrown, "\n")));
     }
     // S3 stream uploader is all or nothing if a failure happens in the destination.
    if (!hasFailed) {
diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java
index e9929825b0a82..86e48b2e6d6e5 100644
--- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java
+++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/writer/BaseS3Writer.java
@@ -66,31 +66,37 @@ public String getOutputPrefix() {
    *
    */
   @Override
-  public void initialize() {
-    final String bucket = config.getBucketName();
-    if (!s3Client.doesBucketExistV2(bucket)) {
-      LOGGER.info("Bucket {} does not exist; creating...", bucket);
-      s3Client.createBucket(bucket);
-      LOGGER.info("Bucket {} has been created.", bucket);
-    }
-
-    if (syncMode == DestinationSyncMode.OVERWRITE) {
-      LOGGER.info("Overwrite mode");
-      final List<KeyVersion> keysToDelete = new LinkedList<>();
-      final List<S3ObjectSummary> objects = s3Client.listObjects(bucket, outputPrefix)
-          .getObjectSummaries();
-      for (final S3ObjectSummary object : objects) {
-        keysToDelete.add(new KeyVersion(object.getKey()));
+  public void initialize() throws IOException {
+    try {
+      final String bucket = config.getBucketName();
+      if (!s3Client.doesBucketExistV2(bucket)) {
+        LOGGER.info("Bucket {} does not exist; creating...", bucket);
+        s3Client.createBucket(bucket);
+        LOGGER.info("Bucket {} has been created.", bucket);
       }
-      if (keysToDelete.size() > 0) {
-        LOGGER.info("Purging non-empty output path for stream '{}' under OVERWRITE mode...",
-            stream.getName());
-        final DeleteObjectsResult result = s3Client
-            .deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
-        LOGGER.info("Deleted {} file(s) for stream '{}'.", result.getDeletedObjects().size(),
-            stream.getName());
+      if (syncMode == DestinationSyncMode.OVERWRITE) {
+        LOGGER.info("Overwrite mode");
+        final List<KeyVersion> keysToDelete = new LinkedList<>();
+        final List<S3ObjectSummary> objects = s3Client.listObjects(bucket, outputPrefix)
+            .getObjectSummaries();
+        for (final S3ObjectSummary object : objects) {
+          keysToDelete.add(new KeyVersion(object.getKey()));
+        }
+
+        if (keysToDelete.size() > 0) {
+          LOGGER.info("Purging non-empty output path for stream '{}' under OVERWRITE mode...",
+              stream.getName());
+          final DeleteObjectsResult result = s3Client
+              .deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
+          LOGGER.info("Deleted {} file(s) for stream '{}'.", result.getDeletedObjects().size(),
+              stream.getName());
+        }
       }
+    } catch (Exception e) {
+      LOGGER.error("Failed to initialize: ", e);
+      closeWhenFail();
+      throw e;
     }
   }
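Both BaseGcsWriter.initialize() above and BaseS3Writer.initialize() here now wrap their body in a try/catch that logs, calls closeWhenFail(), and rethrows, so a failed bucket check or OVERWRITE purge no longer leaves the writer's underlying upload stream dangling. A minimal sketch of the guard; setUp() is an illustrative stand-in for the bucket/purge logic, and closeWhenFail() is assumed to release whatever was partially acquired:

import java.io.IOException;

abstract class GuardedWriterSketch {

  // Run the normal initialization, but on any failure release partially
  // acquired resources before surfacing the error.
  public void initialize() throws IOException {
    try {
      setUp(); // bucket existence check, OVERWRITE purge, opening the upload stream, ...
    } catch (final Exception e) {
      closeWhenFail(); // assumed to release whatever setUp() already acquired
      throw e; // precise rethrow keeps the IOException signature compiling
    }
  }

  protected abstract void setUp() throws IOException;

  protected abstract void closeWhenFail();
}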
diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
index f5605f335517d..5912f54ed3ba0 100644
--- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile
+++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile
@@ -18,8 +18,8 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
 RUN tar xf ${APPLICATION}.tar --strip-components=1
 
-ENV APPLICATION_VERSION 0.4.15
+ENV APPLICATION_VERSION 0.4.16
 ENV ENABLE_SENTRY true
 
-LABEL io.airbyte.version=0.4.15
+LABEL io.airbyte.version=0.4.16
 LABEL io.airbyte.name=airbyte/destination-snowflake
diff --git a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
index 57c638e8bb68f..7f50518a96c67 100644
--- a/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
+++ b/airbyte-integrations/connectors/source-facebook-marketing/source_facebook_marketing/api.py
@@ -93,7 +93,7 @@ def handle_call_rate_limit(self, response, params):
         max_pause_interval = self.pause_interval_minimum
         for record in response.json():
-            # there are two types of failures: 
+            # there are two types of failures:
             # 1. no response (we execute batch until all inner requests has response)
             # 2. response with error (we crash loudly)
             # in case it is failed inner request the headers might not be present