-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Destination Postgres: CDK T+D initial state gathering #35385
Changes from all commits
ebef209
9b3cd6e
7fe83d0
0781794
9e376a8
80e5c27
f4c0a96
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1 @@ | ||
version=0.23.0 | ||
version=0.23.2 |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Copyright (c) 2023 Airbyte, Inc., all rights reserved. | ||
*/ | ||
|
||
package io.airbyte.integrations.destination.postgres.typing_deduping; | ||
|
||
import io.airbyte.cdk.db.jdbc.JdbcDatabase; | ||
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; | ||
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteProtocolType; | ||
import io.airbyte.integrations.base.destination.typing_deduping.AirbyteType; | ||
import io.airbyte.integrations.base.destination.typing_deduping.Array; | ||
import io.airbyte.integrations.base.destination.typing_deduping.Struct; | ||
import io.airbyte.integrations.base.destination.typing_deduping.Union; | ||
import io.airbyte.integrations.base.destination.typing_deduping.UnsupportedOneOf; | ||
|
||
public class PostgresDestinationHandler extends JdbcDestinationHandler { | ||
|
||
public PostgresDestinationHandler(String databaseName, JdbcDatabase jdbcDatabase) { | ||
super(databaseName, jdbcDatabase); | ||
} | ||
|
||
@Override | ||
protected String toJdbcTypeName(AirbyteType airbyteType) { | ||
// This is mostly identical to the postgres implementation, but swaps jsonb to super | ||
if (airbyteType instanceof final AirbyteProtocolType airbyteProtocolType) { | ||
return toJdbcTypeName(airbyteProtocolType); | ||
} | ||
return switch (airbyteType.getTypeName()) { | ||
case Struct.TYPE, UnsupportedOneOf.TYPE, Array.TYPE -> "jsonb"; | ||
// No nested Unions supported so this will definitely not result in infinite recursion. | ||
case Union.TYPE -> toJdbcTypeName(((Union) airbyteType).chooseType()); | ||
default -> throw new IllegalArgumentException("Unsupported AirbyteType: " + airbyteType); | ||
}; | ||
} | ||
|
||
private String toJdbcTypeName(final AirbyteProtocolType airbyteProtocolType) { | ||
return switch (airbyteProtocolType) { | ||
case STRING -> "varchar"; | ||
case NUMBER -> "numeric"; | ||
case INTEGER -> "int8"; | ||
case BOOLEAN -> "bool"; | ||
case TIMESTAMP_WITH_TIMEZONE -> "timestamptz"; | ||
case TIMESTAMP_WITHOUT_TIMEZONE -> "timestamp"; | ||
case TIME_WITH_TIMEZONE -> "timetz"; | ||
case TIME_WITHOUT_TIMEZONE -> "time"; | ||
case DATE -> "date"; | ||
case UNKNOWN -> "jsonb"; | ||
}; | ||
} | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,24 +5,23 @@ | |
package io.airbyte.integrations.destination.postgres.typing_deduping; | ||
|
||
import static io.airbyte.integrations.destination.postgres.typing_deduping.PostgresSqlGenerator.JSONB_TYPE; | ||
import static org.junit.jupiter.api.Assertions.assertAll; | ||
import static org.junit.jupiter.api.Assertions.assertEquals; | ||
import static org.junit.jupiter.api.Assertions.assertFalse; | ||
import static org.junit.jupiter.api.Assertions.assertTrue; | ||
|
||
import com.fasterxml.jackson.databind.JsonNode; | ||
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase; | ||
import io.airbyte.cdk.db.jdbc.JdbcDatabase; | ||
import io.airbyte.cdk.db.jdbc.JdbcUtils; | ||
import io.airbyte.cdk.integrations.destination.jdbc.TableDefinition; | ||
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler; | ||
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator; | ||
import io.airbyte.cdk.integrations.standardtest.destination.typing_deduping.JdbcSqlGeneratorIntegrationTest; | ||
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler; | ||
import io.airbyte.integrations.base.destination.typing_deduping.DestinationInitialState; | ||
import io.airbyte.integrations.base.destination.typing_deduping.Sql; | ||
import io.airbyte.integrations.destination.postgres.PostgresDestination; | ||
import io.airbyte.integrations.destination.postgres.PostgresSQLNameTransformer; | ||
import io.airbyte.integrations.destination.postgres.PostgresTestDatabase; | ||
import java.util.Optional; | ||
import java.util.List; | ||
import javax.sql.DataSource; | ||
import org.jooq.DataType; | ||
import org.jooq.Field; | ||
|
@@ -76,8 +75,8 @@ protected JdbcSqlGenerator getSqlGenerator() { | |
} | ||
|
||
@Override | ||
protected DestinationHandler<TableDefinition> getDestinationHandler() { | ||
return new JdbcDestinationHandler(databaseName, database); | ||
protected DestinationHandler getDestinationHandler() { | ||
return new PostgresDestinationHandler(databaseName, database); | ||
} | ||
|
||
@Override | ||
|
@@ -96,29 +95,11 @@ public void testCreateTableIncremental() throws Exception { | |
final Sql sql = generator.createTable(incrementalDedupStream, "", false); | ||
destinationHandler.execute(sql); | ||
|
||
final Optional<TableDefinition> existingTable = destinationHandler.findExistingTable(incrementalDedupStream.id()); | ||
|
||
assertTrue(existingTable.isPresent()); | ||
assertAll( | ||
() -> assertEquals("varchar", existingTable.get().columns().get("_airbyte_raw_id").type()), | ||
() -> assertEquals("timestamptz", existingTable.get().columns().get("_airbyte_extracted_at").type()), | ||
() -> assertEquals("jsonb", existingTable.get().columns().get("_airbyte_meta").type()), | ||
() -> assertEquals("int8", existingTable.get().columns().get("id1").type()), | ||
() -> assertEquals("int8", existingTable.get().columns().get("id2").type()), | ||
() -> assertEquals("timestamptz", existingTable.get().columns().get("updated_at").type()), | ||
() -> assertEquals("jsonb", existingTable.get().columns().get("struct").type()), | ||
() -> assertEquals("jsonb", existingTable.get().columns().get("array").type()), | ||
() -> assertEquals("varchar", existingTable.get().columns().get("string").type()), | ||
() -> assertEquals("numeric", existingTable.get().columns().get("number").type()), | ||
() -> assertEquals("int8", existingTable.get().columns().get("integer").type()), | ||
() -> assertEquals("bool", existingTable.get().columns().get("boolean").type()), | ||
() -> assertEquals("timestamptz", existingTable.get().columns().get("timestamp_with_timezone").type()), | ||
() -> assertEquals("timestamp", existingTable.get().columns().get("timestamp_without_timezone").type()), | ||
() -> assertEquals("timetz", existingTable.get().columns().get("time_with_timezone").type()), | ||
() -> assertEquals("time", existingTable.get().columns().get("time_without_timezone").type()), | ||
() -> assertEquals("date", existingTable.get().columns().get("date").type()), | ||
() -> assertEquals("jsonb", existingTable.get().columns().get("unknown").type())); | ||
// TODO assert on table indexing, etc. | ||
List<DestinationInitialState> initialStates = destinationHandler.gatherInitialState(List.of(incrementalDedupStream)); | ||
assertEquals(1, initialStates.size()); | ||
final DestinationInitialState initialState = initialStates.getFirst(); | ||
assertTrue(initialState.isFinalTablePresent()); | ||
assertFalse(initialState.isSchemaMismatch()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can we do an assertEquals on the entire initialState object? In theory we can hand-construct the expected value, right? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. owait, we don't have the table schema at all anymore right? so there's no particularly interesting fields to assert on |
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this comment applies :P
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lol it was inverted in redshift's handler, i fixed there and propagated that here 😂