diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 7f0fbadec12d4..bfcd84549460e 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -60,7 +60,7 @@ - name: Google Cloud Storage (GCS) destinationDefinitionId: ca8f6566-e555-4b40-943a-545bf123117a dockerRepository: airbyte/destination-gcs - dockerImageTag: 0.1.19 + dockerImageTag: 0.1.20 documentationUrl: https://docs.airbyte.io/integrations/destinations/gcs icon: googlecloudstorage.svg - name: Google PubSub @@ -167,7 +167,7 @@ - name: S3 destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362 dockerRepository: airbyte/destination-s3 - dockerImageTag: 0.2.2 + dockerImageTag: 0.2.3 documentationUrl: https://docs.airbyte.io/integrations/destinations/s3 icon: s3.svg - name: SFTP-JSON diff --git a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml index 535ea3c6abbfd..eaf2558800231 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_specs.yaml @@ -1143,7 +1143,7 @@ - "overwrite" - "append" supportsNamespaces: true -- dockerImage: "airbyte/destination-gcs:0.1.19" +- dockerImage: "airbyte/destination-gcs:0.1.20" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/gcs" connectionSpecification: @@ -3396,7 +3396,7 @@ supported_destination_sync_modes: - "append" - "overwrite" -- dockerImage: "airbyte/destination-s3:0.2.2" +- dockerImage: "airbyte/destination-s3:0.2.3" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3" connectionSpecification: @@ -3594,8 +3594,6 @@ \ more memory. Allowed values: min=5MB, max=525MB Default: 5MB." type: "integer" default: 5 - minimum: 5 - maximum: 525 examples: - 5 - title: "CSV: Comma-Separated Values" diff --git a/airbyte-integrations/connectors/destination-gcs/Dockerfile b/airbyte-integrations/connectors/destination-gcs/Dockerfile index 793b2e0cead17..90be81f532ed4 100644 --- a/airbyte-integrations/connectors/destination-gcs/Dockerfile +++ b/airbyte-integrations/connectors/destination-gcs/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.1.19 +LABEL io.airbyte.version=0.1.20 LABEL io.airbyte.name=airbyte/destination-gcs diff --git a/airbyte-integrations/connectors/destination-s3/Dockerfile b/airbyte-integrations/connectors/destination-s3/Dockerfile index 27ab7eebf8b2e..74be9eb7dc0e5 100644 --- a/airbyte-integrations/connectors/destination-s3/Dockerfile +++ b/airbyte-integrations/connectors/destination-s3/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION destination-s3 COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.2.2 +LABEL io.airbyte.version=0.2.3 LABEL io.airbyte.name=airbyte/destination-s3 diff --git a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java index 7b6b4e7220b41..555e4a0bb7c15 100644 --- a/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java +++ b/airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/avro/JsonToAvroSchemaConverter.java @@ -51,6 +51,9 @@ static List getNonNullTypes(final String fieldName, final JsonNo .filter(type -> type != JsonSchemaType.NULL).collect(Collectors.toList()); } + /** + * When no type is specified, it will default to string. + */ static List getTypes(final String fieldName, final JsonNode fieldDefinition) { final Optional combinedRestriction = getCombinedRestriction(fieldDefinition); if (combinedRestriction.isPresent()) { @@ -59,7 +62,8 @@ static List getTypes(final String fieldName, final JsonNode fiel final JsonNode typeProperty = fieldDefinition.get("type"); if (typeProperty == null || typeProperty.isNull()) { - throw new IllegalStateException(String.format("Field %s has no type", fieldName)); + LOGGER.warn("Field \"{}\" has no type specification. It will default to string", fieldName); + return Collections.singletonList(JsonSchemaType.STRING); } if (typeProperty.isArray()) { @@ -72,7 +76,8 @@ static List getTypes(final String fieldName, final JsonNode fiel return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText())); } - throw new IllegalStateException("Unexpected type: " + typeProperty); + LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, typeProperty); + return Collections.singletonList(JsonSchemaType.STRING); } static Optional getCombinedRestriction(final JsonNode fieldDefinition) { @@ -120,7 +125,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema, final SchemaBuilder.RecordBuilder builder = SchemaBuilder.record(stdName); if (!stdName.equals(fieldName)) { standardizedNames.put(fieldName, stdName); - LOGGER.warn("Schema name contains illegal character(s) and is standardized: {} -> {}", fieldName, + LOGGER.warn("Schema name \"{}\" contains illegal character(s) and is standardized to \"{}\"", fieldName, stdName); builder.doc( String.format("%s%s%s", @@ -159,7 +164,7 @@ public Schema getAvroSchema(final JsonNode jsonSchema, final SchemaBuilder.FieldBuilder fieldBuilder = assembler.name(stdFieldName); if (!stdFieldName.equals(subfieldName)) { standardizedNames.put(subfieldName, stdFieldName); - LOGGER.warn("Field name contains illegal character(s) and is standardized: {} -> {}", + LOGGER.warn("Field name \"{}\" contains illegal character(s) and is standardized to \"{}\"", subfieldName, stdFieldName); fieldBuilder.doc(String.format("%s%s%s", AvroConstants.DOC_KEY_ORIGINAL_NAME, @@ -231,26 +236,33 @@ Schema parseSingleType(final String fieldName, case ARRAY -> { final JsonNode items = fieldDefinition.get("items"); if (items == null) { - LOGGER.warn("Array field {} does not specify the items type. It will be assumed to be an array of strings", fieldName); + LOGGER.warn("Array field \"{}\" does not specify the items type. It will default to an array of strings", fieldName); fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA)); } else if (items.isObject()) { - fieldSchema = - Schema.createArray( - parseJsonField(String.format("%s.items", fieldName), fieldNamespace, items, appendExtraProps, addStringToLogicalTypes)); + if (!items.has("type") || items.get("type").isNull()) { + LOGGER.warn("Array field \"{}\" does not specify the items type. it will default to an array of strings", fieldName); + fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA)); + } else { + // Objects inside Json array has no names. We name it with the ".items" suffix. + final String elementFieldName = fieldName + ".items"; + fieldSchema = Schema.createArray(parseJsonField(elementFieldName, fieldNamespace, items, appendExtraProps, addStringToLogicalTypes)); + } } else if (items.isArray()) { final List arrayElementTypes = parseJsonTypeUnion(fieldName, fieldNamespace, (ArrayNode) items, appendExtraProps, addStringToLogicalTypes); arrayElementTypes.add(0, NULL_SCHEMA); fieldSchema = Schema.createArray(Schema.createUnion(arrayElementTypes)); } else { - throw new IllegalStateException( - String.format("Array field %s has invalid items property: %s", fieldName, items)); + LOGGER.warn("Array field \"{}\" has invalid items specification: {}. It will default to an array of strings.", fieldName, items); + fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA)); } } case OBJECT -> fieldSchema = getAvroSchema(fieldDefinition, fieldName, fieldNamespace, false, appendExtraProps, addStringToLogicalTypes, false); - default -> throw new IllegalStateException( - String.format("Unexpected type for field %s: %s", fieldName, fieldType)); + default -> { + LOGGER.warn("Field \"{}\" has invalid type definition: {}. It will default to string.", fieldName, fieldDefinition); + fieldSchema = Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA); + } } return fieldSchema; } @@ -267,7 +279,6 @@ List parseJsonTypeUnion(final String fieldName, final ArrayNode types, final boolean appendExtraProps, final boolean addStringToLogicalTypes) { - final List typeList = MoreIterators.toList(types.elements()); final List schemas = MoreIterators.toList(types.elements()) .stream() .flatMap(definition -> getNonNullTypes(fieldName, definition).stream().flatMap(type -> { diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json index 92e989ec1ab48..1a490a2835c28 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/json_conversion_test_cases.json @@ -1512,5 +1512,55 @@ ], "_airbyte_additional_properties": null } + }, + { + "schemaName": "array_field_with_empty_items", + "namespace": "namespace20", + "appendAirbyteFields": false, + "jsonSchema": { + "type": "object", + "properties": { + "array_field": { + "type": "array", + "items": {} + } + } + }, + "jsonObject": { + "array_field": [1234, true, "false", 0.001] + }, + "avroSchema": { + "type": "record", + "name": "array_field_with_empty_items", + "namespace": "namespace20", + "fields": [ + { + "name": "array_field", + "type": [ + "null", + { + "type": "array", + "items": ["null", "string"] + } + ], + "default": null + }, + { + "name": "_airbyte_additional_properties", + "type": [ + "null", + { + "type": "map", + "values": "string" + } + ], + "default": null + } + ] + }, + "avroObject": { + "array_field": ["1234", "true", "false", "0.001"], + "_airbyte_additional_properties": null + } } ] diff --git a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json index 39af73a95cf26..fabee9775aa85 100644 --- a/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json +++ b/airbyte-integrations/connectors/destination-s3/src/test/resources/parquet/json_schema_converter/type_conversion_test_cases.json @@ -201,7 +201,7 @@ ] }, { - "fieldName": "array_field_without_items_type", + "fieldName": "array_field_without_items", "jsonFieldSchema": { "type": "array" }, @@ -212,5 +212,24 @@ "items": ["null", "string"] } ] + }, + { + "fieldName": "array_field_with_empty_items", + "jsonFieldSchema": { + "type": "array", + "items": {} + }, + "avroFieldType": [ + "null", + { + "type": "array", + "items": ["null", "string"] + } + ] + }, + { + "fieldName": "field_without_type", + "jsonFieldSchema": {}, + "avroFieldType": ["null", "string"] } ] diff --git a/docs/integrations/destinations/gcs.md b/docs/integrations/destinations/gcs.md index 2bb45eee300a1..eb7dbf37d5878 100644 --- a/docs/integrations/destinations/gcs.md +++ b/docs/integrations/destinations/gcs.md @@ -229,6 +229,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.1.20 | 2022-01-11 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Avro & Parquet: support array field with unknown item type; default any improperly typed field to string. | | 0.1.19 | 2022-01-10 | [\#9121](https://github.com/airbytehq/airbyte/pull/9121) | Fixed check method for GCS mode to verify if all roles assigned to user | | 0.1.18 | 2021-12-30 | [\#8809](https://github.com/airbytehq/airbyte/pull/8809) | Update connector fields title/description | | 0.1.17 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types | diff --git a/docs/integrations/destinations/s3.md b/docs/integrations/destinations/s3.md index cd887d24c8c5c..79a73bb120eb8 100644 --- a/docs/integrations/destinations/s3.md +++ b/docs/integrations/destinations/s3.md @@ -223,6 +223,7 @@ Under the hood, an Airbyte data stream in Json schema is first converted to an A | Version | Date | Pull Request | Subject | | :--- | :--- | :--- | :--- | +| 0.2.3 | 2022-01-11 | [\#9367](https://github.com/airbytehq/airbyte/pull/9367) | Avro & Parquet: support array field with unknown item type; default any improperly typed field to string. | | 0.2.2 | 2021-12-21 | [\#8574](https://github.com/airbytehq/airbyte/pull/8574) | Added namespace to Avro and Parquet record types | | 0.2.1 | 2021-12-20 | [\#8974](https://github.com/airbytehq/airbyte/pull/8974) | Release a new version to ensure there is no excessive logging. | | 0.2.0 | 2021-12-15 | [\#8607](https://github.com/airbytehq/airbyte/pull/8607) | Change the output filename for CSV files - it's now `bucketPath/namespace/streamName/timestamp_epochMillis_randomUuid.csv` | diff --git a/docs/understanding-airbyte/json-avro-conversion.md b/docs/understanding-airbyte/json-avro-conversion.md index d5e48d3f6bb7f..25472d897c9c2 100644 --- a/docs/understanding-airbyte/json-avro-conversion.md +++ b/docs/understanding-airbyte/json-avro-conversion.md @@ -161,7 +161,7 @@ This is not supported in Avro schema. As a compromise, the converter creates a u } ``` -If the Json array has multiple object items, these objects will be recursively merged into one Avro record. For example, the following Json array expects two different objects, each with a different `id` field. +If the Json array has multiple object items, these objects will be recursively merged into one Avro record. For example, the following Json array expects two different objects. The first object has an `id` field, and second has an `id` and `message` field. Their `id` fields have slightly different types. Json schema: @@ -223,7 +223,7 @@ Json object: } ``` -Furthermore, the fields under the `id` record, `id_part_1` and `id_part_2`, will also have their schemas merged. +After conversion, the two object schemas will be merged into one. Furthermore, the fields under the `id` record, `id_part_1` and `id_part_2`, will also be merged. In this way, all possible valid elements from the Json array can be converted to Avro records. Avro schema: @@ -468,6 +468,10 @@ the corresponding Avro schema and record will be: } ``` +### Untyped Field + +Any field without property type specification will default to a `string` field, and its value will be serialized to string. + ## Example Based on the above rules, here is an overall example. Given the following Json schema: