From 2cd62002bea7dd928d4562642d93340603b2224e Mon Sep 17 00:00:00 2001 From: Greg Solovyev Date: Mon, 13 Jun 2022 23:28:46 -0700 Subject: [PATCH] Fallback to parsing datetime and time strings w/ and w/o timezones in case DateTimeParseException is thrown (#13745) * Fall back to parsing w/ or w/o TZ if parsing a date or a time string fails * auto-bump connector version Co-authored-by: Octavia Squidington III --- .../resources/seed/source_definitions.yaml | 2 +- .../src/main/resources/seed/source_specs.yaml | 2 +- .../connectors/source-postgres/Dockerfile | 2 +- .../postgres/PostgresSourceOperations.java | 55 +++++++++++++------ docs/integrations/sources/postgres.md | 1 + 5 files changed, 42 insertions(+), 20 deletions(-) diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 58334a7a57173..37c10f5c055d4 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -715,7 +715,7 @@ - name: Postgres sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 dockerRepository: airbyte/source-postgres - dockerImageTag: 0.4.22 + dockerImageTag: 0.4.23 documentationUrl: https://docs.airbyte.io/integrations/sources/postgres icon: postgresql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 88c0959a6e00f..61ae57001ebe1 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -6719,7 +6719,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-postgres:0.4.22" +- dockerImage: "airbyte/source-postgres:0.4.23" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres" connectionSpecification: diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 1b07db6a77492..f201d4184b74b 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.22 +LABEL io.airbyte.version=0.4.23 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 798286efb2977..3b2a9e8e29ffd 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -31,6 +31,7 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.OffsetTime; +import java.time.format.DateTimeParseException; import java.util.Collections; import org.postgresql.jdbc.PgResultSetMetaData; import org.slf4j.Logger; @@ -108,22 +109,42 @@ public void setStatementField(final PreparedStatement preparedStatement, } } - private void setTimeWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + private void setTimeWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the time w/o timezone. This can be caused by schema created with a different version of the connector + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + } } - private void setTimestampWithTimezone(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + private void setTimestampWithTimezone(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the datetime w/o timezone. This can be caused by schema created with a different version of the connector + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } } @Override - protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector + preparedStatement.setObject(parameterIndex, OffsetDateTime.parse(value)); + } } @Override - protected void setTime(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { - preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + protected void setTime(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { + try { + preparedStatement.setObject(parameterIndex, LocalTime.parse(value)); + } catch (final DateTimeParseException e) { + //attempt to parse the datetime with timezone. This can be caused by schema created with an older version of the connector + preparedStatement.setObject(parameterIndex, OffsetTime.parse(value)); + } } @Override @@ -170,21 +191,21 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob } @Override - protected void putDate(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); + protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final LocalDate date = getDateTimeObject(resultSet, index, LocalDate.class); node.put(columnName, resolveEra(date, date.toString())); } @Override - protected void putTime(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); + protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final LocalTime time = getDateTimeObject(resultSet, index, LocalTime.class); node.put(columnName, time.toString()); } @Override - protected void putTimestamp(ObjectNode node, String columnName, ResultSet resultSet, int index) throws SQLException { - LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); - LocalDate date = timestamp.toLocalDate(); + protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException { + final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class); + final LocalDate date = timestamp.toLocalDate(); node.put(columnName, resolveEra(date, timestamp.toString())); } @@ -214,7 +235,7 @@ public JDBCType getFieldType(final JsonNode field) { } @Override - public JsonSchemaType getJsonType(JDBCType jdbcType) { + public JsonSchemaType getJsonType(final JDBCType jdbcType) { return switch (jdbcType) { case BOOLEAN -> JsonSchemaType.BOOLEAN; case TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER; @@ -264,7 +285,7 @@ private void putHstoreAsJson(final ObjectNode node, final String columnName, fin final var data = resultSet.getObject(index); try { node.put(columnName, OBJECT_MAPPER.writeValueAsString(data)); - } catch (JsonProcessingException e) { + } catch (final JsonProcessingException e) { throw new RuntimeException("Could not parse 'hstore' value:" + e); } } diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index 4b3781a3e55ce..0f8437a4f373d 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -275,6 +275,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------| +| 0.4.23 | 2022-06-13 | [13655](https://github.com/airbytehq/airbyte/pull/13745) | Fixed handling datetime cursors when upgrading from older versions of the connector | | 0.4.22 | 2022-06-09 | [13655](https://github.com/airbytehq/airbyte/pull/13655) | Fixed bug with unsupported date-time datatypes during incremental sync | | 0.4.21 | 2022-06-06 | [13435](https://github.com/airbytehq/airbyte/pull/13435) | Adjust JDBC fetch size based on max memory and max row size | | 0.4.20 | 2022-06-02 | [13367](https://github.com/airbytehq/airbyte/pull/13367) | Added convertion hstore to json format |