Skip to content

Commit

Permalink
🎉 Update destination-s3 to handle the new data types protocol (#20088)
Browse files Browse the repository at this point in the history
* Update base-java-s3 to support V1 protocol

* Fix formatting

* Azure Blob test fix

* Destination GCS test fix

* java-base-s3 correct test dataset

* Destination GCS test fix

* Refactor tests comparators from epoch

* Remove bignumber from V1 dataset

* Clean up

* Update json-avro-converter version

* Bump version + changelog

* Base64 type should be decoded for s3 tests

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
suhomud and octavia-squidington-iii authored Dec 15, 2022
1 parent 0da45bc commit 1b8376e
Show file tree
Hide file tree
Showing 46 changed files with 2,767 additions and 125 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.17
dockerImageTag: 0.3.18
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5297,7 +5297,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.17"
- dockerImage: "airbyte/destination-s3:0.3.18"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/s3"
connectionSpecification:
Expand Down
4 changes: 2 additions & 2 deletions airbyte-integrations/bases/base-java-s3/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ dependencies {
implementation 'com.github.alexmojaki:s3-stream-upload:2.2.2'

implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
implementation group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'

// parquet
Expand All @@ -28,7 +28,7 @@ dependencies {
}

implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'}
implementation ('com.github.airbytehq:json-avro-converter:1.1.0') { exclude group: 'ch.qos.logback', module: 'logback-classic'}

testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.xerial.snappy:snappy-java:1.1.8.4'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,27 +15,48 @@
*/
public enum JsonSchemaType {

STRING("string", true, null, Schema.Type.STRING),
NUMBER_INT("number", true, "integer", Schema.Type.INT),
NUMBER_BIGINT("string", true, "big_integer", Schema.Type.STRING),
NUMBER_FLOAT("number", true, "float", Schema.Type.FLOAT),
NUMBER("number", true, null, Schema.Type.DOUBLE),
INTEGER("integer", true, null, Schema.Type.INT),
BOOLEAN("boolean", true, null, Schema.Type.BOOLEAN),
NULL("null", true, null, Schema.Type.NULL),
OBJECT("object", false, null, Schema.Type.RECORD),
ARRAY("array", false, null, Schema.Type.ARRAY),
COMBINED("combined", false, null, Schema.Type.UNION);
STRING_V1("WellKnownTypes.json#/definitions/String", Schema.Type.STRING),
INTEGER_V1("WellKnownTypes.json#/definitions/Integer", Schema.Type.INT),
NUMBER_V1("WellKnownTypes.json#/definitions/Number", Schema.Type.DOUBLE),
BOOLEAN_V1("WellKnownTypes.json#/definitions/Boolean", Schema.Type.BOOLEAN),
BINARY_DATA_V1("WellKnownTypes.json#/definitions/BinaryData", Schema.Type.BYTES),
DATE_V1("WellKnownTypes.json#/definitions/Date", Schema.Type.INT),
TIMESTAMP_WITH_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimestampWithTimezone", Schema.Type.LONG),
TIMESTAMP_WITHOUT_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimestampWithoutTimezone", Schema.Type.LONG),
TIME_WITH_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimeWithTimezone", Schema.Type.LONG),
TIME_WITHOUT_TIMEZONE_V1("WellKnownTypes.json#/definitions/TimeWithoutTimezone", Schema.Type.LONG),
OBJECT("object", Schema.Type.RECORD),
ARRAY("array", Schema.Type.ARRAY),
COMBINED("combined", Schema.Type.UNION),
@Deprecated
STRING_V0("string", null, Schema.Type.STRING),
@Deprecated
NUMBER_INT_V0("number", "integer", Schema.Type.INT),
@Deprecated
NUMBER_BIGINT_V0("string", "big_integer", Schema.Type.STRING),
@Deprecated
NUMBER_FLOAT_V0("number", "float", Schema.Type.FLOAT),
@Deprecated
NUMBER_V0("number", null, Schema.Type.DOUBLE),
@Deprecated
INTEGER_V0("integer", null, Schema.Type.INT),
@Deprecated
BOOLEAN_V0("boolean", null, Schema.Type.BOOLEAN),
@Deprecated
NULL("null", null, Schema.Type.NULL);

private final String jsonSchemaType;
private final boolean isPrimitive;
private final Schema.Type avroType;
private final String jsonSchemaAirbyteType;
private String jsonSchemaAirbyteType;

JsonSchemaType(final String jsonSchemaType, final boolean isPrimitive, final String jsonSchemaAirbyteType, final Schema.Type avroType) {
JsonSchemaType(final String jsonSchemaType, final String jsonSchemaAirbyteType, final Schema.Type avroType) {
this.jsonSchemaType = jsonSchemaType;
this.jsonSchemaAirbyteType = jsonSchemaAirbyteType;
this.isPrimitive = isPrimitive;
this.avroType = avroType;
}

JsonSchemaType(final String jsonSchemaType, final Schema.Type avroType) {
this.jsonSchemaType = jsonSchemaType;
this.avroType = avroType;
}

Expand Down Expand Up @@ -75,10 +96,6 @@ public String getJsonSchemaType() {
return jsonSchemaType;
}

public boolean isPrimitive() {
return isPrimitive;
}

public Schema.Type getAvroType() {
return avroType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*/
public class JsonToAvroSchemaConverter {

private static final String REFERENCE_TYPE = "$ref";
private static final String TYPE = "type";
private static final String AIRBYTE_TYPE = "airbyte_type";
private static final Schema UUID_SCHEMA = LogicalTypes.uuid()
Expand All @@ -54,7 +55,7 @@ static List<JsonSchemaType> getNonNullTypes(final String fieldName, final JsonNo
}

/**
* When no type is specified, it will default to string.
* When neither type nor $ref is specified, it will default to string.
*/
static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fieldDefinition) {
final Optional<JsonNode> combinedRestriction = getCombinedRestriction(fieldDefinition);
Expand All @@ -63,25 +64,31 @@ static List<JsonSchemaType> getTypes(final String fieldName, final JsonNode fiel
}

final JsonNode typeProperty = fieldDefinition.get(TYPE);
final JsonNode referenceType = fieldDefinition.get(REFERENCE_TYPE);

final JsonNode airbyteTypeProperty = fieldDefinition.get(AIRBYTE_TYPE);
final String airbyteType = airbyteTypeProperty == null ? null : airbyteTypeProperty.asText();
if (typeProperty == null || typeProperty.isNull()) {
LOGGER.warn("Field \"{}\" has no type specification. It will default to string", fieldName);
return Collections.singletonList(JsonSchemaType.STRING);
}

if (typeProperty.isArray()) {
if (typeProperty != null && typeProperty.isArray()) {
return MoreIterators.toList(typeProperty.elements()).stream()
.map(s -> JsonSchemaType.fromJsonSchemaType(s.asText()))
.collect(Collectors.toList());
}

if (typeProperty.isTextual()) {
if (hasTextValue(typeProperty)) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(typeProperty.asText(), airbyteType));
}

LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, typeProperty);
return Collections.singletonList(JsonSchemaType.STRING);
if (hasTextValue(referenceType)) {
return Collections.singletonList(JsonSchemaType.fromJsonSchemaType(referenceType.asText(), airbyteType));
}

LOGGER.warn("Field \"{}\" has unexpected type {}. It will default to string.", fieldName, referenceType);
return Collections.singletonList(JsonSchemaType.STRING_V1);
}

/**
 * Tells whether the given JSON node carries a textual value.
 *
 * @param value the node to inspect; may be {@code null} when the property is absent
 * @return {@code true} only if the node is present, is not a JSON null, and is textual
 */
private static boolean hasTextValue(JsonNode value) {
// An absent property or an explicit JSON null never counts as text.
if (value == null || value.isNull()) {
return false;
}
return value.isTextual();
}

static Optional<JsonNode> getCombinedRestriction(final JsonNode fieldDefinition) {
Expand Down Expand Up @@ -218,8 +225,14 @@ Schema parseSingleType(final String fieldName,

final Schema fieldSchema;
switch (fieldType) {
case INTEGER, NUMBER, NUMBER_INT, NUMBER_BIGINT, NUMBER_FLOAT, BOOLEAN -> fieldSchema = Schema.create(fieldType.getAvroType());
case STRING -> {
case INTEGER_V1, NUMBER_V1, BOOLEAN_V1, STRING_V1, BINARY_DATA_V1 -> fieldSchema = Schema.create(fieldType.getAvroType());
case DATE_V1 -> fieldSchema = LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
case TIMESTAMP_WITH_TIMEZONE_V1, TIMESTAMP_WITHOUT_TIMEZONE_V1 -> fieldSchema = LogicalTypes.timestampMicros()
.addToSchema(Schema.create(Schema.Type.LONG));
case TIME_WITH_TIMEZONE_V1, TIME_WITHOUT_TIMEZONE_V1 -> fieldSchema = LogicalTypes.timeMicros().addToSchema(Schema.create(Schema.Type.LONG));
case INTEGER_V0, NUMBER_V0, NUMBER_INT_V0, NUMBER_BIGINT_V0, NUMBER_FLOAT_V0, BOOLEAN_V0 -> fieldSchema =
Schema.create(fieldType.getAvroType());
case STRING_V0 -> {
if (fieldDefinition.has("format")) {
final String format = fieldDefinition.get("format").asText();
fieldSchema = switch (format) {
Expand All @@ -244,13 +257,14 @@ Schema parseSingleType(final String fieldName,
LOGGER.warn("Array field \"{}\" does not specify the items type. It will default to an array of strings", fieldName);
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else if (items.isObject()) {
if (!items.has("type") || items.get("type").isNull()) {
LOGGER.warn("Array field \"{}\" does not specify the items type. it will default to an array of strings", fieldName);
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
} else {
if ((items.has("type") && !items.get("type").isNull()) ||
items.has("$ref") && !items.get("$ref").isNull()) {
// Objects inside a JSON array have no names, so we name them with the ".items" suffix.
final String elementFieldName = fieldName + ".items";
fieldSchema = Schema.createArray(parseJsonField(elementFieldName, fieldNamespace, items, appendExtraProps, addStringToLogicalTypes));
} else {
LOGGER.warn("Array field \"{}\" does not specify the items type. it will default to an array of strings", fieldName);
fieldSchema = Schema.createArray(Schema.createUnion(NULL_SCHEMA, STRING_SCHEMA));
}
} else if (items.isArray()) {
final List<Schema> arrayElementTypes =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,25 @@ public void testFromJsonSchemaType(String type, String airbyteType, JsonSchemaTy
public static class JsonSchemaTypeProvider implements ArgumentsProvider {

@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
return Stream.of(
Arguments.of("number", "integer", JsonSchemaType.NUMBER_INT),
Arguments.of("string", "big_integer", JsonSchemaType.NUMBER_BIGINT),
Arguments.of("number", "float", JsonSchemaType.NUMBER_FLOAT),
Arguments.of("number", null, JsonSchemaType.NUMBER),
Arguments.of("string", null, JsonSchemaType.STRING),
Arguments.of("integer", null, JsonSchemaType.INTEGER),
Arguments.of("boolean", null, JsonSchemaType.BOOLEAN),
Arguments.of("WellKnownTypes.json#/definitions/Number", null, JsonSchemaType.NUMBER_V1),
Arguments.of("WellKnownTypes.json#/definitions/String", null, JsonSchemaType.STRING_V1),
Arguments.of("WellKnownTypes.json#/definitions/Integer", null, JsonSchemaType.INTEGER_V1),
Arguments.of("WellKnownTypes.json#/definitions/Boolean", null, JsonSchemaType.BOOLEAN_V1),
Arguments.of("WellKnownTypes.json#/definitions/BinaryData", null, JsonSchemaType.BINARY_DATA_V1),
Arguments.of("WellKnownTypes.json#/definitions/Date", null, JsonSchemaType.DATE_V1),
Arguments.of("WellKnownTypes.json#/definitions/TimestampWithTimezone", null, JsonSchemaType.TIMESTAMP_WITH_TIMEZONE_V1),
Arguments.of("WellKnownTypes.json#/definitions/TimestampWithoutTimezone", null, JsonSchemaType.TIMESTAMP_WITHOUT_TIMEZONE_V1),
Arguments.of("WellKnownTypes.json#/definitions/TimeWithTimezone", null, JsonSchemaType.TIME_WITH_TIMEZONE_V1),
Arguments.of("WellKnownTypes.json#/definitions/TimeWithoutTimezone", null, JsonSchemaType.TIME_WITHOUT_TIMEZONE_V1),
Arguments.of("number", "integer", JsonSchemaType.NUMBER_INT_V0),
Arguments.of("string", "big_integer", JsonSchemaType.NUMBER_BIGINT_V0),
Arguments.of("number", "float", JsonSchemaType.NUMBER_FLOAT_V0),
Arguments.of("number", null, JsonSchemaType.NUMBER_V0),
Arguments.of("string", null, JsonSchemaType.STRING_V0),
Arguments.of("integer", null, JsonSchemaType.INTEGER_V0),
Arguments.of("boolean", null, JsonSchemaType.BOOLEAN_V0),
Arguments.of("null", null, JsonSchemaType.NULL),
Arguments.of("object", null, JsonSchemaType.OBJECT),
Arguments.of("array", null, JsonSchemaType.ARRAY),
Expand Down
Loading

0 comments on commit 1b8376e

Please sign in to comment.