Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination S3: add LZO compression support #15394

Merged
merged 17 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@
- name: S3
destinationDefinitionId: 4816b78f-1489-44c1-9060-4b19d5fa9362
dockerRepository: airbyte/destination-s3
dockerImageTag: 0.3.12
dockerImageTag: 0.3.13
documentationUrl: https://docs.airbyte.io/integrations/destinations/s3
icon: s3.svg
resourceRequirements:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3974,7 +3974,7 @@
supported_destination_sync_modes:
- "append"
- "overwrite"
- dockerImage: "airbyte/destination-s3:0.3.12"
- dockerImage: "airbyte/destination-s3:0.3.13"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/destinations/s3"
connectionSpecification:
Expand Down
29 changes: 27 additions & 2 deletions airbyte-integrations/connectors/destination-s3/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,31 @@ WORKDIR /airbyte
ENV APPLICATION destination-s3

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.3.12
RUN /bin/bash -c 'set -e && \
ARCH=`uname -m` && \
if [ "$ARCH" == "x86_64" ] || [ "$ARCH" = "amd64" ]; then \
echo "$ARCH" && \
apt-get update; \
apt-get install lzop liblzo2-2 liblzo2-dev -y; \
elif [ "$ARCH" == "aarch64" ] || [ "$ARCH" = "arm64" ]; then \
echo "$ARCH" && \
apt-get update; \
apt-get install lzop liblzo2-2 liblzo2-dev wget curl unzip zip build-essential maven git -y; \
wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz -P /tmp; \
cd /tmp && tar xvfz lzo-2.10.tar.gz; \
cd /tmp/lzo-2.10/ && ./configure --enable-shared --prefix /usr/local/lzo-2.10; \
cd /tmp/lzo-2.10/ && make; \
cd /tmp/lzo-2.10/ && make install; \
git clone https://github.com/twitter/hadoop-lzo.git /usr/lib/hadoop/lib/hadoop-lzo/; \
curl -s "https://get.sdkman.io" | bash; \
source /root/.sdkman/bin/sdkman-init.sh; \
sdk install java 8.0.342-librca; \
sdk use java 8.0.342-librca; \
cd /usr/lib/hadoop/lib/hadoop-lzo/ && C_INCLUDE_PATH=/usr/local/lzo-2.10/include LIBRARY_PATH=/usr/local/lzo-2.10/lib mvn clean package; \
find /usr/lib/hadoop/lib/hadoop-lzo/ -name '*libgplcompression*' -exec cp {} /usr/lib/ \; ;\
else \
echo "unknown arch" ;\
fi'

LABEL io.airbyte.version=0.3.13
LABEL io.airbyte.name=airbyte/destination-s3
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,12 @@ plugins {
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}
repositories {
mavenCentral()
maven {
url = uri("https://maven.twttr.com")
}
}

application {
mainClass = 'io.airbyte.integrations.destination.s3.S3Destination'
Expand Down Expand Up @@ -32,7 +38,7 @@ dependencies {
}
implementation ('org.apache.parquet:parquet-avro:1.12.3') { exclude group: 'org.slf4j', module: 'slf4j-log4j12'}
implementation ('com.github.airbytehq:json-avro-converter:1.0.1') { exclude group: 'ch.qos.logback', module: 'logback-classic'}

implementation group: 'com.hadoop.gplcompression', name: 'hadoop-lzo', version: '0.4.20'
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.xerial.snappy:snappy-java:1.1.8.4'
testImplementation "org.mockito:mockito-inline:4.1.0"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.s3.util;

import io.airbyte.commons.io.LineGobbler;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JavaProcessRunner {

private static final Logger LOGGER = LoggerFactory.getLogger(JavaProcessRunner.class);

public static void runProcess(final String path, final Runtime run, final String... commands) throws IOException, InterruptedException {
LOGGER.info("Running process: " + Arrays.asList(commands));
final Process pr = path.equals(System.getProperty("user.dir")) ? run.exec(commands) : run.exec(commands, null, new File(path));
LineGobbler.gobble(pr.getErrorStream(), LOGGER::error);
LineGobbler.gobble(pr.getInputStream(), LOGGER::info);
if (!pr.waitFor(10, TimeUnit.MINUTES)) {
pr.destroy();
throw new RuntimeException("Timeout while executing: " + Arrays.toString(commands));
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.integrations.destination.s3.parquet;

import static io.airbyte.integrations.destination.s3.util.JavaProcessRunner.runProcess;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

Expand Down Expand Up @@ -77,6 +78,51 @@ public void testCompressedParquetWriter() throws Exception {
runTest(195L, 215L, config, getExpectedString());
}

private static String resolveArchitecture() {
return System.getProperty("os.name").replace(' ', '_') + "-" + System.getProperty("os.arch") + "-" + System.getProperty("sun.arch.data.model");
}

@Test
public void testLzoCompressedParquet() throws Exception {
Copy link
Contributor

@girarda girarda Aug 10, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to install these dependencies in the test?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this testing?
this unit test emulates the same behavior as described in the dockerfile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to install these dependencies in the test?

unit tests does not use builded destination-s3 image

final String currentDir = System.getProperty("user.dir");
Runtime runtime = Runtime.getRuntime();
final String architecture = resolveArchitecture();
if (architecture.equals("Linux-amd64-64") || architecture.equals("Linux-x86_64-64")) {
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get update");
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get install lzop liblzo2-2 liblzo2-dev -y");
runLzoParquetTest();
} else if (architecture.equals("Linux-aarch64-64") || architecture.equals("Linux-arm64-64")) {
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get update");
runProcess(currentDir, runtime, "/bin/sh", "-c", "apt-get install lzop liblzo2-2 liblzo2-dev " +
"wget curl unzip zip build-essential maven git -y");
runProcess(currentDir, runtime, "/bin/sh", "-c", "wget http://www.oberhumer.com/opensource/lzo/download/lzo-2.10.tar.gz -P /usr/local/tmp");
runProcess("/usr/local/tmp/", runtime, "/bin/sh", "-c", "tar xvfz lzo-2.10.tar.gz");
runProcess("/usr/local/tmp/lzo-2.10/", runtime, "/bin/sh", "-c", "./configure --enable-shared --prefix /usr/local/lzo-2.10");
runProcess("/usr/local/tmp/lzo-2.10/", runtime, "/bin/sh", "-c", "make && make install");
runProcess(currentDir, runtime, "/bin/sh", "-c", "git clone https://github.com/twitter/hadoop-lzo.git /usr/lib/hadoop/lib/hadoop-lzo/");
runProcess(currentDir, runtime, "/bin/sh", "-c", "curl -s https://get.sdkman.io | bash");
runProcess(currentDir, runtime, "/bin/bash", "-c", "source /root/.sdkman/bin/sdkman-init.sh;" +
" sdk install java 8.0.342-librca;" +
" sdk use java 8.0.342-librca;" +
" cd /usr/lib/hadoop/lib/hadoop-lzo/ " +
"&& C_INCLUDE_PATH=/usr/local/lzo-2.10/include " +
"LIBRARY_PATH=/usr/local/lzo-2.10/lib mvn clean package");
runProcess(currentDir, runtime, "/bin/sh", "-c",
"find /usr/lib/hadoop/lib/hadoop-lzo/ -name '*libgplcompression*' -exec cp {} /usr/lib/ \\;");
runLzoParquetTest();
}
}

private void runLzoParquetTest() throws Exception {
final S3DestinationConfig config = S3DestinationConfig.getS3DestinationConfig(Jsons.jsonNode(Map.of(
"format", Map.of(
"format_type", "parquet",
"compression_codec", "LZO"),
"s3_bucket_name", "test",
"s3_bucket_region", "us-east-2")));
runTest(195L, 215L, config, getExpectedString());
}

private static String getExpectedString() {
return "{\"_airbyte_ab_id\": \"<UUID>\", \"_airbyte_emitted_at\": \"<timestamp>\", "
+ "\"field1\": 10000.0, \"another_field\": true, "
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/s3.md
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ In order for everything to work correctly, it is also necessary that the user wh

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.3.13 | 2022-08-09 | [\#15394](https://github.com/airbytehq/airbyte/pull/15394) | Added LZO compression support to Parquet format |
| 0.3.12 | 2022-08-05 | [\#14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiple log bindings |
| 0.3.11 | 2022-07-15 | [\#14494](https://github.com/airbytehq/airbyte/pull/14494) | Make S3 output filename configurable. |
| 0.3.10 | 2022-06-30 | [\#14332](https://github.com/airbytehq/airbyte/pull/14332) | Change INSTANCE_PROFILE to use `AWSDefaultProfileCredential`, which supports more authentications on AWS |
Expand Down