From e01dccbd224f6ece2c47932487b8a7b291129b59 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Tue, 20 Feb 2024 12:28:18 -0800 Subject: [PATCH 1/9] snowflake/cdk-td-changes --- .../destination-snowflake/build.gradle | 2 +- .../SnowflakeInternalStagingDestination.java | 8 +- .../typing_deduping/SnowflakeColumn.java | 11 -- .../SnowflakeDestinationHandler.java | 150 +++++++++++++----- .../SnowflakeSqlGenerator.java | 36 +---- .../AbstractSnowflakeTypingDedupingTest.java | 2 +- .../SnowflakeSqlGeneratorIntegrationTest.java | 13 +- 7 files changed, 126 insertions(+), 96 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 3cc7265e2df9d..cb807139dcacc 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -5,7 +5,7 @@ plugins { airbyteJavaConnector { cdkVersionRequired = '0.20.9' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = false + useLocalCdk = true } java { diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index 472c8d5dec8ae..8fee8750b0596 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -13,6 
+13,7 @@ import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; import io.airbyte.cdk.integrations.destination.staging.StagingConsumerFactory; import io.airbyte.commons.json.Jsons; @@ -129,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() { throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface"); } + @Override + protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) { + throw new UnsupportedOperationException("Snowflake does not yet use the native JDBC DV2 interface"); + } + @Override public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonNode config, final ConfiguredAirbyteCatalog catalog, @@ -158,7 +164,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); final int defaultThreadCount = 8; if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, + typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount); } else { typerDeduper = diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java deleted file 
mode 100644 index 8415fedf587c3..0000000000000 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumn.java +++ /dev/null @@ -1,11 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.snowflake.typing_deduping; - -/** - * type is notably _not_ a {@link net.snowflake.client.jdbc.SnowflakeType}. That enum doesn't - * contain all the types that snowflake supports (specifically NUMBER). - */ -public record SnowflakeColumn(String name, String type) {} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 7afa49a48cc42..154758507201f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -4,24 +4,43 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META; + import io.airbyte.cdk.db.jdbc.JdbcDatabase; +import io.airbyte.cdk.integrations.base.JavaBaseConstants; +import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; +import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; +import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; +import 
io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; +import io.airbyte.integrations.base.destination.typing_deduping.Array; +import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; +import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; +import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; +import io.airbyte.integrations.base.destination.typing_deduping.Struct; +import io.airbyte.integrations.base.destination.typing_deduping.Union; +import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; +import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import java.util.stream.Collectors; import net.snowflake.client.jdbc.SnowflakeSQLException; import org.apache.commons.text.StringSubstitutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SnowflakeDestinationHandler implements DestinationHandler { +public class SnowflakeDestinationHandler extends JdbcDestinationHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDestinationHandler.class); public static final String EXCEPTION_COMMON_PREFIX = "JavaScript execution error: Uncaught Execution of multiple statements failed on statement"; @@ -30,60 +49,58 @@ public class SnowflakeDestinationHandler implements DestinationHandler findExistingTable(final StreamId id) throws SQLException { + public Optional findExistingTable(final StreamId id) throws 
SQLException { // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates // VARIANT as VARCHAR - final LinkedHashMap columns = database.queryJsons( - """ - SELECT column_name, data_type, is_nullable - FROM information_schema.columns - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? - ORDER BY ordinal_position; - """, - databaseName.toUpperCase(), - id.finalNamespace().toUpperCase(), - id.finalName().toUpperCase()).stream() + final LinkedHashMap columns = database.queryJsons( + """ + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_catalog = ? + AND table_schema = ? + AND table_name = ? + ORDER BY ordinal_position; + """, + databaseName.toUpperCase(), + id.finalNamespace().toUpperCase(), + id.finalName().toUpperCase()).stream() .collect(LinkedHashMap::new, - (map, row) -> map.put( - row.get("COLUMN_NAME").asText(), - new SnowflakeColumnDefinition(row.get("DATA_TYPE").asText(), fromSnowflakeBoolean(row.get("IS_NULLABLE").asText()))), - LinkedHashMap::putAll); + (map, row) -> map.put( + row.get("COLUMN_NAME").asText(), + new ColumnDefinition( + row.get("COLUMN_NAME").asText(), + row.get("DATA_TYPE").asText(), + 0, //unused + fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))), + LinkedHashMap::putAll); if (columns.isEmpty()) { return Optional.empty(); } else { - return Optional.of(new SnowflakeTableDefinition(columns)); + return Optional.of(new TableDefinition(columns)); } } - @Override - public LinkedHashMap findExistingFinalTables(final List list) throws Exception { - return null; - } - - @Override - public boolean isFinalTableEmpty(final StreamId id) throws SQLException { + private boolean isFinalTableEmpty(final StreamId id) throws SQLException { final int rowCount = database.queryInt( """ - SELECT row_count - FROM information_schema.tables - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? 
- """, + SELECT row_count + FROM information_schema.tables + WHERE table_catalog = ? + AND table_schema = ? + AND table_name = ? + """, databaseName.toUpperCase(), id.finalNamespace().toUpperCase(), id.finalName().toUpperCase()); return rowCount == 0; } - @Override + public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { final ResultSet tables = database.getMetaData().getTables( databaseName, @@ -99,7 +116,7 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex final Optional minUnloadedTimestamp = Optional.ofNullable(database.queryStrings( conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of( "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace( - """ + """ SELECT to_varchar( TIMESTAMPADD(NANOSECOND, -1, MIN("_airbyte_extracted_at")), 'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM' @@ -118,7 +135,7 @@ record -> record.getString("MIN_TIMESTAMP")).get(0)); final Optional maxTimestamp = Optional.ofNullable(database.queryStrings( conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of( "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace( - """ + """ SELECT to_varchar( MAX("_airbyte_extracted_at"), 'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM' @@ -158,12 +175,61 @@ public void execute(final Sql sql) throws Exception { } } - /** - * In snowflake information_schema tables, booleans return "YES" and "NO", which DataBind doesn't - * know how to use - */ - private boolean fromSnowflakeBoolean(final String input) { - return input.equalsIgnoreCase("yes"); + + private Set getPks(final StreamConfig stream) { + return stream.primaryKey() != null ? 
stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); + } + + @Override + protected boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) { + return existingTable.columns().containsKey(COLUMN_NAME_AB_META) && + "VARIANT".equals(existingTable.columns().get(COLUMN_NAME_AB_META).type()); + } + + protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { + final Set pks = getPks(stream); + // soft-resetting https://github.com/airbytehq/airbyte/pull/31082 + @SuppressWarnings("deprecation") final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream() + .anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable()); + + return !hasPksWithNonNullConstraint + && super.existingSchemaMatchesStreamConfig(stream, existingTable); + } + @Override + public List gatherInitialState(List streamConfigs) throws Exception { + return null; + } + + @Override + protected String toJdbcTypeName(AirbyteType airbyteType) { + if (airbyteType instanceof final AirbyteProtocolType p) { + return toJdbcTypeName(p); + } + + return switch (airbyteType.getTypeName()) { + case Struct.TYPE -> "OBJECT"; + case Array.TYPE -> "ARRAY"; + case UnsupportedOneOf.TYPE -> "VARIANT"; + case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType()); + default -> throw new IllegalArgumentException("Unrecognized type: " + airbyteType.getTypeName()); + }; + } + + private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { + return switch (airbyteProtocolType) { + case STRING -> "TEXT"; + case NUMBER -> "FLOAT"; + case INTEGER -> "NUMBER"; + case BOOLEAN -> "BOOLEAN"; + case TIMESTAMP_WITH_TIMEZONE -> "TIMESTAMP_TZ"; + case TIMESTAMP_WITHOUT_TIMEZONE -> "TIMESTAMP_NTZ"; + // If you change this - also change the logic in extractAndCast + case TIME_WITH_TIMEZONE -> "TEXT"; + case TIME_WITHOUT_TIMEZONE -> "TIME"; + case DATE -> "DATE"; + case 
UNKNOWN -> "VARIANT"; + }; + } } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 88733c74315df..8813eb3210ce0 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; -public class SnowflakeSqlGenerator implements SqlGenerator { +public class SnowflakeSqlGenerator implements SqlGenerator { public static final String QUOTE = "\""; @@ -134,36 +134,6 @@ public Sql createTable(final StreamConfig stream, final String suffix, final boo """)); } - @Override - public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final SnowflakeTableDefinition existingTable) - throws TableNotMigratedException { - final Set pks = getPks(stream); - - // Check that the columns match, with special handling for the metadata columns. 
- final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue())), - LinkedHashMap::putAll); - final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() - .filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase) - .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) - .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey(), column.getValue().type()), - LinkedHashMap::putAll); - // soft-resetting https://github.com/airbytehq/airbyte/pull/31082 - @SuppressWarnings("deprecation") - final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream() - .anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable()); - - final boolean sameColumns = actualColumns.equals(intendedColumns) - && !hasPksWithNonNullConstraint - && "TEXT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID.toUpperCase()).type()) - && "TIMESTAMP_TZ".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase()).type()) - && "VARIANT".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META.toUpperCase()).type()); - - return sameColumns; - } - @Override public Sql updateTable(final StreamConfig stream, final String finalSuffix, @@ -552,8 +522,4 @@ public static String escapeSingleQuotedString(final String str) { .replace("'", "\\'"); } - private static Set getPks(final StreamConfig stream) { - return stream.primaryKey() != null ? 
stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); - } - } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java index a7aac9cef7ccb..2c502d1c1ac9b 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/AbstractSnowflakeTypingDedupingTest.java @@ -107,7 +107,7 @@ protected void globalTeardown() throws Exception { } @Override - protected SqlGenerator getSqlGenerator() { + protected SqlGenerator getSqlGenerator() { return new SnowflakeSqlGenerator(); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java index 13338a83a03ea..bf204e1909d7f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/test-integration/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGeneratorIntegrationTest.java @@ -22,6 +22,7 @@ import io.airbyte.commons.io.IOs; import 
io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.destination.typing_deduping.BaseSqlGeneratorIntegrationTest; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.destination.snowflake.OssCloudEnvVarConsts; @@ -44,7 +45,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; -public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { +public class SnowflakeSqlGeneratorIntegrationTest extends BaseSqlGeneratorIntegrationTest { private static String databaseName; private static JdbcDatabase database; @@ -411,8 +412,9 @@ public void ensurePKsAreIndexedUnique() throws Exception { // should be OK with new tables destinationHandler.execute(createTable); - final Optional existingTableA = destinationHandler.findExistingTable(streamId); - assertTrue(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableA.get())); + List initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + assertEquals(1, initialStates.size()); + assertFalse(initialStates.get(0).isSchemaMismatch()); destinationHandler.execute(Sql.of("DROP TABLE " + streamId.finalTableId(""))); // Hack the create query to add NOT NULLs to emulate the old behavior @@ -424,8 +426,9 @@ public void ensurePKsAreIndexedUnique() throws Exception { .collect(joining("\r\n"))) .toList()).toList(); destinationHandler.execute(new Sql(createTableModified)); - final Optional existingTableB = destinationHandler.findExistingTable(streamId); - assertFalse(generator.existingSchemaMatchesStreamConfig(incrementalDedupStream, existingTableB.get())); + initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); + assertEquals(1, initialStates.size()); + 
assertTrue(initialStates.get(0).isSchemaMismatch()); } } From 86996d1c2d7f89efd3c9ef21ff73d6f32d3956cf Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Tue, 20 Feb 2024 18:16:15 -0800 Subject: [PATCH 2/9] fix dest handler --- .../SnowflakeDestinationHandler.java | 161 +++++++++++++++--- 1 file changed, 141 insertions(+), 20 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 154758507201f..17be4addbaeaa 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -4,8 +4,12 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT; import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID; +import static io.airbyte.cdk.integrations.base.JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; @@ -17,6 +21,7 @@ import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import 
io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; +import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl; import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; import io.airbyte.integrations.base.destination.typing_deduping.Sql; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; @@ -27,6 +32,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; +import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -54,6 +60,68 @@ public SnowflakeDestinationHandler(final String databaseName, final JdbcDatabase this.database = database; } + public static LinkedHashMap> findExistingTables(final JdbcDatabase database, + final String databaseName, + final List streamIds) throws SQLException { + final LinkedHashMap> existingTables = new LinkedHashMap<>(); + final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); + // convert list stream to array + final String[] namespaces = streamIds.stream().map(streamId -> streamId.finalNamespace().toUpperCase()).toArray(String[]::new); + final String[] names = streamIds.stream().map(streamId -> streamId.finalName().toUpperCase()).toArray(String[]::new); + final String query = """ + SELECT table_schema, table_name, column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_catalog = ? 
+ AND table_schema IN (%s) + AND table_name IN (%s) + ORDER BY table_schema, table_name, ordinal_position; + """.formatted(paramHolder, paramHolder); + final String[] bindValues = new String[streamIds.size() * 2 + 1]; + bindValues[0] = databaseName.toUpperCase(); + System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length); + System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length); + final List results = database.queryJsons(query, bindValues); + for (final JsonNode result : results) { + final String tableSchema = result.get("TABLE_SCHEMA").asText(); + final String tableName = result.get("TABLE_NAME").asText(); + final String columnName = result.get("COLUMN_NAME").asText(); + final String dataType = result.get("DATA_TYPE").asText(); + final String isNullable = result.get("IS_NULLABLE").asText(); + final TableDefinition tableDefinition = existingTables + .computeIfAbsent(tableSchema, k -> new LinkedHashMap<>()) + .computeIfAbsent(tableName, k -> new TableDefinition(new LinkedHashMap<>())); + tableDefinition.columns().put(columnName, new ColumnDefinition(columnName, dataType, 0, fromIsNullableIsoString(isNullable))); + } + return existingTables; + } + + private LinkedHashMap> getFinalTableRowCount(final List streamIds) throws SQLException { + final LinkedHashMap> tableRowCounts = new LinkedHashMap<>(); + final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); + // convert list stream to array + final String[] namespaces = streamIds.stream().map(streamId -> streamId.finalNamespace().toUpperCase()).toArray(String[]::new); + final String[] names = streamIds.stream().map(streamId -> streamId.finalName().toUpperCase()).toArray(String[]::new); + final String query = """ + SELECT table_schema, table_name, row_count + FROM information_schema.tables + WHERE table_catalog = ? 
+ AND table_schema IN (%s) + AND table_name IN (%s) + """.formatted(paramHolder, paramHolder); + final String[] bindValues = new String[streamIds.size() * 2 + 1]; + bindValues[0] = databaseName.toUpperCase(); + System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length); + System.arraycopy(names, 0, bindValues, namespaces.length + 1, names.length); + final List results = database.queryJsons(query, bindValues); + for (final JsonNode result : results) { + final String tableSchema = result.get("TABLE_SCHEMA").asText(); + final String tableName = result.get("TABLE_NAME").asText(); + final int rowCount = result.get("ROW_COUNT").asInt(); + tableRowCounts.computeIfAbsent(tableSchema, k -> new LinkedHashMap<>()).put(tableName, rowCount); + } + return tableRowCounts; + } + public Optional findExistingTable(final StreamId id) throws SQLException { // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates // VARIANT as VARCHAR @@ -68,8 +136,7 @@ public Optional findExistingTable(final StreamId id) throws SQL """, databaseName.toUpperCase(), id.finalNamespace().toUpperCase(), - id.finalName().toUpperCase()).stream() - .collect(LinkedHashMap::new, + id.finalName().toUpperCase()).stream().collect(LinkedHashMap::new, (map, row) -> map.put( row.get("COLUMN_NAME").asText(), new ColumnDefinition( @@ -85,20 +152,20 @@ public Optional findExistingTable(final StreamId id) throws SQL } } - private boolean isFinalTableEmpty(final StreamId id) throws SQLException { - final int rowCount = database.queryInt( - """ - SELECT row_count - FROM information_schema.tables - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? 
- """, - databaseName.toUpperCase(), - id.finalNamespace().toUpperCase(), - id.finalName().toUpperCase()); - return rowCount == 0; - } +// private boolean isFinalTableEmpty(final StreamId id) throws SQLException { +// final int rowCount = database.queryInt( +// """ +// SELECT row_count +// FROM information_schema.tables +// WHERE table_catalog = ? +// AND table_schema = ? +// AND table_name = ? +// """, +// databaseName.toUpperCase(), +// id.finalNamespace().toUpperCase(), +// id.finalName().toUpperCase()); +// return rowCount == 0; +// } public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { @@ -180,26 +247,80 @@ private Set getPks(final StreamConfig stream) { return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); } + private boolean isAirbyteRawIdColumnMatch(final TableDefinition existingTable) { + final String abRawIdColumnName = COLUMN_NAME_AB_RAW_ID.toUpperCase(); + return existingTable.columns().containsKey(abRawIdColumnName) && + toJdbcTypeName(AirbyteProtocolType.STRING).equals(existingTable.columns().get(abRawIdColumnName).type()); + } + + private boolean isAirbyteExtractedAtColumnMatch(final TableDefinition existingTable) { + final String abExtractedAtColumnName = COLUMN_NAME_AB_EXTRACTED_AT.toUpperCase(); + return existingTable.columns().containsKey(abExtractedAtColumnName) && + toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(existingTable.columns().get(abExtractedAtColumnName).type()); + } + @Override protected boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) { - return existingTable.columns().containsKey(COLUMN_NAME_AB_META) && - "VARIANT".equals(existingTable.columns().get(COLUMN_NAME_AB_META).type()); + final String abMetaColumnName = COLUMN_NAME_AB_META.toUpperCase(); + return existingTable.columns().containsKey(abMetaColumnName) && + 
"VARIANT".equals(existingTable.columns().get(abMetaColumnName).type()); } protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { final Set pks = getPks(stream); + // This is same as JdbcDestinationHandler#existingSchemaMatchesStreamConfig with upper case conversion. + // TODO: Unify this using name transformer or something. + if (!isAirbyteRawIdColumnMatch(existingTable) || + !isAirbyteExtractedAtColumnMatch(existingTable) || + !isAirbyteMetaColumnMatch(existingTable)) { + // Missing AB meta columns from final table, we need them to do proper T+D so trigger soft-reset + return false; + } + final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() + .collect(LinkedHashMap::new, + (map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())), + LinkedHashMap::putAll); + + // Filter out Meta columns since they don't exist in stream config. + final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() + .filter(column -> V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase) + .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) + .collect(LinkedHashMap::new, + (map, column) -> map.put(column.getKey(), column.getValue().type()), + LinkedHashMap::putAll); // soft-resetting https://github.com/airbytehq/airbyte/pull/31082 @SuppressWarnings("deprecation") final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream() .anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable()); return !hasPksWithNonNullConstraint - && super.existingSchemaMatchesStreamConfig(stream, existingTable); + && actualColumns.equals(intendedColumns); } @Override public List gatherInitialState(List streamConfigs) throws Exception { - return null; + List streamIds = streamConfigs.stream().map(StreamConfig::id).toList(); + final LinkedHashMap> existingTables = findExistingTables(database, databaseName, streamIds); 
+ final LinkedHashMap> tableRowCounts = getFinalTableRowCount(streamIds); + return streamConfigs.stream().map(streamConfig -> { + try { + final String namespace = streamConfig.id().finalNamespace().toUpperCase(); + final String name = streamConfig.id().finalName().toUpperCase(); + boolean isSchemaMismatch = false; + boolean isFinalTableEmpty = true; + boolean isFinalTablePresent = existingTables.containsKey(namespace) && existingTables.get(namespace).containsKey(name); + boolean hasRowCount = tableRowCounts.containsKey(namespace) && tableRowCounts.get(namespace).containsKey(name); + if (isFinalTablePresent) { + final TableDefinition existingTable = existingTables.get(namespace).get(name); + isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, existingTable); + isFinalTableEmpty = hasRowCount && tableRowCounts.get(namespace).get(name) == 0; + } + final InitialRawTableState initialRawTableState = getInitialRawTableState(streamConfig.id()); + return new DestinationInitialStateImpl(streamConfig, isFinalTablePresent, initialRawTableState, isSchemaMismatch, isFinalTableEmpty); + } catch (Exception e) { + throw new RuntimeException(e); + } + }).collect(Collectors.toList()); } @Override From 3f50331650cc76f5ef745f91e5284e786d5ea1be Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Tue, 20 Feb 2024 18:25:04 -0800 Subject: [PATCH 3/9] remove redundant upperCase call --- .../typing_deduping/SnowflakeDestinationHandler.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 17be4addbaeaa..9fdf933a61411 100644 --- 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -66,8 +66,8 @@ public static LinkedHashMap> find final LinkedHashMap> existingTables = new LinkedHashMap<>(); final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); // convert list stream to array - final String[] namespaces = streamIds.stream().map(streamId -> streamId.finalNamespace().toUpperCase()).toArray(String[]::new); - final String[] names = streamIds.stream().map(streamId -> streamId.finalName().toUpperCase()).toArray(String[]::new); + final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); + final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); final String query = """ SELECT table_schema, table_name, column_name, data_type, is_nullable FROM information_schema.columns @@ -99,8 +99,8 @@ private LinkedHashMap> getFinalTableRowCo final LinkedHashMap> tableRowCounts = new LinkedHashMap<>(); final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); // convert list stream to array - final String[] namespaces = streamIds.stream().map(streamId -> streamId.finalNamespace().toUpperCase()).toArray(String[]::new); - final String[] names = streamIds.stream().map(streamId -> streamId.finalName().toUpperCase()).toArray(String[]::new); + final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); + final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); final String query = """ SELECT table_schema, table_name, row_count FROM information_schema.tables From f417343672b9ef2fbf5cd4e3b1f148ddc9f79858 Mon Sep 17 00:00:00 2001 
From: Gireesh Sreepathi Date: Tue, 20 Feb 2024 18:59:33 -0800 Subject: [PATCH 4/9] remove override --- .../snowflake/typing_deduping/SnowflakeDestinationHandler.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 9fdf933a61411..da180c1fbd914 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -259,8 +259,7 @@ private boolean isAirbyteExtractedAtColumnMatch(final TableDefinition existingTa toJdbcTypeName(AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE).equals(existingTable.columns().get(abExtractedAtColumnName).type()); } - @Override - protected boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) { + private boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) { final String abMetaColumnName = COLUMN_NAME_AB_META.toUpperCase(); return existingTable.columns().containsKey(abMetaColumnName) && "VARIANT".equals(existingTable.columns().get(abMetaColumnName).type()); From ce9fe812120c6d9db93049256c4c0ac9207a947b Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Tue, 20 Feb 2024 20:22:44 -0800 Subject: [PATCH 5/9] fmt --- .../SnowflakeDestinationHandler.java | 106 ++++++++---------- .../SnowflakeSqlGenerator.java | 4 - 2 files changed, 45 insertions(+), 65 deletions(-) diff --git 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index da180c1fbd914..8e38601739906 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -11,7 +11,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.cdk.db.jdbc.JdbcDatabase; -import io.airbyte.cdk.integrations.base.JavaBaseConstants; import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; @@ -19,7 +18,6 @@ import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; import io.airbyte.integrations.base.destination.typing_deduping.Array; import io.airbyte.integrations.base.destination.typing_deduping.ColumnId; -import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialStateImpl; import io.airbyte.integrations.base.destination.typing_deduping.InitialRawTableState; @@ -32,7 +30,6 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.time.Instant; -import java.util.Arrays; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -62,20 +59,21 @@ public SnowflakeDestinationHandler(final String databaseName, 
final JdbcDatabase public static LinkedHashMap> findExistingTables(final JdbcDatabase database, final String databaseName, - final List streamIds) throws SQLException { + final List streamIds) + throws SQLException { final LinkedHashMap> existingTables = new LinkedHashMap<>(); final String paramHolder = String.join(",", Collections.nCopies(streamIds.size(), "?")); // convert list stream to array final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); final String query = """ - SELECT table_schema, table_name, column_name, data_type, is_nullable - FROM information_schema.columns - WHERE table_catalog = ? - AND table_schema IN (%s) - AND table_name IN (%s) - ORDER BY table_schema, table_name, ordinal_position; - """.formatted(paramHolder, paramHolder); + SELECT table_schema, table_name, column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_catalog = ? + AND table_schema IN (%s) + AND table_name IN (%s) + ORDER BY table_schema, table_name, ordinal_position; + """.formatted(paramHolder, paramHolder); final String[] bindValues = new String[streamIds.size() * 2 + 1]; bindValues[0] = databaseName.toUpperCase(); System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length); @@ -102,12 +100,12 @@ private LinkedHashMap> getFinalTableRowCo final String[] namespaces = streamIds.stream().map(StreamId::finalNamespace).toArray(String[]::new); final String[] names = streamIds.stream().map(StreamId::finalName).toArray(String[]::new); final String query = """ - SELECT table_schema, table_name, row_count - FROM information_schema.tables - WHERE table_catalog = ? - AND table_schema IN (%s) - AND table_name IN (%s) - """.formatted(paramHolder, paramHolder); + SELECT table_schema, table_name, row_count + FROM information_schema.tables + WHERE table_catalog = ? 
+ AND table_schema IN (%s) + AND table_name IN (%s) + """.formatted(paramHolder, paramHolder); final String[] bindValues = new String[streamIds.size() * 2 + 1]; bindValues[0] = databaseName.toUpperCase(); System.arraycopy(namespaces, 0, bindValues, 1, namespaces.length); @@ -126,25 +124,25 @@ public Optional findExistingTable(final StreamId id) throws SQL // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates // VARIANT as VARCHAR final LinkedHashMap columns = database.queryJsons( - """ - SELECT column_name, data_type, is_nullable - FROM information_schema.columns - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? - ORDER BY ordinal_position; - """, - databaseName.toUpperCase(), - id.finalNamespace().toUpperCase(), - id.finalName().toUpperCase()).stream().collect(LinkedHashMap::new, - (map, row) -> map.put( - row.get("COLUMN_NAME").asText(), - new ColumnDefinition( - row.get("COLUMN_NAME").asText(), - row.get("DATA_TYPE").asText(), - 0, //unused - fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))), - LinkedHashMap::putAll); + """ + SELECT column_name, data_type, is_nullable + FROM information_schema.columns + WHERE table_catalog = ? + AND table_schema = ? + AND table_name = ? 
+ ORDER BY ordinal_position; + """, + databaseName.toUpperCase(), + id.finalNamespace().toUpperCase(), + id.finalName().toUpperCase()).stream().collect(LinkedHashMap::new, + (map, row) -> map.put( + row.get("COLUMN_NAME").asText(), + new ColumnDefinition( + row.get("COLUMN_NAME").asText(), + row.get("DATA_TYPE").asText(), + 0, // unused + fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))), + LinkedHashMap::putAll); if (columns.isEmpty()) { return Optional.empty(); } else { @@ -152,22 +150,6 @@ public Optional findExistingTable(final StreamId id) throws SQL } } -// private boolean isFinalTableEmpty(final StreamId id) throws SQLException { -// final int rowCount = database.queryInt( -// """ -// SELECT row_count -// FROM information_schema.tables -// WHERE table_catalog = ? -// AND table_schema = ? -// AND table_name = ? -// """, -// databaseName.toUpperCase(), -// id.finalNamespace().toUpperCase(), -// id.finalName().toUpperCase()); -// return rowCount == 0; -// } - - public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { final ResultSet tables = database.getMetaData().getTables( databaseName, @@ -183,7 +165,7 @@ public InitialRawTableState getInitialRawTableState(final StreamId id) throws Ex final Optional minUnloadedTimestamp = Optional.ofNullable(database.queryStrings( conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of( "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace( - """ + """ SELECT to_varchar( TIMESTAMPADD(NANOSECOND, -1, MIN("_airbyte_extracted_at")), 'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM' @@ -202,7 +184,7 @@ record -> record.getString("MIN_TIMESTAMP")).get(0)); final Optional maxTimestamp = Optional.ofNullable(database.queryStrings( conn -> conn.createStatement().executeQuery(new StringSubstitutor(Map.of( "raw_table", id.rawTableId(SnowflakeSqlGenerator.QUOTE))).replace( - """ + """ SELECT to_varchar( MAX("_airbyte_extracted_at"), 'YYYY-MM-DDTHH24:MI:SS.FF9TZH:TZM' @@ -242,7 
+224,6 @@ public void execute(final Sql sql) throws Exception { } } - private Set getPks(final StreamConfig stream) { return stream.primaryKey() != null ? stream.primaryKey().stream().map(ColumnId::name).collect(Collectors.toSet()) : Collections.emptySet(); } @@ -267,7 +248,8 @@ private boolean isAirbyteMetaColumnMatch(TableDefinition existingTable) { protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) { final Set pks = getPks(stream); - // This is same as JdbcDestinationHandler#existingSchemaMatchesStreamConfig with upper case conversion. + // This is same as JdbcDestinationHandler#existingSchemaMatchesStreamConfig with upper case + // conversion. // TODO: Unify this using name transformer or something. if (!isAirbyteRawIdColumnMatch(existingTable) || !isAirbyteExtractedAtColumnMatch(existingTable) || @@ -277,18 +259,19 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f } final LinkedHashMap intendedColumns = stream.columns().entrySet().stream() .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())), - LinkedHashMap::putAll); + (map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())), + LinkedHashMap::putAll); // Filter out Meta columns since they don't exist in stream config. 
final LinkedHashMap actualColumns = existingTable.columns().entrySet().stream() .filter(column -> V2_FINAL_TABLE_METADATA_COLUMNS.stream().map(String::toUpperCase) .noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey()))) .collect(LinkedHashMap::new, - (map, column) -> map.put(column.getKey(), column.getValue().type()), - LinkedHashMap::putAll); + (map, column) -> map.put(column.getKey(), column.getValue().type()), + LinkedHashMap::putAll); // soft-resetting https://github.com/airbytehq/airbyte/pull/31082 - @SuppressWarnings("deprecation") final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream() + @SuppressWarnings("deprecation") + final boolean hasPksWithNonNullConstraint = existingTable.columns().entrySet().stream() .anyMatch(c -> pks.contains(c.getKey()) && !c.getValue().isNullable()); return !hasPksWithNonNullConstraint @@ -352,4 +335,5 @@ private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { case UNKNOWN -> "VARIANT"; }; } + } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java index 8813eb3210ce0..37b0bdaefff8f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeSqlGenerator.java @@ -21,18 +21,14 @@ import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.Struct; -import 
io.airbyte.integrations.base.destination.typing_deduping.TableNotMigratedException; import io.airbyte.integrations.base.destination.typing_deduping.Union; import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.time.Instant; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.commons.text.StringSubstitutor; From 13a645d77ee409577b7cfdd6c8f068cf86d21fad Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Wed, 21 Feb 2024 15:45:25 -0800 Subject: [PATCH 6/9] moar cleanup and unification --- .../SnowflakeColumnDefinition.java | 19 -------- .../SnowflakeDestinationHandler.java | 30 ------------- .../SnowflakeTableDefinition.java | 14 ------ .../SnowflakeV1V2Migrator.java | 29 ++++++------ .../SnowflakeV2TableMigrator.java | 44 +++++-------------- 5 files changed, 23 insertions(+), 113 deletions(-) delete mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java delete mode 100644 airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java deleted file mode 100644 index 06be84ffe67fc..0000000000000 --- 
a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeColumnDefinition.java +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.snowflake.typing_deduping; - -/** - * isNullable is only used to execute a migration away from an older version of - * destination-snowflake, where we created PK columns as NOT NULL. This caused a lot of problems - * because many sources emit null PKs. We may want to remove this field eventually. - */ -public record SnowflakeColumnDefinition(String type, boolean isNullable) { - - @Deprecated - public boolean isNullable() { - return isNullable; - } - -} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java index 8e38601739906..5bfeb5d6b25e9 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java @@ -120,36 +120,6 @@ AND table_name IN (%s) return tableRowCounts; } - public Optional findExistingTable(final StreamId id) throws SQLException { - // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates - // VARIANT as VARCHAR - final LinkedHashMap columns = database.queryJsons( - """ - SELECT column_name, data_type, is_nullable - FROM information_schema.columns - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? 
- ORDER BY ordinal_position; - """, - databaseName.toUpperCase(), - id.finalNamespace().toUpperCase(), - id.finalName().toUpperCase()).stream().collect(LinkedHashMap::new, - (map, row) -> map.put( - row.get("COLUMN_NAME").asText(), - new ColumnDefinition( - row.get("COLUMN_NAME").asText(), - row.get("DATA_TYPE").asText(), - 0, // unused - fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))), - LinkedHashMap::putAll); - if (columns.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(new TableDefinition(columns)); - } - } - public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception { final ResultSet tables = database.getMetaData().getTables( databaseName, diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java deleted file mode 100644 index 2535d9004b131..0000000000000 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeTableDefinition.java +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (c) 2023 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.integrations.destination.snowflake.typing_deduping; - -import java.util.LinkedHashMap; - -/** - * @param columns Map from column name to type. Type is a plain string because - * {@link net.snowflake.client.jdbc.SnowflakeType} doesn't actually have all the types that - * Snowflake supports. 
- */ -public record SnowflakeTableDefinition(LinkedHashMap columns) {} diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java index aa6eba7f7f963..3226afa583371 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV1V2Migrator.java @@ -4,8 +4,12 @@ package io.airbyte.integrations.destination.snowflake.typing_deduping; +import static io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler.*; + import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.destination.NamingConventionTransformer; +import io.airbyte.cdk.integrations.destination.jdbc.ColumnDefinition; +import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.integrations.base.destination.typing_deduping.BaseDestinationV1V2Migrator; import io.airbyte.integrations.base.destination.typing_deduping.CollectionUtils; import io.airbyte.integrations.base.destination.typing_deduping.NamespacedTableName; @@ -15,7 +19,7 @@ import java.util.Optional; import lombok.SneakyThrows; -public class SnowflakeV1V2Migrator extends BaseDestinationV1V2Migrator { +public class SnowflakeV1V2Migrator extends BaseDestinationV1V2Migrator { private final NamingConventionTransformer namingConventionTransformer; @@ -48,18 +52,18 @@ protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamCon } @Override - protected boolean schemaMatchesExpectation(final SnowflakeTableDefinition existingTable, final Collection 
columns) { + protected boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection columns) { return CollectionUtils.containsAllIgnoreCase(existingTable.columns().keySet(), columns); } @SneakyThrows @Override - protected Optional getTableIfExists(final String namespace, final String tableName) throws Exception { - // TODO this is mostly copied from SnowflakeDestinationHandler#findExistingTable, we should probably - // reuse this logic + protected Optional getTableIfExists(final String namespace, final String tableName) throws Exception { + // TODO this looks similar to SnowflakeDestinationHandler#findExistingTables, with a twist; + // databaseName not upper-cased and rawNamespace and rawTableName as-is (no uppercase). // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates // VARIANT as VARCHAR - final LinkedHashMap columns = + final LinkedHashMap columns = database.queryJsons( """ SELECT column_name, data_type, is_nullable @@ -75,12 +79,13 @@ protected Optional getTableIfExists(final String names .stream() .collect(LinkedHashMap::new, (map, row) -> map.put(row.get("COLUMN_NAME").asText(), - new SnowflakeColumnDefinition(row.get("DATA_TYPE").asText(), fromSnowflakeBoolean(row.get("IS_NULLABLE").asText()))), + new ColumnDefinition(row.get("COLUMN_NAME").asText(), row.get("DATA_TYPE").asText(), 0, + fromIsNullableIsoString(row.get("IS_NULLABLE").asText()))), LinkedHashMap::putAll); if (columns.isEmpty()) { return Optional.empty(); } else { - return Optional.of(new SnowflakeTableDefinition(columns)); + return Optional.of(new TableDefinition(columns)); } } @@ -101,12 +106,4 @@ protected boolean doesValidV1RawTableExist(final String namespace, final String return super.doesValidV1RawTableExist(namespace.toUpperCase(), tableName.toUpperCase()); } - /** - * In snowflake information_schema tables, booleans return "YES" and "NO", which DataBind doesn't - * know how to use - */ - private boolean 
fromSnowflakeBoolean(final String input) { - return input.equalsIgnoreCase("yes"); - } - } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java index 9e04ec3b6f221..eef75f86c7bff 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeV2TableMigrator.java @@ -9,6 +9,7 @@ import io.airbyte.cdk.db.jdbc.JdbcDatabase; import io.airbyte.cdk.integrations.base.TypingAndDedupingFlag; +import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; import io.airbyte.integrations.base.destination.typing_deduping.StreamConfig; import io.airbyte.integrations.base.destination.typing_deduping.StreamId; import io.airbyte.integrations.base.destination.typing_deduping.TypeAndDedupeTransaction; @@ -16,6 +17,7 @@ import io.airbyte.protocol.models.v0.DestinationSyncMode; import java.sql.SQLException; import java.util.LinkedHashMap; +import java.util.List; import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +50,8 @@ public void migrateIfNecessary(final StreamConfig streamConfig) throws Exception streamConfig.id().originalName(), rawNamespace); final boolean syncModeRequiresMigration = streamConfig.destinationSyncMode() != DestinationSyncMode.OVERWRITE; - final boolean existingTableCaseSensitiveExists = findExistingTable_caseSensitive(caseSensitiveStreamId).isPresent(); - final boolean existingTableUppercaseDoesNotExist = !handler.findExistingTable(streamConfig.id()).isPresent(); + final boolean 
existingTableCaseSensitiveExists = findExistingTable(caseSensitiveStreamId).isPresent(); + final boolean existingTableUppercaseDoesNotExist = findExistingTable(streamConfig.id()).isEmpty(); LOGGER.info( "Checking whether upcasing migration is necessary for {}.{}. Sync mode requires migration: {}; existing case-sensitive table exists: {}; existing uppercased table does not exist: {}", streamConfig.id().originalNamespace(), @@ -87,41 +89,15 @@ private static String escapeIdentifier_caseSensitive(final String identifier) { return identifier.replace("\"", "\"\""); } - // And this was taken from - // https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/typing_deduping/SnowflakeDestinationHandler.java - public Optional findExistingTable_caseSensitive(final StreamId id) throws SQLException { + private Optional findExistingTable(final StreamId id) throws SQLException { // The obvious database.getMetaData().getColumns() solution doesn't work, because JDBC translates // VARIANT as VARCHAR - final LinkedHashMap columns = database.queryJsons( - """ - SELECT column_name, data_type, is_nullable - FROM information_schema.columns - WHERE table_catalog = ? - AND table_schema = ? - AND table_name = ? 
- ORDER BY ordinal_position; - """, - databaseName.toUpperCase(), - id.finalNamespace(), - id.finalName()).stream() - .collect(LinkedHashMap::new, - (map, row) -> map.put( - row.get("COLUMN_NAME").asText(), - new SnowflakeColumnDefinition(row.get("DATA_TYPE").asText(), fromSnowflakeBoolean(row.get("IS_NULLABLE").asText()))), - LinkedHashMap::putAll); - if (columns.isEmpty()) { - return Optional.empty(); - } else { - return Optional.of(new SnowflakeTableDefinition(columns)); + LinkedHashMap> existingTableMap = + SnowflakeDestinationHandler.findExistingTables(database, databaseName, List.of(id)); + if (existingTableMap.containsKey(id.finalNamespace()) && existingTableMap.get(id.finalNamespace()).containsKey(id.finalName())) { + return Optional.of(existingTableMap.get(id.finalNamespace()).get(id.finalName())); } - } - - /** - * In snowflake information_schema tables, booleans return "YES" and "NO", which DataBind doesn't - * know how to use - */ - private boolean fromSnowflakeBoolean(String input) { - return input.equalsIgnoreCase("yes"); + return Optional.empty(); } } From 61d7dbc130f1801e07f161b17b8ed9442400c9e0 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Thu, 22 Feb 2024 10:07:40 -0800 Subject: [PATCH 7/9] fix compilation --- .../snowflake/SnowflakeInternalStagingDestination.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java index 8fee8750b0596..253212ecf628c 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java +++ 
b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeInternalStagingDestination.java @@ -162,13 +162,11 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN final SnowflakeV1V2Migrator migrator = new SnowflakeV1V2Migrator(getNamingResolver(), database, databaseName); final SnowflakeV2TableMigrator v2TableMigrator = new SnowflakeV2TableMigrator(database, databaseName, sqlGenerator, snowflakeDestinationHandler); final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false); - final int defaultThreadCount = 8; if (disableTypeDedupe) { - typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, - defaultThreadCount); + typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator); } else { typerDeduper = - new DefaultTyperDeduper<>(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator, defaultThreadCount); + new DefaultTyperDeduper(sqlGenerator, snowflakeDestinationHandler, parsedCatalog, migrator, v2TableMigrator); } return StagingConsumerFactory.builder( From ef20df62be4fb45c668e73de753f9b55fdef25f3 Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Thu, 22 Feb 2024 16:00:11 -0800 Subject: [PATCH 8/9] logistics --- .../connectors/destination-snowflake/build.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index cb807139dcacc..18647e88fcec5 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,9 +3,9 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = 
'0.20.9' + cdkVersionRequired = '0.23.0' features = ['db-destinations', 's3-destinations', 'typing-deduping'] - useLocalCdk = true + useLocalCdk = false } java { From 2238cca7b7f1dcda189c8c0b84d3d41ae11a64bc Mon Sep 17 00:00:00 2001 From: Gireesh Sreepathi Date: Fri, 23 Feb 2024 12:36:28 -0800 Subject: [PATCH 9/9] version logistics, new cdk Signed-off-by: Gireesh Sreepathi --- .../connectors/destination-snowflake/build.gradle | 2 +- .../connectors/destination-snowflake/metadata.yaml | 2 +- docs/integrations/destinations/snowflake.md | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte-integrations/connectors/destination-snowflake/build.gradle b/airbyte-integrations/connectors/destination-snowflake/build.gradle index 18647e88fcec5..b84e054c06091 100644 --- a/airbyte-integrations/connectors/destination-snowflake/build.gradle +++ b/airbyte-integrations/connectors/destination-snowflake/build.gradle @@ -3,7 +3,7 @@ plugins { } airbyteJavaConnector { - cdkVersionRequired = '0.23.0' + cdkVersionRequired = '0.23.2' features = ['db-destinations', 's3-destinations', 'typing-deduping'] useLocalCdk = false } diff --git a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml index 021cfd26e6851..d39c5a8c96697 100644 --- a/airbyte-integrations/connectors/destination-snowflake/metadata.yaml +++ b/airbyte-integrations/connectors/destination-snowflake/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 424892c4-daac-4491-b35d-c6688ba547ba - dockerImageTag: 3.5.13 + dockerImageTag: 3.5.14 dockerRepository: airbyte/destination-snowflake documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake githubIssueLabel: destination-snowflake diff --git a/docs/integrations/destinations/snowflake.md b/docs/integrations/destinations/snowflake.md index f7a6f3e7a95dd..39be90148e991 100644 --- 
a/docs/integrations/destinations/snowflake.md +++ b/docs/integrations/destinations/snowflake.md @@ -246,7 +246,8 @@ Otherwise, make sure to grant the role the required permissions in the desired n | Version | Date | Pull Request | Subject | |:----------------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------------------| -| 3.5.13 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | +| 3.5.14 | 2024-02-22 | [35456](https://github.com/airbytehq/airbyte/pull/35456) | Adopt CDK 0.23.2; Gather initial state upfront, reduce information_schema calls | +| 3.5.13 | 2024-02-22 | [35569](https://github.com/airbytehq/airbyte/pull/35569) | Fix logging bug. | | 3.5.12 | 2024-02-15 | [35240](https://github.com/airbytehq/airbyte/pull/35240) | Adopt CDK 0.20.9 | | 3.5.11 | 2024-02-12 | [35194](https://github.com/airbytehq/airbyte/pull/35194) | Reorder auth options | | 3.5.10 | 2024-02-12 | [35144](https://github.com/airbytehq/airbyte/pull/35144) | Adopt CDK 0.20.2 |