Skip to content

Commit 393ba35

Browse files
🐛 fix s3/gcs bucket cleanup (#11728)
* Restrict bucket clean up * bumpversion * Fix GCS acceptance tests
1 parent 0fa9f12 commit 393ba35

File tree

18 files changed

+340
-90
lines changed

18 files changed

+340
-90
lines changed

airbyte-integrations/connectors/destination-gcs/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-gcs
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.2.1
19+
LABEL io.airbyte.version=0.2.2
2020
LABEL io.airbyte.name=airbyte/destination-gcs

airbyte-integrations/connectors/destination-gcs/src/main/java/io/airbyte/integrations/destination/gcs/GcsStorageOperations.java

+4-19
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66

77
import com.amazonaws.services.s3.AmazonS3;
88
import com.amazonaws.services.s3.model.DeleteObjectsRequest.KeyVersion;
9-
import com.amazonaws.services.s3.model.ObjectListing;
109
import io.airbyte.integrations.destination.NamingConventionTransformer;
1110
import io.airbyte.integrations.destination.s3.S3DestinationConfig;
1211
import io.airbyte.integrations.destination.s3.S3StorageOperations;
@@ -29,24 +28,10 @@ public GcsStorageOperations(final NamingConventionTransformer nameTransformer,
2928
* difference is that the AmazonS3#deleteObjects method is replaced with AmazonS3#deleteObject.
3029
*/
3130
@Override
32-
public void cleanUpBucketObject(final String objectPath, final List<String> stagedFiles) {
33-
final String bucket = s3Config.getBucketName();
34-
ObjectListing objects = s3Client.listObjects(bucket, objectPath);
35-
while (objects.getObjectSummaries().size() > 0) {
36-
final List<KeyVersion> keysToDelete = objects.getObjectSummaries()
37-
.stream()
38-
.map(obj -> new KeyVersion(obj.getKey()))
39-
.filter(obj -> stagedFiles.isEmpty() || stagedFiles.contains(obj.getKey()))
40-
.toList();
41-
for (final KeyVersion keyToDelete : keysToDelete) {
42-
s3Client.deleteObject(bucket, keyToDelete.getKey());
43-
}
44-
LOGGER.info("Storage bucket {} has been cleaned-up ({} objects were deleted)...", objectPath, keysToDelete.size());
45-
if (objects.isTruncated()) {
46-
objects = s3Client.listNextBatchOfObjects(objects);
47-
} else {
48-
break;
49-
}
31+
protected void cleanUpObjects(final String bucket, final List<KeyVersion> keysToDelete) {
32+
for (final KeyVersion keyToDelete : keysToDelete) {
33+
LOGGER.info("Deleting object {}", keyToDelete.getKey());
34+
s3Client.deleteObject(bucket, keyToDelete.getKey());
5035
}
5136
}
5237

airbyte-integrations/connectors/destination-gcs/src/main/resources/spec.json

+1-6
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,7 @@
99
"$schema": "http://json-schema.org/draft-07/schema#",
1010
"title": "GCS Destination Spec",
1111
"type": "object",
12-
"required": [
13-
"gcs_bucket_name",
14-
"gcs_bucket_path",
15-
"credential",
16-
"format"
17-
],
12+
"required": ["gcs_bucket_name", "gcs_bucket_path", "credential", "format"],
1813
"additionalProperties": false,
1914
"properties": {
2015
"gcs_bucket_name": {

airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsAvroDestinationAcceptanceTest.java

+34-4
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99

1010
import com.amazonaws.services.s3.model.S3Object;
1111
import com.amazonaws.services.s3.model.S3ObjectSummary;
12+
import com.fasterxml.jackson.core.JsonProcessingException;
1213
import com.fasterxml.jackson.databind.JsonNode;
14+
import com.fasterxml.jackson.databind.ObjectMapper;
1315
import com.fasterxml.jackson.databind.ObjectReader;
1416
import com.fasterxml.jackson.databind.node.ObjectNode;
1517
import io.airbyte.commons.json.Jsons;
@@ -18,6 +20,9 @@
1820
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
1921
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
2022
import io.airbyte.integrations.standardtest.destination.DateTimeUtils;
23+
import io.airbyte.protocol.models.AirbyteMessage;
24+
import io.airbyte.protocol.models.AirbyteMessage.Type;
25+
import io.airbyte.protocol.models.AirbyteRecordMessage;
2126
import java.util.ArrayList;
2227
import java.util.Arrays;
2328
import java.util.LinkedList;
@@ -28,6 +33,7 @@
2833
import org.apache.avro.generic.GenericData;
2934
import org.apache.avro.generic.GenericData.Record;
3035
import org.apache.avro.generic.GenericDatumReader;
36+
import org.apache.commons.lang3.StringUtils;
3137

3238
public class GcsAvroDestinationAcceptanceTest extends GcsDestinationAcceptanceTest {
3339

@@ -79,13 +85,13 @@ public boolean requiresDateTimeConversionForSync() {
7985
}
8086

8187
@Override
82-
public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
83-
for (String path : dateTimeFieldNames.keySet()) {
88+
public void convertDateTime(final ObjectNode data, final Map<String, String> dateTimeFieldNames) {
89+
for (final String path : dateTimeFieldNames.keySet()) {
8490
if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) {
85-
var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
91+
final var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
8692
pathFields.remove(0); // first element always empty string
8793
// if pathFields.size() == 1 -> /field else /field/nestedField..
88-
var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
94+
final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
8995
: "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1));
9096
switch (dateTimeFieldNames.get(path)) {
9197
case DATE_TIME -> {
@@ -114,4 +120,28 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNa
114120
}
115121
}
116122

123+
@Override
124+
protected void deserializeNestedObjects(final List<AirbyteMessage> messages, final List<AirbyteRecordMessage> actualMessages) {
125+
for (final AirbyteMessage message : messages) {
126+
if (message.getType() == Type.RECORD) {
127+
final var iterator = message.getRecord().getData().fieldNames();
128+
while (iterator.hasNext()) {
129+
final var fieldName = iterator.next();
130+
if (message.getRecord().getData().get(fieldName).isContainerNode()) {
131+
message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> {
132+
final var data = message.getRecord().getData().get(fieldName).get(f);
133+
final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f,
134+
dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\""));
135+
try {
136+
((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData));
137+
} catch (final JsonProcessingException e) {
138+
e.printStackTrace();
139+
}
140+
});
141+
}
142+
}
143+
}
144+
}
145+
}
146+
117147
}

airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsCsvDestinationAcceptanceTest.java

+14
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@
1212
import io.airbyte.integrations.base.JavaBaseConstants;
1313
import io.airbyte.integrations.destination.s3.S3Format;
1414
import io.airbyte.integrations.destination.s3.csv.S3CsvFormatConfig.Flattening;
15+
import io.airbyte.protocol.models.AirbyteCatalog;
16+
import io.airbyte.protocol.models.AirbyteMessage;
17+
import io.airbyte.protocol.models.AirbyteRecordMessage;
1518
import java.io.IOException;
1619
import java.io.InputStreamReader;
1720
import java.io.Reader;
@@ -109,4 +112,15 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
109112
return jsonRecords;
110113
}
111114

115+
@Override
116+
protected void retrieveRawRecordsAndAssertSameMessages(final AirbyteCatalog catalog,
117+
final List<AirbyteMessage> messages,
118+
final String defaultSchema)
119+
throws Exception {
120+
final List<AirbyteRecordMessage> actualMessages = retrieveRawRecords(catalog, defaultSchema);
121+
deserializeNestedObjects(messages, actualMessages);
122+
123+
assertSameMessages(messages, actualMessages, false);
124+
}
125+
112126
}

airbyte-integrations/connectors/destination-gcs/src/test-integration/java/io/airbyte/integrations/destination/gcs/GcsParquetDestinationAcceptanceTest.java

+34-6
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,9 @@
99

1010
import com.amazonaws.services.s3.model.S3Object;
1111
import com.amazonaws.services.s3.model.S3ObjectSummary;
12+
import com.fasterxml.jackson.core.JsonProcessingException;
1213
import com.fasterxml.jackson.databind.JsonNode;
14+
import com.fasterxml.jackson.databind.ObjectMapper;
1315
import com.fasterxml.jackson.databind.ObjectReader;
1416
import com.fasterxml.jackson.databind.node.ObjectNode;
1517
import io.airbyte.commons.json.Jsons;
@@ -19,6 +21,9 @@
1921
import io.airbyte.integrations.destination.s3.avro.JsonFieldNameUpdater;
2022
import io.airbyte.integrations.destination.s3.util.AvroRecordHelper;
2123
import io.airbyte.integrations.standardtest.destination.DateTimeUtils;
24+
import io.airbyte.protocol.models.AirbyteMessage;
25+
import io.airbyte.protocol.models.AirbyteMessage.Type;
26+
import io.airbyte.protocol.models.AirbyteRecordMessage;
2227
import java.io.IOException;
2328
import java.net.URI;
2429
import java.net.URISyntaxException;
@@ -28,6 +33,7 @@
2833
import java.util.List;
2934
import java.util.Map;
3035
import org.apache.avro.generic.GenericData;
36+
import org.apache.commons.lang3.StringUtils;
3137
import org.apache.hadoop.conf.Configuration;
3238
import org.apache.parquet.avro.AvroReadSupport;
3339
import org.apache.parquet.hadoop.ParquetReader;
@@ -85,13 +91,13 @@ public boolean requiresDateTimeConversionForSync() {
8591
}
8692

8793
@Override
88-
public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNames) {
89-
for (String path : dateTimeFieldNames.keySet()) {
94+
public void convertDateTime(final ObjectNode data, final Map<String, String> dateTimeFieldNames) {
95+
for (final String path : dateTimeFieldNames.keySet()) {
9096
if (!data.at(path).isMissingNode() && DateTimeUtils.isDateTimeValue(data.at(path).asText())) {
91-
var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
97+
final var pathFields = new ArrayList<>(Arrays.asList(path.split("/")));
9298
pathFields.remove(0); // first element always empty string
9399
// if pathFields.size() == 1 -> /field else /field/nestedField..
94-
var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
100+
final var pathWithoutLastField = pathFields.size() == 1 ? "/" + pathFields.get(0)
95101
: "/" + String.join("/", pathFields.subList(0, pathFields.size() - 1));
96102
switch (dateTimeFieldNames.get(path)) {
97103
case DATE_TIME -> {
@@ -103,7 +109,6 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNa
103109
((ObjectNode) data.at(pathWithoutLastField)).put(
104110
pathFields.get(pathFields.size() - 1),
105111
(DateTimeUtils.getEpochMicros(data.at(path).asText()) / 1000) * 1000);
106-
((ObjectNode) data.at(pathWithoutLastField)).set("_airbyte_additional_properties", null);
107112
}
108113
}
109114
case DATE -> {
@@ -113,12 +118,35 @@ public void convertDateTime(ObjectNode data, Map<String, String> dateTimeFieldNa
113118
} else {
114119
((ObjectNode) data.at(pathWithoutLastField)).put(pathFields.get(pathFields.size() - 1),
115120
DateTimeUtils.getEpochDay((data.at(path).asText())));
116-
((ObjectNode) data.at(pathWithoutLastField)).set("_airbyte_additional_properties", null);
117121
}
118122
}
119123
}
120124
}
121125
}
122126
}
123127

128+
@Override
129+
protected void deserializeNestedObjects(final List<AirbyteMessage> messages, final List<AirbyteRecordMessage> actualMessages) {
130+
for (final AirbyteMessage message : messages) {
131+
if (message.getType() == Type.RECORD) {
132+
final var iterator = message.getRecord().getData().fieldNames();
133+
while (iterator.hasNext()) {
134+
final var fieldName = iterator.next();
135+
if (message.getRecord().getData().get(fieldName).isContainerNode()) {
136+
message.getRecord().getData().get(fieldName).fieldNames().forEachRemaining(f -> {
137+
final var data = message.getRecord().getData().get(fieldName).get(f);
138+
final var wrappedData = String.format("{\"%s\":%s,\"_airbyte_additional_properties\":null}", f,
139+
dateTimeFieldNames.containsKey(f) || !data.isTextual() ? data.asText() : StringUtils.wrap(data.asText(), "\""));
140+
try {
141+
((ObjectNode) message.getRecord().getData()).set(fieldName, new ObjectMapper().readTree(wrappedData));
142+
} catch (final JsonProcessingException e) {
143+
e.printStackTrace();
144+
}
145+
});
146+
}
147+
}
148+
}
149+
}
150+
}
151+
124152
}

airbyte-integrations/connectors/destination-s3/Dockerfile

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,5 +16,5 @@ ENV APPLICATION destination-s3
1616

1717
COPY --from=build /airbyte /airbyte
1818

19-
LABEL io.airbyte.version=0.3.0
19+
LABEL io.airbyte.version=0.3.1
2020
LABEL io.airbyte.name=airbyte/destination-s3

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/BlobStorageOperations.java

+6-5
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,9 @@ public interface BlobStorageOperations {
1414
String getBucketObjectPath(String namespace, String streamName, DateTime writeDatetime, String customFormat);
1515

1616
/**
17-
* Create a storage object where to store data in the destination for a @param streamName using
18-
* location of @param objectPath
17+
* Create a storage object where to store data in the destination for a @param objectPath
1918
*/
20-
void createBucketObjectIfNotExists(String streamName) throws Exception;
19+
void createBucketObjectIfNotExists(String objectPath) throws Exception;
2120

2221
/**
2322
* Upload the data files into the storage area.
@@ -29,9 +28,11 @@ public interface BlobStorageOperations {
2928
/**
3029
* Remove files that were just stored in the bucket
3130
*/
32-
void cleanUpBucketObject(String streamName, List<String> stagedFiles) throws Exception;
31+
void cleanUpBucketObject(String objectPath, List<String> stagedFiles) throws Exception;
3332

34-
void dropBucketObject(String streamName);
33+
void cleanUpBucketObject(String namespace, String streamName, String objectPath, String pathFormat);
34+
35+
void dropBucketObject(String objectPath);
3536

3637
boolean isValidData(JsonNode jsonNode);
3738

airbyte-integrations/connectors/destination-s3/src/main/java/io/airbyte/integrations/destination/s3/S3ConsumerFactory.java

+10-4
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(
8181
final String customOutputFormat = String.join("/", bucketPath, s3Config.getPathFormat());
8282
final String fullOutputPath = storageOperations.getBucketObjectPath(namespace, streamName, SYNC_DATETIME, customOutputFormat);
8383
final DestinationSyncMode syncMode = stream.getDestinationSyncMode();
84-
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, fullOutputPath, syncMode);
84+
final WriteConfig writeConfig = new WriteConfig(namespace, streamName, bucketPath, customOutputFormat, fullOutputPath, syncMode);
8585
LOGGER.info("Write config: {}", writeConfig);
8686
return writeConfig;
8787
};
@@ -95,10 +95,16 @@ private OnStartFunction onStartFunction(final BlobStorageOperations storageOpera
9595
final String namespace = writeConfig.getNamespace();
9696
final String stream = writeConfig.getStreamName();
9797
final String outputBucketPath = writeConfig.getOutputBucketPath();
98-
LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {}", namespace, stream, outputBucketPath);
98+
final String pathFormat = writeConfig.getPathFormat();
99+
LOGGER.info("Clearing storage area in destination started for namespace {} stream {} bucketObject {} pathFormat {}",
100+
namespace, stream, outputBucketPath, pathFormat);
99101
AirbyteSentry.executeWithTracing("PrepareStreamStorage",
100-
() -> storageOperations.dropBucketObject(outputBucketPath),
101-
Map.of("namespace", Objects.requireNonNullElse(namespace, "null"), "stream", stream, "storage", outputBucketPath));
102+
() -> storageOperations.cleanUpBucketObject(namespace, stream, outputBucketPath, pathFormat),
103+
Map.of(
104+
"namespace", Objects.requireNonNullElse(namespace, "null"),
105+
"stream", stream,
106+
"storage", outputBucketPath,
107+
"pathFormat", pathFormat));
102108
LOGGER.info("Clearing storage area in destination completed for namespace {} stream {} bucketObject {}", namespace, stream,
103109
outputBucketPath);
104110
}

0 commit comments

Comments
 (0)