Skip to content

Commit

Permalink
🎉Source Snowflake: Source/Destination doesn't respect DATE data type (#…
Browse files Browse the repository at this point in the history
…14828)

airbyte-5577: Respect date/datetime types for snowflake source.
  • Loading branch information
alexandertsukanov authored Jul 25, 2022
1 parent 0aa9ad7 commit 9185363
Show file tree
Hide file tree
Showing 7 changed files with 83 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,7 @@
- name: Snowflake
sourceDefinitionId: e2d65910-8c8b-40a1-ae7d-ee2416b2bfa2
dockerRepository: airbyte/source-snowflake
dockerImageTag: 0.1.14
dockerImageTag: 0.1.15
documentationUrl: https://docs.airbyte.io/integrations/sources/snowflake
icon: snowflake.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8772,7 +8772,7 @@
- - "client_secret"
oauthFlowOutputParameters:
- - "refresh_token"
- dockerImage: "airbyte/source-snowflake:0.1.14"
- dockerImage: "airbyte/source-snowflake:0.1.15"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-snowflake

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.14
LABEL io.airbyte.version=0.1.15
LABEL io.airbyte.name=airbyte/source-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import io.airbyte.protocol.models.JsonSchemaType;
import java.math.BigDecimal;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -38,4 +40,22 @@ protected void setTimestamp(final PreparedStatement preparedStatement, final int
preparedStatement.setString(parameterIndex, value);
}

/**
 * Translates a JDBC column type into the Airbyte JSON schema type used for that column.
 *
 * <p>Date and time types map to their dedicated date/time schema types rather than to plain
 * strings, and binary types map to base64 strings. Any JDBC type without an explicit case falls
 * back to {@link JsonSchemaType#STRING}.
 *
 * @param jdbcType JDBC type of the column
 * @return the JSON schema type chosen for {@code jdbcType}
 */
@Override
public JsonSchemaType getJsonType(JDBCType jdbcType) {
  switch (jdbcType) {
    case BIT, BOOLEAN:
      return JsonSchemaType.BOOLEAN;
    case TINYINT, SMALLINT, REAL, FLOAT, DOUBLE, INTEGER, BIGINT, NUMERIC, DECIMAL:
      return JsonSchemaType.NUMBER;
    case CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR:
      return JsonSchemaType.STRING;
    case DATE:
      return JsonSchemaType.STRING_DATE;
    case TIME:
      return JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE;
    case TIMESTAMP:
      return JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE;
    case TIMESTAMP_WITH_TIMEZONE:
      return JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE;
    case TIME_WITH_TIMEZONE:
      return JsonSchemaType.STRING_TIME_WITH_TIMEZONE;
    case BLOB, BINARY, VARBINARY, LONGVARBINARY:
      return JsonSchemaType.STRING_BASE_64;
    case ARRAY:
      return JsonSchemaType.ARRAY;
    default:
      // Column types aren't necessarily meaningful to Airbyte, so every unrecognised type is
      // liberally converted to a string.
      return JsonSchemaType.STRING;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,18 @@
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.snowflake.SnowflakeSource;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.math.BigDecimal;
import java.nio.file.Path;
import java.sql.JDBCType;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -92,4 +99,34 @@ void testCheckFailure() throws Exception {
assertEquals(Status.FAILED, actual.getStatus());
}

/**
 * Builds the catalog containing the three test streams: one with a single-column primary key,
 * one without a primary key, and one with a composite primary key. In every stream the
 * {@code COL_UPDATED_AT} field is typed as {@link JsonSchemaType#STRING_DATE}.
 *
 * @param defaultNamespace namespace assigned to each stream
 * @return catalog holding the three streams, in the order listed above
 */
@Override
protected AirbyteCatalog getCatalog(final String defaultNamespace) {
  // Stream keyed by COL_ID.
  final var streamWithPk = CatalogHelpers
      .createAirbyteStream(
          TABLE_NAME,
          defaultNamespace,
          Field.of(COL_ID, JsonSchemaType.NUMBER),
          Field.of(COL_NAME, JsonSchemaType.STRING),
          Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)));

  // Same columns, but no source-defined primary key.
  final var streamWithoutPk = CatalogHelpers
      .createAirbyteStream(
          TABLE_NAME_WITHOUT_PK,
          defaultNamespace,
          Field.of(COL_ID, JsonSchemaType.NUMBER),
          Field.of(COL_NAME, JsonSchemaType.STRING),
          Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(Collections.emptyList());

  // Stream keyed by the (first name, last name) pair.
  final var streamWithCompositePk = CatalogHelpers
      .createAirbyteStream(
          TABLE_NAME_COMPOSITE_PK,
          defaultNamespace,
          Field.of(COL_FIRST_NAME, JsonSchemaType.STRING),
          Field.of(COL_LAST_NAME, JsonSchemaType.STRING),
          Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
      .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
      .withSourceDefinedPrimaryKey(
          List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)));

  return new AirbyteCatalog()
      .withStreams(List.of(streamWithPk, streamWithoutPk, streamWithCompositePk));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -252,50 +252,50 @@ protected void initTests() {
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("DATE")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_DATE)
.addInsertValues("null", "'0001-01-01'", "'9999-12-31'")
.addExpectedValues(null, "0001-01-01T00:00:00Z", "9999-12-31T00:00:00Z")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("DATETIME")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'0001-01-01 00:00:00'", "'9999-12-31 23:59:59'", "'9999-12-31 23:59:59.123456'")
.addExpectedValues(null, "0001-01-01T00:00:00.000000Z", "9999-12-31T23:59:59.000000Z", "9999-12-31T23:59:59.123456Z")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIME")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
.addInsertValues("null", "'00:00:00'", "'1:59 PM'", "'23:59:59'")
.addExpectedValues(null, "1970-01-01T00:00:00Z", "1970-01-01T13:59:00Z",
"1970-01-01T23:59:59Z")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123'", "'2018-03-22 12:00:00.123456'")
.addExpectedValues(null, "2018-03-22T12:00:00.123000Z", "2018-03-22T12:00:00.123456Z")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP_LTZ")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T07:00:00.123000Z", "2018-03-22T07:00:00.123456Z")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP_NTZ")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITHOUT_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T12:00:00.123000Z", "2018-03-22T12:00:00.123456Z")
.build());
addDataTypeTestData(
TestDataHolder.builder()
.sourceType("TIMESTAMP_TZ")
.airbyteType(JsonSchemaType.STRING)
.airbyteType(JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE)
.addInsertValues("null", "'2018-03-22 12:00:00.123 +05:00'", "'2018-03-22 12:00:00.123456 +05:00'")
.addExpectedValues(null, "2018-03-22T07:00:00.123000Z", "2018-03-22T07:00:00.123456Z")
.build());
Expand Down
31 changes: 16 additions & 15 deletions docs/integrations/sources/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,18 +120,19 @@ To read more please check official [Snowflake documentation](https://docs.snowfl

## Changelog

| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.1.14 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected |
| 0.1.13 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors |
| 0.1.12 | 2022-04-29 | [12480](https://github.com/airbytehq/airbyte/pull/12480) | Query tables with adaptive fetch size to optimize JDBC memory consumption |
| 0.1.11 | 2022-04-27 | [10953](https://github.com/airbytehq/airbyte/pull/10953) | Implement OAuth flow |
| 0.1.9 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats |
| 0.1.8 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds |
| 0.1.7 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.6 | 2022-01-25 | [9623](https://github.com/airbytehq/airbyte/pull/9623) | Add jdbc_url_params support for optional JDBC parameters |
| 0.1.5 | 2022-01-19 | [9567](https://github.com/airbytehq/airbyte/pull/9567) | Added parameter for keeping JDBC session alive |
| 0.1.4 | 2021-12-30 | [9203](https://github.com/airbytehq/airbyte/pull/9203) | Update connector fields title/description |
| 0.1.3 | 2021-01-11 | [9304](https://github.com/airbytehq/airbyte/pull/9304) | Upgrade version of JDBC driver |
| 0.1.2 | 2021-10-21 | [7257](https://github.com/airbytehq/airbyte/pull/7257) | Fixed parsing of extreme values for FLOAT and NUMBER data types |
| 0.1.1 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |
| Version | Date | Pull Request | Subject |
|:----------| :--- | :--- | :--- |
| 0.1.15 | 2022-07-22 | [14828](https://github.com/airbytehq/airbyte/pull/14828) | Source Snowflake: respect DATE/DATETIME data types instead of reporting them as plain strings |
| 0.1.14 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected |
| 0.1.13 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors |
| 0.1.12 | 2022-04-29 | [12480](https://github.com/airbytehq/airbyte/pull/12480) | Query tables with adaptive fetch size to optimize JDBC memory consumption |
| 0.1.11 | 2022-04-27 | [10953](https://github.com/airbytehq/airbyte/pull/10953) | Implement OAuth flow |
| 0.1.9 | 2022-02-21 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Fixed cursor for old connectors that use non-microsecond format. Now connectors work with both formats |
| 0.1.8 | 2022-02-18 | [10242](https://github.com/airbytehq/airbyte/pull/10242) | Updated timestamp transformation with microseconds |
| 0.1.7 | 2022-02-14 | [10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.6 | 2022-01-25 | [9623](https://github.com/airbytehq/airbyte/pull/9623) | Add jdbc_url_params support for optional JDBC parameters |
| 0.1.5 | 2022-01-19 | [9567](https://github.com/airbytehq/airbyte/pull/9567) | Added parameter for keeping JDBC session alive |
| 0.1.4 | 2021-12-30 | [9203](https://github.com/airbytehq/airbyte/pull/9203) | Update connector fields title/description |
| 0.1.3 | 2021-01-11 | [9304](https://github.com/airbytehq/airbyte/pull/9304) | Upgrade version of JDBC driver |
| 0.1.2 | 2021-10-21 | [7257](https://github.com/airbytehq/airbyte/pull/7257) | Fixed parsing of extreme values for FLOAT and NUMBER data types |
| 0.1.1 | 2021-08-13 | [4699](https://github.com/airbytehq/airbyte/pull/4699) | Added json config validator |

0 comments on commit 9185363

Please sign in to comment.