From dfb7b8cd3f22426ed0bc55f7b7fe83de4524a1d6 Mon Sep 17 00:00:00 2001 From: LiRen Tu Date: Thu, 6 Jan 2022 20:22:56 -0800 Subject: [PATCH] =?UTF-8?q?=F0=9F=90=9E=20Destination=20databricks:=20upda?= =?UTF-8?q?te=20jdbc=20driver=20to=20patch=20log4j=20(#7622)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Download spark jdbc driver in build cmd * Download jdbc driver in ci integration test * Update comments * Set up cloud sdk * Add comments * Download jdbc driver from databricks directly * Update readme * Use unzip command * Install unzip for databricks * Add databricks build status * Close database * Log more error information * Close database when checking connection * Update spec --- .github/workflows/publish-command.yml | 4 ++++ .github/workflows/test-command.yml | 4 ++++ .../src/main/resources/seed/source_specs.yaml | 2 +- airbyte-integrations/builds.md | 2 +- .../destination-databricks/README.md | 2 ++ .../DatabricksDestinationAcceptanceTest.java | 12 ++++++---- .../jdbc/copy/CopyConsumerFactory.java | 1 + .../jdbc/copy/CopyDestination.java | 3 +-- .../workers/DefaultCheckConnectionWorker.java | 2 +- tools/bin/ci_integration_test.sh | 6 +++++ tools/integrations/manage.sh | 6 +++++ tools/lib/databricks.sh | 24 +++++++++++++++++++ 12 files changed, 59 insertions(+), 9 deletions(-) create mode 100644 tools/lib/databricks.sh diff --git a/.github/workflows/publish-command.yml b/.github/workflows/publish-command.yml index 3b4cd245a375f..f73a4dacd4161 100644 --- a/.github/workflows/publish-command.yml +++ b/.github/workflows/publish-command.yml @@ -66,6 +66,10 @@ jobs: uses: actions/checkout@v2 with: repository: ${{github.event.pull_request.head.repo.full_name}} # always use the branch's repository + - name: Install Unzip for Databricks + if: github.event.inputs.connector == 'connectors/destination-databricks' + run: | + apt-get update && apt-get install -y unzip - name: Install Java uses: actions/setup-java@v1 with: 
diff --git a/.github/workflows/test-command.yml b/.github/workflows/test-command.yml index 000c6dd993b08..8c747eaf76451 100644 --- a/.github/workflows/test-command.yml +++ b/.github/workflows/test-command.yml @@ -61,6 +61,10 @@ jobs: uses: actions/checkout@v2 with: repository: ${{ github.event.inputs.repo }} + - name: Install Unzip for Databricks + if: github.event.inputs.connector == 'connectors/destination-databricks' + run: | + apt-get update && apt-get install -y unzip - name: Install Java uses: actions/setup-java@v1 with: diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 718f72a3fec34..49af7a5f0f6c6 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -1985,7 +1985,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-github:0.2.9" +- dockerImage: "airbyte/source-github:0.2.10" spec: documentationUrl: "https://docs.airbyte.io/integrations/sources/github" connectionSpecification: diff --git a/airbyte-integrations/builds.md b/airbyte-integrations/builds.md index 794c8fa333846..001ae80fd3248 100644 --- a/airbyte-integrations/builds.md +++ b/airbyte-integrations/builds.md @@ -114,7 +114,7 @@ | BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) | | ClickHouse | [![destination-clickhouse](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-clickhouse%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-clickhouse) | | Cassandra | 
[![destination-cassandra](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-cassandra%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-cassandra) | -| Databricks | (Temporarily Not Available) | +| Databricks | [![destination-databricks](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-databricks%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-databricks) | | Dev Null | [![destination-dev-null](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-dev-null%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-dev-null) | | Elasticsearch | (Temporarily Not Available) | | End-to-End Testing | [![destination-e2e-test](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-e2e-test%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-e2e-test) | diff --git a/airbyte-integrations/connectors/destination-databricks/README.md b/airbyte-integrations/connectors/destination-databricks/README.md index 5a9ab5bf1cb1e..5d96d1c7d0d31 100644 --- a/airbyte-integrations/connectors/destination-databricks/README.md +++ b/airbyte-integrations/connectors/destination-databricks/README.md @@ -6,6 +6,8 @@ For information about how to use this connector within Airbyte, see [the User Do ## Databricks JDBC Driver This connector requires a JDBC driver to connect to Databricks cluster. The driver is developed by Simba. Before downloading and using this driver, you must agree to the [JDBC ODBC driver license](https://databricks.com/jdbc-odbc-driver-license). This means that you can only use this driver to connector third party applications to Apache Spark SQL within a Databricks offering using the ODBC and/or JDBC protocols. 
The driver can be downloaded from [here](https://databricks.com/spark/jdbc-drivers-download). +The CI downloads the JDBC driver in [this script](https://github.com/airbytehq/airbyte/blob/master/tools/lib/databricks.sh). + This is currently a private connector that is only available in Airbyte Cloud. To build and publish this connector, first download the driver and put it under the `lib` directory. Please do not publish this connector publicly. We are working on a solution to publicize it. ## Local development diff --git a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java index 1bdda60327006..55f03862fa300 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-databricks/src/test-integration/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationAcceptanceTest.java @@ -127,13 +127,17 @@ protected void tearDown(final TestDestinationEnv testEnv) throws SQLException { .deleteObjects(new DeleteObjectsRequest(s3Config.getBucketName()).withKeys(keysToDelete)); LOGGER.info("Deleted {} file(s).", result.getDeletedObjects().size()); } + s3Client.shutdown(); // clean up database LOGGER.info("Dropping database schema {}", databricksConfig.getDatabaseSchema()); - final Database database = getDatabase(databricksConfig); - // we cannot use jooq dropSchemaIfExists method here because there is no proper dialect for - // Databricks, and it incorrectly quotes the schema name - database.query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", databricksConfig.getDatabaseSchema()))); + try 
(final Database database = getDatabase(databricksConfig)) { + // we cannot use jooq dropSchemaIfExists method here because there is no proper dialect for + // Databricks, and it incorrectly quotes the schema name + database.query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", databricksConfig.getDatabaseSchema()))); + } catch (final Exception e) { + throw new SQLException(e); + } } private static Database getDatabase(final DatabricksDestinationConfig databricksConfig) { diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java index 2a336586c8283..f27678d52d2d5 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyConsumerFactory.java @@ -148,6 +148,7 @@ private static void closeAsOneTransaction(final List streamCopiers for (final var copier : streamCopiers) { copier.removeFileAndDropTmpTable(); } + db.close(); } if (firstException != null) { throw firstException; diff --git a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java index 33bad47fdd9a1..fe3736956aed5 100644 --- a/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java +++ b/airbyte-integrations/connectors/destination-jdbc/src/main/java/io/airbyte/integrations/destination/jdbc/copy/CopyDestination.java @@ -54,10 +54,9 @@ public AirbyteConnectionStatus check(final JsonNode 
config) { .withMessage("Could not connect to the staging persistence with the provided configuration. \n" + e.getMessage()); } - try { + try (final JdbcDatabase database = getDatabase(config)) { final var nameTransformer = getNameTransformer(); final var outputSchema = nameTransformer.convertStreamName(config.get(schemaFieldName).asText()); - final JdbcDatabase database = getDatabase(config); AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations()); return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java index 1acb9ff03b414..d6a5204a1ff25 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultCheckConnectionWorker.java @@ -77,7 +77,7 @@ public StandardCheckConnectionOutput run(final StandardCheckConnectionInput inpu LOGGER.debug("Check connection job received output: {}", output); return output; } else { - throw new WorkerException("Error while getting checking connection."); + throw new WorkerException(String.format("Error checking connection, status: %s, exit code: %d", status, exitCode)); } } catch (final Exception e) { diff --git a/tools/bin/ci_integration_test.sh b/tools/bin/ci_integration_test.sh index b0f651c59b837..6c66eac1bdec3 100755 --- a/tools/bin/ci_integration_test.sh +++ b/tools/bin/ci_integration_test.sh @@ -3,6 +3,7 @@ set -e . tools/lib/lib.sh +. 
tools/lib/databricks.sh # runs integration tests for an integration name @@ -10,6 +11,7 @@ connector="$1" all_integration_tests=$(./gradlew integrationTest --dry-run | grep 'integrationTest SKIPPED' | cut -d: -f 4) run() { if [[ "$connector" == "all" ]] ; then + _get_databricks_jdbc_driver echo "Running: ./gradlew --no-daemon --scan integrationTest" ./gradlew --no-daemon --scan integrationTest else @@ -34,6 +36,10 @@ else integrationTestCommand=":airbyte-integrations:connectors:$connector:integrationTest" fi if [ -n "$selected_integration_test" ] ; then + if [[ "$selected_integration_test" == *"databricks"* ]] ; then + _get_databricks_jdbc_driver + fi + echo "Running: ./gradlew --no-daemon --scan $integrationTestCommand" ./gradlew --no-daemon --scan "$integrationTestCommand" else diff --git a/tools/integrations/manage.sh b/tools/integrations/manage.sh index 38395b060e068..f6ae667e3a240 100755 --- a/tools/integrations/manage.sh +++ b/tools/integrations/manage.sh @@ -4,6 +4,7 @@ set -e set -x . tools/lib/lib.sh +. tools/lib/databricks.sh USAGE=" Usage: $(basename "$0") @@ -37,6 +38,11 @@ cmd_build() { [ -d "$path" ] || error "Path must be the root path of the integration" local run_tests=$1; shift || run_tests=true + + if [[ "airbyte-integrations/connectors/destination-databricks" == "${path}" ]]; then + _get_databricks_jdbc_driver + fi + echo "Building $path" ./gradlew --no-daemon "$(_to_gradle_path "$path" clean)" ./gradlew --no-daemon "$(_to_gradle_path "$path" build)" diff --git a/tools/lib/databricks.sh b/tools/lib/databricks.sh new file mode 100644 index 0000000000000..72bb57c951246 --- /dev/null +++ b/tools/lib/databricks.sh @@ -0,0 +1,24 @@ +#!/usr/bin/env bash + +. 
tools/lib/lib.sh + +# Whoever runs this script must accept the following terms & conditions: +# https://databricks.com/jdbc-odbc-driver-license +_get_databricks_jdbc_driver() { + local driver_zip="SimbaSparkJDBC42-2.6.21.1039.zip" + local driver_file="SparkJDBC42.jar" + local driver_url="https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.6.21/${driver_zip}" + local connector_path="airbyte-integrations/connectors/destination-databricks" + + if [[ -f "${connector_path}/lib/${driver_file}" ]] ; then + echo "[Databricks] Spark JDBC driver already exists" + else + echo "[Databricks] Downloading Spark JDBC driver..." + curl -fL -o "${connector_path}/lib/${driver_zip}" "${driver_url}" + + echo "[Databricks] Extracting Spark JDBC driver..." + unzip "${connector_path}/lib/${driver_zip}" "${driver_file}" + mv "${driver_file}" "${connector_path}/lib/" + rm "${connector_path}/lib/${driver_zip}" + fi +}