Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐛 Source IBM Db2: Connector provides wrong values for a datatype #7670

Merged
merged 10 commits into from
Nov 11, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2;
package io.airbyte.db.jdbc;

import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class Db2JdbcStreamingQueryConfiguration implements
JdbcStreamingQueryConfiguration {
public class Db2JdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration {

@Override
public void accept(final Connection connection, final PreparedStatement preparedStatement)
throws SQLException {
public void accept(final Connection connection, final PreparedStatement preparedStatement) throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(1000);
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-db2/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-db2
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.Db2JdbcStreamingQueryConfiguration;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
Expand All @@ -20,7 +22,7 @@ public class Db2Source extends AbstractJdbcSource implements Source {
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";

public Db2Source() {
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration());
super(DRIVER_CLASS, new Db2JdbcStreamingQueryConfiguration(), new Db2SourceOperations());
}

public static void main(final String[] args) throws Exception {
Expand Down Expand Up @@ -49,4 +51,9 @@ public Set<String> getExcludedInternalNameSpaces() {
"SYSPROC", "SYSPUBLIC", "SYSSTAT", "SYSTOOLS");
}

@Override
protected JdbcSourceOperations getSourceOperations() {
return new Db2SourceOperations();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.db2;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.JDBCType;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Db2SourceOperations extends JdbcSourceOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(Db2SourceOperations.class);
private static final List<String> DB2_UNIQUE_NUMBER_TYPES = List.of("DECFLOAT");

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
final int columnCount = queryContext.getMetaData().getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
setFields(queryContext, i, jsonNode);
}

return jsonNode;
}

@Override
protected void setJsonField(ResultSet queryContext, int index, ObjectNode jsonNode) throws SQLException {
final int columnTypeInt = queryContext.getMetaData().getColumnType(index);
final String columnName = queryContext.getMetaData().getColumnName(index);
final JDBCType columnType = safeGetJdbcType(columnTypeInt);

switch (columnType) {
case BIT, BOOLEAN -> putBoolean(jsonNode, columnName, queryContext, index);
case TINYINT, SMALLINT -> putShortInt(jsonNode, columnName, queryContext, index);
case INTEGER -> putInteger(jsonNode, columnName, queryContext, index);
case BIGINT -> putBigInt(jsonNode, columnName, queryContext, index);
case FLOAT, DOUBLE -> putDouble(jsonNode, columnName, queryContext, index);
case REAL -> putReal(jsonNode, columnName, queryContext, index);
case NUMERIC, DECIMAL -> putNumber(jsonNode, columnName, queryContext, index);
case CHAR, VARCHAR, LONGVARCHAR -> putString(jsonNode, columnName, queryContext, index);
case DATE -> putDate(jsonNode, columnName, queryContext, index);
case TIME -> putTime(jsonNode, columnName, queryContext, index);
case TIMESTAMP -> putTimestamp(jsonNode, columnName, queryContext, index);
case BLOB, BINARY, VARBINARY, LONGVARBINARY -> putBinary(jsonNode, columnName, queryContext, index);
default -> putDefault(jsonNode, columnName, queryContext, index);
}
}

/* Helpers */

private void setFields(ResultSet queryContext, int index, ObjectNode jsonNode) throws SQLException {
try {
queryContext.getObject(index);
if (!queryContext.wasNull()) {
setJsonField(queryContext, index, jsonNode);
}
} catch (SQLException e) {
if (DB2_UNIQUE_NUMBER_TYPES.contains(queryContext.getMetaData().getColumnTypeName(index))) {
db2UniqueTypes(queryContext, index, jsonNode);
} else {
throw new SQLException(e.getCause());
}
}
}

private void db2UniqueTypes(ResultSet resultSet, int index, ObjectNode jsonNode) throws SQLException {
String columnType = resultSet.getMetaData().getColumnTypeName(index);
String columnName = resultSet.getMetaData().getColumnName(index);
if (DB2_UNIQUE_NUMBER_TYPES.contains(columnType)) {
putDecfloat(jsonNode, columnName, resultSet, index);
}
}

private void putDecfloat(final ObjectNode node,
final String columnName,
final ResultSet resultSet,
final int index) {
try {
final double value = resultSet.getDouble(index);
node.put(columnName, value);
} catch (final SQLException e) {
node.put(columnName, (Double) null);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -139,13 +139,15 @@ protected void initTests() {
"DOUBLE('1.7976931348623157E+308')")
.addExpectedValues(null, "-1.7976931348623157E308", "-2.2250738585072014E-308", "2.2250738585072014E-308", "1.7976931348623157E308")
.build());

// DECFLOAT type tests
addDataTypeTestData(
TestDataHolder.builder()
.createTablePatternSql(CREATE_TABLE_SQL)
.sourceType("DECFLOAT")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.fullSourceDataType("DECFLOAT(16)")
.addInsertValues("null", "0", "DECFLOAT(10E+307, 16)", "DECFLOAT(10E-307, 16)")
.addInsertValues("null", "0", "1.0E308", "1.0E-306")
.addExpectedValues(null, "0", "1E+308", "1E-306")
.build());
addDataTypeTestData(
Expand All @@ -157,16 +159,14 @@ protected void initTests() {
.addInsertValues("null", "0", "DECFLOAT(10E+307, 34)", "DECFLOAT(10E-307, 34)")
.addExpectedValues(null, "0", "1E+308", "1E-306")
.build());

// TODO "SNaN", "NaN", "Infinity" - fail with an exception in Db2 Driver during conversion to a
// BigDecimal.
// Could be fixed by mapping DECFLOAT to Double or String according to:
// https://www.ibm.com/docs/en/db2-for-zos/12?topic=dttmddtija-retrieval-special-values-from-decfloat-columns-in-java-applications
/*
* addDataTypeTestData( TestDataHolder.builder() .createTablePatternSql(CREATE_TABLE_SQL)
* .sourceType("DECFLOAT") .airbyteType(JsonSchemaPrimitive.NUMBER) .addInsertValues("SNaN", "NaN",
* "Infinity") .addExpectedValues() .build());
*/
addDataTypeTestData(
TestDataHolder.builder()
.createTablePatternSql(CREATE_TABLE_SQL)
.sourceType("DECFLOAT")
.airbyteType(JsonSchemaPrimitive.NUMBER)
.addInsertValues("SNaN", "NaN", "Infinity", "-Infinity")
.addExpectedValues("NaN", "NaN", "Infinity", "-Infinity")
.build());

// Boolean values
addDataTypeTestData(
Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/db2.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ Your database user should now be ready for use with Airbyte.

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.2 | 2021-11-05 | [7670](https://github.com/airbytehq/airbyte/pull/7670) | Updated unique DB2 types transformation |
| 0.1.1 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |
| 0.1.0 | 2021-06-22 | [4197](https://github.com/airbytehq/airbyte/pull/4197) | New Source: IBM DB2 |