🎉 New Destination: S3-Glue (#18695)
* destination-s3-glue

* reimplement schema generation to use recursion

* configure serialization library

* include data on update

* flatten data field

* improve location path

* generate s3-glue destination

* refactor s3-glue as separate connector

* add acceptance tests and cleanup

* check field presence

* override test image name

* add s3-glue readme

* format files

* add redpanda readme

* add s3 glue to source def

* auto-bump connector version

Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
3 people authored Nov 17, 2022
1 parent 86eb221 commit 34d4a71
Showing 24 changed files with 1,535 additions and 28 deletions.
@@ -302,6 +302,12 @@
memory_limit: "1Gi"
memory_request: "1Gi"
releaseStage: generally_available
- name: S3 Glue
destinationDefinitionId: 471e5cab-8ed1-49f3-ba11-79c687784737
dockerRepository: airbyte/destination-s3-glue
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.com/integrations/destinations/s3-glue
releaseStage: alpha
- name: SFTP-JSON
destinationDefinitionId: e9810f61-4bab-46d2-bb22-edfc902e0644
dockerRepository: airbyte/destination-sftp-json
187 changes: 187 additions & 0 deletions airbyte-config/init/src/main/resources/seed/destination_specs.yaml
@@ -5366,6 +5366,193 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-s3-glue:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/s3"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "S3 Destination Spec"
type: "object"
required:
- "s3_bucket_name"
- "s3_bucket_path"
- "s3_bucket_region"
- "format"
- "glue_database"
- "glue_serialization_library"
properties:
access_key_id:
type: "string"
description: "The access key ID to access the S3 bucket. Airbyte requires\
\ Read and Write permissions to the given bucket. Read more <a href=\"\
https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\"\
>here</a>."
title: "S3 Key ID"
airbyte_secret: true
examples:
- "A012345678910EXAMPLE"
order: 0
secret_access_key:
type: "string"
description: "The corresponding secret to the access key ID. Read more <a\
\ href=\"https://docs.aws.amazon.com/general/latest/gr/aws-sec-cred-types.html#access-keys-and-secret-access-keys\"\
>here</a>"
title: "S3 Access Key"
airbyte_secret: true
examples:
- "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"
order: 1
s3_bucket_name:
title: "S3 Bucket Name"
type: "string"
description: "The name of the S3 bucket. Read more <a href=\"https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html\"\
>here</a>."
examples:
- "airbyte_sync"
order: 2
s3_bucket_path:
title: "S3 Bucket Path"
description: "Directory under the S3 bucket where data will be written.\
\ Read more <a href=\"https://docs.airbyte.com/integrations/destinations/s3#:~:text=to%20format%20the-,bucket%20path,-%3A\"\
>here</a>"
type: "string"
examples:
- "data_sync/test"
order: 3
s3_bucket_region:
title: "S3 Bucket Region"
type: "string"
default: ""
description: "The region of the S3 bucket. See <a href=\"https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-regions-availability-zones.html#concepts-available-regions\"\
>here</a> for all region codes."
enum:
- ""
- "us-east-1"
- "us-east-2"
- "us-west-1"
- "us-west-2"
- "af-south-1"
- "ap-east-1"
- "ap-south-1"
- "ap-northeast-1"
- "ap-northeast-2"
- "ap-northeast-3"
- "ap-southeast-1"
- "ap-southeast-2"
- "ca-central-1"
- "cn-north-1"
- "cn-northwest-1"
- "eu-central-1"
- "eu-north-1"
- "eu-south-1"
- "eu-west-1"
- "eu-west-2"
- "eu-west-3"
- "sa-east-1"
- "me-south-1"
- "us-gov-east-1"
- "us-gov-west-1"
order: 4
format:
title: "Output Format"
type: "object"
description: "Format of the data output. See <a href=\"https://docs.airbyte.com/integrations/destinations/s3/#supported-output-schema\"\
>here</a> for more details"
oneOf:
- title: "JSON Lines: Newline-delimited JSON"
required:
- "format_type"
properties:
format_type:
title: "Format Type"
type: "string"
enum:
- "JSONL"
default: "JSONL"
compression:
title: "Compression"
type: "object"
description: "Whether the output files should be compressed. If compression\
\ is selected, the output filename will have an extra extension\
\ (GZIP: \".jsonl.gz\")."
oneOf:
- title: "No Compression"
requires: "compression_type"
properties:
compression_type:
type: "string"
enum:
- "No Compression"
default: "No Compression"
- title: "GZIP"
requires: "compression_type"
properties:
compression_type:
type: "string"
enum:
- "GZIP"
default: "GZIP"
flatten_data:
title: "Flatten Data"
description: "If true data will be flattened and won't be nested in\
\ the _airbyte_data field"
type: "boolean"
default: true
order: 5
s3_endpoint:
title: "Endpoint"
type: "string"
default: ""
description: "Your S3 endpoint url. Read more <a href=\"https://docs.aws.amazon.com/general/latest/gr/s3.html#:~:text=Service%20endpoints-,Amazon%20S3%20endpoints,-When%20you%20use\"\
>here</a>"
examples:
- "http://localhost:9000"
order: 6
s3_path_format:
title: "S3 Path Format"
description: "Format string on how data will be organized inside the S3\
\ bucket directory. Read more <a href=\"https://docs.airbyte.com/integrations/destinations/s3#:~:text=The%20full%20path%20of%20the%20output%20data%20with%20the%20default%20S3%20path%20format\"\
>here</a>"
type: "string"
examples:
- "${NAMESPACE}/${STREAM_NAME}/${YEAR}_${MONTH}_${DAY}_${EPOCH}_"
order: 7
file_name_pattern:
type: "string"
description: "The pattern allows you to set the file-name format for the\
\ S3 staging file(s)"
title: "S3 Filename pattern"
examples:
- "{date}"
- "{date:yyyy_MM}"
- "{timestamp}"
- "{part_number}"
- "{sync_id}"
order: 8
glue_database:
type: "string"
description: "Name of the glue database for creating the tables, leave blank\
\ if no integration"
title: "Glue database name"
examples:
- "airbyte_database"
order: 9
glue_serialization_library:
title: "Serialization Library"
description: "The library that your query engine will use for reading and\
\ writing data in your lake."
type: "string"
enum:
- "org.openx.data.jsonserde.JsonSerDe"
- "org.apache.hive.hcatalog.data.JsonSerDe"
default: "org.openx.data.jsonserde.JsonSerDe"
order: 10
supportsIncremental: true
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-sftp-json:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/sftp-json"
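For reference, the required list in the S3 Glue spec added above (s3_bucket_name, s3_bucket_path, s3_bucket_region, format, glue_database, glue_serialization_library) maps onto a connection config shaped like the sketch below. This is only an illustration: every value is a placeholder taken from the spec's own examples, the optional credentials and JSONL options are included just to show where they sit, and the JSON is built with plain Jackson rather than any Airbyte helper.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Sketch of a destination-s3-glue connection config covering the spec's required keys.
// All values below are placeholders, not real credentials.
public class S3GlueConfigSketch {

  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();

    // Optional JSONL settings from the "format" oneOf: GZIP compression and flattening.
    final ObjectNode format = mapper.createObjectNode();
    format.put("format_type", "JSONL");
    format.put("flatten_data", true);
    format.set("compression", mapper.createObjectNode().put("compression_type", "GZIP"));

    final ObjectNode config = mapper.createObjectNode();
    config.put("access_key_id", "A012345678910EXAMPLE");                      // optional per the spec
    config.put("secret_access_key", "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"); // optional per the spec
    config.put("s3_bucket_name", "airbyte_sync");
    config.put("s3_bucket_path", "data_sync/test");
    config.put("s3_bucket_region", "us-east-1");
    config.put("glue_database", "airbyte_database");
    config.put("glue_serialization_library", "org.openx.data.jsonserde.JsonSerDe");
    config.set("format", format);

    System.out.println(config.toPrettyString());
  }
}

Only the keys in the required list are mandatory; the access keys, s3_endpoint, s3_path_format and file_name_pattern stay optional, mirroring the plain S3 destination this connector builds on.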
@@ -28,11 +28,11 @@ public abstract class BaseS3Destination extends BaseConnector implements Destina

private final NamingConventionTransformer nameTransformer;

public BaseS3Destination() {
protected BaseS3Destination() {
this(new S3DestinationConfigFactory());
}

public BaseS3Destination(final S3DestinationConfigFactory configFactory) {
protected BaseS3Destination(final S3DestinationConfigFactory configFactory) {
this.configFactory = configFactory;
this.nameTransformer = new S3NameTransformer();
}
@@ -66,8 +66,7 @@ private static List<WriteConfig> createWriteConfigs(final BlobStorageOperations
.collect(Collectors.toList());
}

private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
final BlobStorageOperations storageOperations,
private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(final BlobStorageOperations storageOperations,
final NamingConventionTransformer namingResolver,
final S3DestinationConfig s3Config) {
return stream -> {
@@ -16,6 +16,7 @@ public final class S3DestinationConstants {
// gzip compression for CSV and JSONL
public static final String COMPRESSION_ARG_NAME = "compression";
public static final String COMPRESSION_TYPE_ARG_NAME = "compression_type";
public static final String FLATTEN_DATA = "flatten_data";
public static final CompressionType DEFAULT_COMPRESSION_TYPE = CompressionType.GZIP;

private S3DestinationConstants() {}
@@ -4,6 +4,8 @@

package io.airbyte.integrations.destination.s3.jsonl;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.functional.CheckedBiFunction;
@@ -21,6 +23,7 @@
import java.io.OutputStream;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;

@@ -30,10 +33,13 @@ public class JsonLSerializedBuffer extends BaseSerializedBuffer {

private PrintWriter printWriter;

protected JsonLSerializedBuffer(final BufferStorage bufferStorage, final boolean gzipCompression) throws Exception {
private final boolean flattenData;

protected JsonLSerializedBuffer(final BufferStorage bufferStorage, final boolean gzipCompression, final boolean flattenData) throws Exception {
super(bufferStorage);
// we always want to compress jsonl files
withCompression(gzipCompression);
this.flattenData = flattenData;
}

@Override
@@ -46,7 +52,12 @@ protected void writeRecord(final AirbyteRecordMessage recordMessage) {
final ObjectNode json = MAPPER.createObjectNode();
json.put(JavaBaseConstants.COLUMN_NAME_AB_ID, UUID.randomUUID().toString());
json.put(JavaBaseConstants.COLUMN_NAME_EMITTED_AT, recordMessage.getEmittedAt());
json.set(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getData());
if (flattenData) {
Map<String, JsonNode> data = MAPPER.convertValue(recordMessage.getData(), new TypeReference<>() {});
json.setAll(data);
} else {
json.set(JavaBaseConstants.COLUMN_NAME_DATA, recordMessage.getData());
}
printWriter.println(Jsons.serialize(json));
}

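To make the new flatten_data branch in writeRecord concrete, the standalone sketch below builds the two record shapes side by side. The sample fields (id, name) are invented; the _airbyte_* names are the usual Airbyte metadata columns that the diff references through JavaBaseConstants, and flattening is what lets the top-level fields appear as ordinary columns when a Glue table is generated over the files.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.Map;
import java.util.UUID;

// Illustration only: the same source record rendered with and without flatten_data.
public class FlattenDataSketch {

  private static final ObjectMapper MAPPER = new ObjectMapper();

  public static void main(String[] args) {
    final ObjectNode record = MAPPER.createObjectNode();
    record.put("id", 42);
    record.put("name", "alice");

    // flatten_data = false: the record stays nested under _airbyte_data.
    final ObjectNode nested = MAPPER.createObjectNode();
    nested.put("_airbyte_ab_id", UUID.randomUUID().toString());
    nested.put("_airbyte_emitted_at", 1668700000000L);
    nested.set("_airbyte_data", record);
    System.out.println(nested);
    // {"_airbyte_ab_id":"...","_airbyte_emitted_at":1668700000000,"_airbyte_data":{"id":42,"name":"alice"}}

    // flatten_data = true: the record's top-level fields sit next to the metadata columns.
    final ObjectNode flat = MAPPER.createObjectNode();
    flat.put("_airbyte_ab_id", UUID.randomUUID().toString());
    flat.put("_airbyte_emitted_at", 1668700000000L);
    final Map<String, JsonNode> fields = MAPPER.convertValue(record, new TypeReference<>() {});
    flat.setAll(fields);
    System.out.println(flat);
    // {"_airbyte_ab_id":"...","_airbyte_emitted_at":1668700000000,"id":42,"name":"alice"}
  }
}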
@@ -66,7 +77,8 @@ public static CheckedBiFunction<AirbyteStreamNameNamespacePair, ConfiguredAirbyt
final CompressionType compressionType = config == null
? S3DestinationConstants.DEFAULT_COMPRESSION_TYPE
: config.getCompressionType();
return new JsonLSerializedBuffer(createStorageFunction.call(), compressionType != CompressionType.NO_COMPRESSION);
final boolean flattenData = config != null && config.getFlattenData();
return new JsonLSerializedBuffer(createStorageFunction.call(), compressionType != CompressionType.NO_COMPRESSION, flattenData);
};

}
@@ -6,6 +6,7 @@

import static io.airbyte.integrations.destination.s3.S3DestinationConstants.COMPRESSION_ARG_NAME;
import static io.airbyte.integrations.destination.s3.S3DestinationConstants.DEFAULT_COMPRESSION_TYPE;
import static io.airbyte.integrations.destination.s3.S3DestinationConstants.FLATTEN_DATA;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.integrations.destination.s3.S3Format;
Expand All @@ -20,10 +21,13 @@ public class S3JsonlFormatConfig implements S3FormatConfig {

private final CompressionType compressionType;

private final boolean flattenData;

public S3JsonlFormatConfig(final JsonNode formatConfig) {
this.compressionType = formatConfig.has(COMPRESSION_ARG_NAME)
? CompressionTypeHelper.parseCompressionType(formatConfig.get(COMPRESSION_ARG_NAME))
: DEFAULT_COMPRESSION_TYPE;
this.flattenData = formatConfig.has(FLATTEN_DATA) && formatConfig.get(FLATTEN_DATA).asBoolean();
}

@Override
Expand All @@ -40,10 +44,15 @@ public CompressionType getCompressionType() {
return compressionType;
}

public boolean getFlattenData() {
return flattenData;
}

@Override
public String toString() {
return "S3JsonlFormatConfig{" +
", compression=" + compressionType.name() +
"compressionType=" + compressionType +
", flattenData=" + flattenData +
'}';
}

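As a quick check on the constructor change above, the sketch below feeds a hand-written format block into S3JsonlFormatConfig. The JSON values are illustrative; the fully qualified class name is assumed from the sibling JsonL classes in this connector, and the printed compression result relies on CompressionTypeHelper behaving as it does for the existing S3 JSONL path.

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.integrations.destination.s3.jsonl.S3JsonlFormatConfig; // package assumed

// Hypothetical usage: parse a "format" block that enables flattening and GZIP compression.
public class JsonlFormatConfigSketch {

  public static void main(String[] args) throws Exception {
    final JsonNode formatConfig = new ObjectMapper().readTree(
        "{\"format_type\":\"JSONL\","
            + "\"compression\":{\"compression_type\":\"GZIP\"},"
            + "\"flatten_data\":true}");

    final S3JsonlFormatConfig config = new S3JsonlFormatConfig(formatConfig);
    System.out.println(config.getCompressionType()); // expected: GZIP
    System.out.println(config.getFlattenData());     // true; false whenever the key is absent
  }
}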
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
18 changes: 18 additions & 0 deletions airbyte-integrations/connectors/destination-s3-glue/Dockerfile
@@ -0,0 +1,18 @@
FROM airbyte/integration-base-java:dev AS build

WORKDIR /airbyte
ENV APPLICATION destination-s3-glue

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar

FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-s3-glue

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-s3-glue