Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source IBM Db2: Connector provides wrong values for a datatype #7670

Merged
merged 10 commits into from
Nov 11, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "447e0381-3780-4b46-bb62-00a4e3c8b8e2",
"name": "IBM Db2",
"dockerRepository": "airbyte/source-db2",
"dockerImageTag": "0.1.2",
"dockerImageTag": "0.1.3",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/db2"
}
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@
- name: IBM Db2
sourceDefinitionId: 447e0381-3780-4b46-bb62-00a4e3c8b8e2
dockerRepository: airbyte/source-db2
dockerImageTag: 0.1.2
dockerImageTag: 0.1.3
documentationUrl: https://docs.airbyte.io/integrations/sources/db2
sourceType: database
- name: Instagram
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2;
package io.airbyte.db.jdbc;

import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Db2JdbcStreamingQueryConfiguration implements
JdbcStreamingQueryConfiguration {
public class Db2JdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration {

@Override
public void accept(final Connection connection, final PreparedStatement preparedStatement)
throws SQLException {
public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(1000);
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-db2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.name=airbyte/source-db2
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
Expand All @@ -24,12 +26,13 @@ public class Db2Source extends AbstractJdbcSource implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2Source.class);
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";
private static Db2SourceOperations operations;

private static final String KEY_STORE_PASS = RandomStringUtils.randomAlphanumeric(8);
private static final String KEY_STORE_FILE_PATH = "clientkeystore.jks";

public Db2Source() {
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration());
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration(), new Db2SourceOperations());
}

public static void main(final String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Db2SourceOperations extends JdbcSourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2SourceOperations.class);
private static final List<String> DB2_UNIQUE_NUMBER_TYPES = List.of("DECFLOAT");

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final int columnCount = queryContext.getMetaData().getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
setFields(queryContext, i, jsonNode);
}

return jsonNode;
}

/* Helpers */

private void setFields(ResultSet queryContext, int index, ObjectNode jsonNode) throws SQLException {
try {
queryContext.getObject(index);
if (!queryContext.wasNull()) {
setJsonField(queryContext, index, jsonNode);
}
} catch (SQLException e) {
if (DB2_UNIQUE_NUMBER_TYPES.contains(queryContext.getMetaData().getColumnTypeName(index))) {
db2UniqueTypes(queryContext, index, jsonNode);
} else {
throw new SQLException(e.getCause());
}
}
}

private void db2UniqueTypes(ResultSet resultSet, int index, ObjectNode jsonNode) throws SQLException {
String columnType = resultSet.getMetaData().getColumnTypeName(index);
String columnName = resultSet.getMetaData().getColumnName(index);
if (DB2_UNIQUE_NUMBER_TYPES.contains(columnType)) {
putDecfloat(jsonNode, columnName, resultSet, index);
}
}

private void putDecfloat(final ObjectNode node,
final String columnName,
final ResultSet resultSet,
final int index) {
try {
final double value = resultSet.getDouble(index);
node.put(columnName, value);
} catch (final SQLException e) {
node.put(columnName, (Double) null);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -142,13 +142,15 @@ protected void initTests() {
"DOUBLE('1.7976931348623157E+308')")
.addExpectedValues(null, "-1.7976931348623157E308", "-2.2250738585072014E-308", "2.2250738585072014E-308", "1.7976931348623157E308")
.build());

// DECFLOAT type tests
addDataTypeTestData(
TestDataHolder.builder()
.createTablePatternSql(CREATE_TABLE_SQL)
.sourceType("DECFLOAT")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.fullSourceDataType("DECFLOAT(16)")
.addInsertValues("null", "0", "DECFLOAT(10E+307, 16)", "DECFLOAT(10E-307, 16)")
.addInsertValues("null", "0", "1.0E308", "1.0E-306")
.addExpectedValues(null, "0", "1E+308", "1E-306")
.build());
addDataTypeTestData(
Expand All @@ -160,16 +162,14 @@ protected void initTests() {
.addInsertValues("null", "0", "DECFLOAT(10E+307, 34)", "DECFLOAT(10E-307, 34)")
.addExpectedValues(null, "0", "1E+308", "1E-306")
.build());

// TODO "SNaN", "NaN", "Infinity" - fail with an exception in Db2 Driver during conversion to a
// BigDecimal.
// Could be fixed by mapping DECFLOAT to Double or String according to:
// https://www.ibm.com/docs/en/db2-for-zos/12?topic=dttmddtija-retrieval-special-values-from-decfloat-columns-in-java-applications
/*
* addDataTypeTestData( TestDataHolder.builder() .createTablePatternSql(CREATE_TABLE_SQL)
* .sourceType("DECFLOAT") .airbyteType(JsonSchemaPrimitive.NUMBER) .addInsertValues("SNaN", "NaN",
* "Infinity") .addExpectedValues() .build());
*/
addDataTypeTestData(
TestDataHolder.builder()
.createTablePatternSql(CREATE_TABLE_SQL)
.sourceType("DECFLOAT")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("SNaN", "NaN", "Infinity", "-Infinity")
.addExpectedValues("NaN", "NaN", "Infinity", "-Infinity")
.build());

// Boolean values
addDataTypeTestData(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/db2.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ You can also enter your own password for the keystore, but if you don't, the pas

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.3 | 2021-11-05 | [7670](https://github.com/airbytehq/airbyte/pull/7670) | Updated unique DB2 types transformation |
| 0.1.2 | 2021-10-25 | [7355](https://github.com/airbytehq/airbyte/pull/7355) | Added ssl support |
| 0.1.1 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |
| 0.1.0 | 2021-06-22 | [4197](https://github.com/airbytehq/airbyte/pull/4197) | New Source: IBM DB2 |
Expand Down