Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Destination Postgres: CDK T+D initial state gathering #35385

Merged
merged 7 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions airbyte-cdk/java/airbyte-cdk/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,8 @@ MavenLocal debugging steps:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.23.2 | 2024-02-22 | [\#35385](https://github.com/airbytehq/airbyte/pull/35342) | Bugfix: inverted logic of disableTypeDedupe flag |
| 0.23.1 | 2024-02-22 | [\#35527](https://github.com/airbytehq/airbyte/pull/35527) | reduce shutdown timeouts |
| 0.23.0 | 2024-02-22 | [\#35342](https://github.com/airbytehq/airbyte/pull/35342) | Consolidate and perform upfront gathering of DB metadata state |
| 0.21.4 | 2024-02-21 | [\#35511](https://github.com/airbytehq/airbyte/pull/35511) | Reduce CDC state compression limit to 1MB |
| 0.21.3 | 2024-02-20 | [\#35394](https://github.com/airbytehq/airbyte/pull/35394) | Add Junit progress information to the test logs |
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.23.0
version=0.23.2
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ private TyperDeduper getV2TyperDeduper(final JsonNode config, final ConfiguredAi
final var migrator = new JdbcV1V2Migrator(namingResolver, database, databaseName);
final NoopV2TableMigrator v2TableMigrator = new NoopV2TableMigrator();
final DestinationHandler destinationHandler = getDestinationHandler(databaseName, database);
final boolean disableTypeDedupe = !config.has(DISABLE_TYPE_DEDUPE) || config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final boolean disableTypeDedupe = config.has(DISABLE_TYPE_DEDUPE) && config.get(DISABLE_TYPE_DEDUPE).asBoolean(false);
final TyperDeduper typerDeduper;
if (disableTypeDedupe) {
typerDeduper = new NoOpTyperDeduperWithV1V2Migrations(sqlGenerator, destinationHandler, parsedCatalog, migrator, v2TableMigrator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.4'
cdkVersionRequired = '0.23.2'
features = ['db-destinations', 'typing-deduping', 'datastore-postgres']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.0
dockerImageTag: 2.0.1
dockerRepository: airbyte/destination-postgres-strict-encrypt
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ plugins {
}

airbyteJavaConnector {
cdkVersionRequired = '0.20.4'
cdkVersionRequired = '0.23.2'
features = ['db-destinations', 'datastore-postgres', 'typing-deduping']
useLocalCdk = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: 25c5221d-dce2-4163-ade9-739ef790f503
dockerImageTag: 2.0.0
dockerImageTag: 2.0.1
dockerRepository: airbyte/destination-postgres
documentationUrl: https://docs.airbyte.com/integrations/destinations/postgres
githubIssueLabel: destination-postgres
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresDestinationHandler;
import io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
Expand Down Expand Up @@ -127,6 +130,11 @@ protected JdbcSqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
}

@Override
protected JdbcDestinationHandler getDestinationHandler(String databaseName, JdbcDatabase database) {
return new PostgresDestinationHandler(databaseName, database);
}

@Override
public boolean isV2Destination() {
return true;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.postgres.typing_deduping;

import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
import io.airbyte.integrations.base.destination.typing_deduping.Array;
import io.airbyte.integrations.base.destination.typing_deduping.Struct;
import io.airbyte.integrations.base.destination.typing_deduping.Union;
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf;

public class PostgresDestinationHandler extends JdbcDestinationHandler {

public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) {
super(databaseName, jdbcDatabase);
}

@Override
protected String toJdbcTypeName(AirbyteType airbyteType) {
// This is mostly identical to the postgres implementation, but swaps jsonb to super
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this comment applies :P

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lol it was inverted in redshift's handler, i fixed there and propagated that here 😂

if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) {
return toJdbcTypeName(airbyteProtocolType);
}
return switch (airbyteType.getTypeName()) {
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb";
// No nested Unions supported so this will definitely not result in infinite recursion.
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType());
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType);
};
}

private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> "varchar";
case NUMBER -> "numeric";
case INTEGER -> "int8";
case BOOLEAN -> "bool";
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz";
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp";
case TIME_WITH_TIMEZONE -> "timetz";
case TIME_WITHOUT_TIMEZONE -> "time";
case DATE -> "date";
case UNKNOWN -> "jsonb";
};
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@
import static org.jooq.impl.DSL.rowNumber;
import static org.jooq.impl.DSL.val;

import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType;
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType;
Expand All @@ -37,7 +34,6 @@
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand All @@ -54,13 +50,6 @@ public class PostgresSqlGenerator extends JdbcSqlGenerator {

public static final DataType<?> JSONB_TYPE = new DefaultDataType<>(null, Object.class, "jsonb");

private static final Map<String, String> POSTGRES_TYPE_NAME_TO_JDBC_TYPE = ImmutableMap.of(
"numeric", "decimal",
"int8", "bigint",
"bool", "boolean",
"timestamptz", "timestamp with time zone",
"timetz", "time with time zone");

public PostgresSqlGenerator(final NamingConventionTransformer namingTransformer) {
super(namingTransformer);
}
Expand Down Expand Up @@ -309,29 +298,6 @@ protected Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Op
.orderBy(orderedFields).as(ROW_NUMBER_COLUMN_NAME);
}

@Override
public boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, final TableDefinition existingTable) {
// Check that the columns match, with special handling for the metadata columns.
// This is mostly identical to the redshift implementation, but swaps super to jsonb
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toDialectType(column.getValue()).getTypeName()),
LinkedHashMap::putAll);
final LinkedHashMap<String, String> actualColumns = existingTable.columns().entrySet().stream()
.filter(column -> JavaBaseConstants.V2_FINAL_TABLE_METADATA_COLUMNS.stream()
.noneMatch(airbyteColumnName -> airbyteColumnName.equals(column.getKey())))
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey(), jdbcTypeNameFromPostgresTypeName(column.getValue().type())),
LinkedHashMap::putAll);

final boolean sameColumns = actualColumns.equals(intendedColumns)
&& "varchar".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_RAW_ID).type())
&& "timestamptz".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT).type())
&& "jsonb".equals(existingTable.columns().get(JavaBaseConstants.COLUMN_NAME_AB_META).type());

return sameColumns;
}

/**
* Extract a raw field, leaving it as jsonb
*/
Expand All @@ -343,8 +309,4 @@ private Field<String> jsonTypeof(final Field<?> field) {
return function("JSONB_TYPEOF", SQLDataType.VARCHAR, field);
}

private static String jdbcTypeNameFromPostgresTypeName(final String redshiftType) {
return POSTGRES_TYPE_NAME_TO_JDBC_TYPE.getOrDefault(redshiftType, redshiftType);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,23 @@
package io.airbyte.integrations.destination.postgres.typing_deduping;

import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE;
import static org.junit.jupiter.api.Assertions.assertAll;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState;
import io.airbyte.integrations.base.destination.typing_deduping.Sql;
import io.airbyte.integrations.destination.postgres.PostgresDestination;
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer;
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase;
import java.util.Optional;
import java.util.List;
import javax.sql.DataSource;
import org.jooq.DataType;
import org.jooq.Field;
Expand Down Expand Up @@ -76,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() {
}

@Override
protected DestinationHandler<TableDefinition> getDestinationHandler() {
return new JdbcDestinationHandler(databaseName, database);
protected DestinationHandler getDestinationHandler() {
return new PostgresDestinationHandler(databaseName, database);
}

@Override
Expand All @@ -96,29 +95,11 @@ public void testCreateTableIncremental() throws Exception {
final Sql sql = generator.createTable(incrementalDedupStream, "", false);
destinationHandler.execute(sql);

final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id());

assertTrue(existingTable.isPresent());
assertAll(
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()),
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("array").type()),
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()),
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()),
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()),
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()),
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()),
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()),
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()),
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()),
() -> assertEquals("date", existingTable.get().columns().get("date").type()),
() -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type()));
// TODO assert on table indexing, etc.
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream));
assertEquals(1, initialStates.size());
final DestinationInitialState initialState = initialStates.getFirst();
assertTrue(initialState.isFinalTablePresent());
assertFalse(initialState.isSchemaMismatch());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we do an assertEquals on the entire initialState object? In theory we can hand-construct the expected value, right?

Copy link
Contributor

@edgao edgao Feb 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

owait, we don't have the table schema at all anymore right? so there's no particularly interesting fields to assert on

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ private String generateBigString() {
}

@Override
protected SqlGenerator<?> getSqlGenerator() {
protected SqlGenerator getSqlGenerator() {
return new PostgresSqlGenerator(new PostgresSQLNameTransformer());
}

Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/destinations/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ Now that you have set up the Postgres destination connector, check out the follo

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------|
| 2.0.1 | 2024-02-22 | [35385](https://github.com/airbytehq/airbyte/pull/35385) | Upgrade CDK to 0.23.0; Gathering required initial state upfront |
| 2.0.0 | 2024-02-09 | [35042](https://github.com/airbytehq/airbyte/pull/35042) | GA release V2 destinations format. |
| 0.6.3 | 2024-02-06 | [34891](https://github.com/airbytehq/airbyte/pull/34891) | Remove varchar limit, use system defaults |
| 0.6.2 | 2024-01-30 | [34683](https://github.com/airbytehq/airbyte/pull/34683) | CDK Upgrade 0.16.3; Fix dependency mismatches in slf4j lib |
Expand Down Expand Up @@ -220,4 +221,4 @@ Now that you have set up the Postgres destination connector, check out the follo
| 0.3.13 | 2021-12-01 | [\#8371](https://github.com/airbytehq/airbyte/pull/8371) | Fixed incorrect handling "\n" in ssh key |
| 0.3.12 | 2021-11-08 | [\#7719](https://github.com/airbytehq/airbyte/pull/7719) | Improve handling of wide rows by buffering records based on their byte size rather than their count |
| 0.3.11 | 2021-09-07 | [\#5743](https://github.com/airbytehq/airbyte/pull/5743) | Add SSH Tunnel support |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
| 0.3.10 | 2021-08-11 | [\#5336](https://github.com/airbytehq/airbyte/pull/5336) | Destination Postgres: fix \u0000\(NULL\) value processing |
Loading