Skip to content

Commit

Permalink
🐛 Source IBM Db2: Connector provides wrong values for a datatype (air…
Browse files Browse the repository at this point in the history
…bytehq#7670)

* added DB2 type transformation

* added DB2 type transformation

* fixed code style

* fixed remarks

* fixed remarks

* fixed remarks

* added java doc for certificate generation

* bump new version
  • Loading branch information
andriikorotkov authored and schlattk committed Jan 4, 2022
1 parent 25f3b63 commit c6cadaa
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "447e0381-3780-4b46-bb62-00a4e3c8b8e2",
"name": "IBM Db2",
"dockerRepository": "airbyte/source-db2",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/db2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@
- name: IBM Db2
sourceDefinitionId: 447e0381-3780-4b46-bb62-00a4e3c8b8e2
dockerRepository: airbyte/source-db2
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/db2
sourceType: database
- name: Instagram
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2;
package io.airbyte.db.jdbc;

import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Db2JdbcStreamingQueryConfiguration implements
JdbcStreamingQueryConfiguration {
public class Db2JdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration {

@Override
public void accept(final Connection connection, final PreparedStatement preparedStatement)
throws SQLException {
public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(1000);
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-db2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-db2
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
Expand All @@ -24,12 +26,13 @@ public class Db2Source extends AbstractJdbcSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2Source.class);
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";
private static Db2SourceOperations operations;

private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";

public Db2Source() {
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration());
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration(), new Db2SourceOperations());
}

public static void main(final String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Db2SourceOperations extends JdbcSourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2SourceOperations.class);
private static final List<String> DB2_UNIQUE_NUMBER_TYPES = List.of("DECFLOAT");

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final int columnCount = queryContext.getMetaData().getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
setFields(queryContext, i, jsonNode);
}

return jsonNode;
}

/* Helpers */

private void setFields(ResultSet queryContext, int index, ObjectNode jsonNode) throws SQLException {
try {
queryContext.getObject(index);
if (!queryContext.wasNull()) {
setJsonField(queryContext, index, jsonNode);
}
} catch (SQLException e) {
if (DB2_UNIQUE_NUMBER_TYPES.contains(queryContext.getMetaData().getColumnTypeName(index))) {
db2UniqueTypes(queryContext, index, jsonNode);
} else {
throw new SQLException(e.getCause());
}
}
}

private void db2UniqueTypes(ResultSet resultSet, int index, ObjectNode jsonNode) throws SQLException {
String columnType = resultSet.getMetaData().getColumnTypeName(index);
String columnName = resultSet.getMetaData().getColumnName(index);
if (DB2_UNIQUE_NUMBER_TYPES.contains(columnType)) {
putDecfloat(jsonNode, columnName, resultSet, index);
}
}

private void putDecfloat(final ObjectNode node,
final String columnName,
final ResultSet resultSet,
final int index) {
try {
final double value = resultSet.getDouble(index);
node.put(columnName, value);
} catch (final SQLException e) {
node.put(columnName, (Double) null);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,15 @@ protected void initTests() {
"DOUBLE('1.7976931348623157E+308')")
.addExpectedValues(null, "-1.7976931348623157E308", "-2.2250738585072014E-308", "2.2250738585072014E-308", "1.7976931348623157E308")
.build());

// DECFLOAT type tests
addDataTypeTestData(
TestDataHolder.builder()
.createTablePatternSql(CREATE_TABLE_SQL)
.sourceType("DECFLOAT")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.fullSourceDataType("DECFLOAT(16)")
.addInsertValues("null", "0", "DECFLOAT(10E+307, 16)", "DECFLOAT(10E-307, 16)")
.addInsertValues("null", "0", "1.0E308", "1.0E-306")
.addExpectedValues(null, "0", "1E+308", "1E-306")
.build());
addDataTypeTestData(
Expand All @@ -160,16 +162,14 @@ protected void initTests() {
.addInsertValues("null", "0", "DECFLOAT(10E+307, 34)", "DECFLOAT(10E-307, 34)")
.addExpectedValues(null, "0", "1E+308", "1E-306")
.build());

// TODO "SNaN", "NaN", "Infinity" - fail with an exception in Db2 Driver during conversion to a
// BigDecimal.
// Could be fixed by mapping DECFLOAT to Double or String according to:
// https://www.ibm.com/docs/en/db2-for-zos/12?topic=dttmddtija-retrieval-special-values-from-decfloat-columns-in-java-applications
/*
* addDataTypeTestData( TestDataHolder.builder() .createTablePatternSql(CREATE_TABLE_SQL)
* .sourceType("DECFLOAT") .airbyteType(JsonSchemaPrimitive.NUMBER) .addInsertValues("SNaN", "NaN",
* "Infinity") .addExpectedValues() .build());
*/
addDataTypeTestData(
TestDataHolder.builder()
.createTablePatternSql(CREATE_TABLE_SQL)
.sourceType("DECFLOAT")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("SNaN", "NaN", "Infinity", "-Infinity")
.addExpectedValues("NaN", "NaN", "Infinity", "-Infinity")
.build());

// Boolean values
addDataTypeTestData(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/db2.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ You can also enter your own password for the keystore, but if you don't, the pas

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.3 | 2021-11-05 | [7670](https://github.com/airbytehq/airbyte/pull/7670) | Updated unique DB2 types transformation |
| 0.1.2 | 2021-10-25 | [7355](https://github.com/airbytehq/airbyte/pull/7355) | Added ssl support |
| 0.1.1 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |
| 0.1.0 | 2021-06-22 | [4197](https://github.com/airbytehq/airbyte/pull/4197) | New Source: IBM DB2 |
Expand Down

0 comments on commit c6cadaa

Please sign in to comment.