🐞 Destination databricks: update jdbc driver to patch log4j (#7622)
* Download spark jdbc driver in build cmd

* Download jdbc driver in ci integration test

* Update comments

* Set up cloud sdk

* Add comments

* Download jdbc driver from databricks directly

* Update readme

* Use unzip command

* Install unzip for databricks

* Add databricks build status

* Close database

* Log more error information

* Close database when checking connection

* Update spec
tuliren authored Jan 7, 2022
1 parent 511819b commit dfb7b8c
Showing 12 changed files with 59 additions and 9 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/publish-command.yml
@@ -66,6 +66,10 @@ jobs:
uses: actions/checkout@v2
with:
repository: ${{github.event.pull_request.head.repo.full_name}} # always use the branch's repository
+ - name: Install Unzip for Databricks
+   if: github.event.inputs.connector == 'connectors/destination-databricks'
+   run: |
+     apt-get update && apt-get install -y unzip
- name: Install Java
uses: actions/setup-java@v1
with:
4 changes: 4 additions & 0 deletions .github/workflows/test-command.yml
@@ -61,6 +61,10 @@ jobs:
uses: actions/checkout@v2
with:
repository: ${{ github.event.inputs.repo }}
+ - name: Install Unzip for Databricks
+   if: github.event.inputs.connector == 'connectors/destination-databricks'
+   run: |
+     apt-get update && apt-get install -y unzip
- name: Install Java
uses: actions/setup-java@v1
with:
@@ -1985,7 +1985,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-github:0.2.9"
- dockerImage: "airbyte/source-github:0.2.10"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/github"
connectionSpecification:
2 changes: 1 addition & 1 deletion airbyte-integrations/builds.md
@@ -114,7 +114,7 @@
| BigQuery | [![destination-bigquery](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-bigquery%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-bigquery) |
| ClickHouse | [![destination-clickhouse](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-clickhouse%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-clickhouse) |
| Cassandra | [![destination-cassandra](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-cassandra%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-cassandra) |
- | Databricks | (Temporarily Not Available) |
+ | Databricks | [![destination-databricks](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-databricks%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-databricks) |
| Dev Null | [![destination-dev-null](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-dev-null%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-dev-null) |
| Elasticsearch | (Temporarily Not Available) |
| End-to-End Testing | [![destination-e2e-test](https://img.shields.io/endpoint?url=https%3A%2F%2Fdnsgjos7lj2fu.cloudfront.net%2Ftests%2Fsummary%2Fdestination-e2e-test%2Fbadge.json)](https://dnsgjos7lj2fu.cloudfront.net/tests/summary/destination-e2e-test) |
@@ -6,6 +6,8 @@ For information about how to use this connector within Airbyte, see [the User Do
## Databricks JDBC Driver
This connector requires a JDBC driver to connect to a Databricks cluster. The driver is developed by Simba. Before downloading and using this driver, you must agree to the [JDBC ODBC driver license](https://databricks.com/jdbc-odbc-driver-license). This means that you can only use this driver to connect third-party applications to Apache Spark SQL within a Databricks offering using the ODBC and/or JDBC protocols. The driver can be downloaded from [here](https://databricks.com/spark/jdbc-drivers-download).

+ The CI downloads the JDBC driver in [this script](https://github.com/airbytehq/airbyte/blob/master/tools/lib/databricks.sh).

This is currently a private connector that is only available in Airbyte Cloud. To build and publish this connector, first download the driver and put it under the `lib` directory. Please do not publish this connector publicly. We are working on a solution to make it publicly available.
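For a manual build, the following sketch mirrors what the CI helper does (the URL and file names are copied from `tools/lib/databricks.sh`, shown in full further down this page; it assumes you have accepted the license linked above and have `curl` and `unzip` installed):

```bash
cd airbyte-integrations/connectors/destination-databricks
curl -o lib/SimbaSparkJDBC42-2.6.21.1039.zip \
  "https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.6.21/SimbaSparkJDBC42-2.6.21.1039.zip"
# Extract only the driver jar into lib/, then drop the archive
unzip lib/SimbaSparkJDBC42-2.6.21.1039.zip SparkJDBC42.jar -d lib/
rm lib/SimbaSparkJDBC42-2.6.21.1039.zip
```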

## Local development
@@ -127,13 +127,17 @@ protected void tearDown(final TestDestinationEnv testEnv) throws SQLException {
.deleteObjects(new DeleteObjectsRequest(s3Config.getBucketName()).withKeys(keysToDelete));
LOGGER.info("Deleted {} file(s).", result.getDeletedObjects().size());
}
+ s3Client.shutdown();

// clean up database
LOGGER.info("Dropping database schema {}", databricksConfig.getDatabaseSchema());
- final Database database = getDatabase(databricksConfig);
- // we cannot use jooq dropSchemaIfExists method here because there is no proper dialect for
- // Databricks, and it incorrectly quotes the schema name
- database.query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", databricksConfig.getDatabaseSchema())));
+ try (final Database database = getDatabase(databricksConfig)) {
+   // we cannot use jooq dropSchemaIfExists method here because there is no proper dialect for
+   // Databricks, and it incorrectly quotes the schema name
+   database.query(ctx -> ctx.execute(String.format("DROP SCHEMA IF EXISTS %s CASCADE;", databricksConfig.getDatabaseSchema())));
+ } catch (final Exception e) {
+   throw new SQLException(e);
+ }
}

private static Database getDatabase(final DatabricksDestinationConfig databricksConfig) {
@@ -148,6 +148,7 @@ private static void closeAsOneTransaction(final List<StreamCopier> streamCopiers
for (final var copier : streamCopiers) {
copier.removeFileAndDropTmpTable();
}
+ db.close();
}
if (firstException != null) {
throw firstException;
@@ -54,10 +54,9 @@ public AirbyteConnectionStatus check(final JsonNode config) {
.withMessage("Could not connect to the staging persistence with the provided configuration. \n" + e.getMessage());
}

- try {
+ try (final JdbcDatabase database = getDatabase(config)) {
final var nameTransformer = getNameTransformer();
final var outputSchema = nameTransformer.convertStreamName(config.get(schemaFieldName).asText());
- final JdbcDatabase database = getDatabase(config);
AbstractJdbcDestination.attemptSQLCreateAndDropTableOperations(outputSchema, database, nameTransformer, getSqlOperations());

return new AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
@@ -77,7 +77,7 @@ public StandardCheckConnectionOutput run(final StandardCheckConnectionInput inpu
LOGGER.debug("Check connection job received output: {}", output);
return output;
} else {
throw new WorkerException("Error while getting checking connection.");
throw new WorkerException(String.format("Error checking connection, status: %s, exit code: %d", status, exitCode));
}

} catch (final Exception e) {
6 changes: 6 additions & 0 deletions tools/bin/ci_integration_test.sh
@@ -3,13 +3,15 @@
set -e

. tools/lib/lib.sh
+ . tools/lib/databricks.sh

# runs integration tests for an integration name

connector="$1"
all_integration_tests=$(./gradlew integrationTest --dry-run | grep 'integrationTest SKIPPED' | cut -d: -f 4)
run() {
if [[ "$connector" == "all" ]] ; then
+ _get_databricks_jdbc_driver
echo "Running: ./gradlew --no-daemon --scan integrationTest"
./gradlew --no-daemon --scan integrationTest
else
@@ -34,6 +36,10 @@ else
integrationTestCommand=":airbyte-integrations:connectors:$connector:integrationTest"
fi
if [ -n "$selected_integration_test" ] ; then
if [[ "$selected_integration_test" == *"databricks"* ]] ; then
_get_databricks_jdbc_driver
fi

echo "Running: ./gradlew --no-daemon --scan $integrationTestCommand"
./gradlew --no-daemon --scan "$integrationTestCommand"
else
6 changes: 6 additions & 0 deletions tools/integrations/manage.sh
@@ -4,6 +4,7 @@ set -e
set -x

. tools/lib/lib.sh
+ . tools/lib/databricks.sh

USAGE="
Usage: $(basename "$0") <cmd>
@@ -37,6 +38,11 @@ cmd_build() {
[ -d "$path" ] || error "Path must be the root path of the integration"

local run_tests=$1; shift || run_tests=true

if [[ "airbyte-integrations/connectors/destination-databricks" == "${path}" ]]; then
_get_databricks_jdbc_driver
fi

echo "Building $path"
./gradlew --no-daemon "$(_to_gradle_path "$path" clean)"
./gradlew --no-daemon "$(_to_gradle_path "$path" build)"
24 changes: 24 additions & 0 deletions tools/lib/databricks.sh
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

. tools/lib/lib.sh

# Whoever runs this script must accept the following terms & conditions:
# https://databricks.com/jdbc-odbc-driver-license
_get_databricks_jdbc_driver() {
  local driver_zip="SimbaSparkJDBC42-2.6.21.1039.zip"
  local driver_file="SparkJDBC42.jar"
  local driver_url="https://databricks-bi-artifacts.s3.us-east-2.amazonaws.com/simbaspark-drivers/jdbc/2.6.21/${driver_zip}"
  local connector_path="airbyte-integrations/connectors/destination-databricks"

  if [[ -f "${connector_path}/lib/${driver_file}" ]] ; then
    echo "[Databricks] Spark JDBC driver already exists"
  else
    echo "[Databricks] Downloading Spark JDBC driver..."
    curl -o "${connector_path}/lib/${driver_zip}" "${driver_url}"

    echo "[Databricks] Extracting Spark JDBC driver..."
    unzip "${connector_path}/lib/${driver_zip}" "${driver_file}"
    mv "${driver_file}" "${connector_path}/lib/"
    rm "${connector_path}/lib/${driver_zip}"
  fi
}
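As `tools/bin/ci_integration_test.sh` and `tools/integrations/manage.sh` above illustrate, the helper is meant to be sourced and then called from the repository root; a minimal usage sketch:

```bash
# Requires curl and unzip on the PATH; see the license note at the top of the file.
. tools/lib/databricks.sh
_get_databricks_jdbc_driver  # no-op if lib/SparkJDBC42.jar is already present
```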
