diff --git a/airbyte-cdk/java/airbyte-cdk/README.md b/airbyte-cdk/java/airbyte-cdk/README.md index fd772777a9880..99b4fe58d1ba6 100644 --- a/airbyte-cdk/java/airbyte-cdk/README.md +++ b/airbyte-cdk/java/airbyte-cdk/README.md @@ -144,6 +144,7 @@ Maven and Gradle will automatically reference the correct (pinned) version of th | Version | Date | Pull Request | Subject | |:--------|:-----------|:-----------------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 0.28.20 | 2024-04-01 | [\#36584](https://github.com/airbytehq/airbyte/pull/36584) | Changes to make source-postgres compileable | | 0.28.19 | 2024-03-29 | [\#36619](https://github.com/airbytehq/airbyte/pull/36619) | Changes to make destination-postgres compileable | | 0.28.19 | 2024-03-29 | [\#36588](https://github.com/airbytehq/airbyte/pull/36588) | Changes to make destination-redshift compileable | | 0.28.19 | 2024-03-29 | [\#36610](https://github.com/airbytehq/airbyte/pull/36610) | remove airbyte-api generation, pull depdendency jars instead | diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/AbstractDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/AbstractDatabase.kt index 0dc001477877a..091b4b0be57d6 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/AbstractDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/AbstractDatabase.kt @@ -13,6 +13,6 @@ import com.fasterxml.jackson.databind.JsonNode * accidentally closing a shared resource. */ open class AbstractDatabase { - @JvmField var sourceConfig: JsonNode? = null - @JvmField var databaseConfig: JsonNode? = null + var sourceConfig: JsonNode? = null + var databaseConfig: JsonNode? = null } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt index 0c55344455440..eaac7acf8aba7 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/DbAnalyticsUtils.kt @@ -13,6 +13,7 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage object DbAnalyticsUtils { const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid" + @JvmStatic fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage { return AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1") } diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt index acd40d32b67cd..8f20d997b2c8b 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/factory/DatabaseDriver.kt @@ -34,6 +34,7 @@ enum class DatabaseDriver(val driverClassName: String, val urlFormatString: Stri * @param driverClassName The driver class name. * @return The matching [DatabaseDriver] enumerated value or `null` if no match is found. */ + @JvmStatic fun findByDriverClassName(driverClassName: String?): DatabaseDriver { lateinit var selected: DatabaseDriver diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt index 6ce36e575ed9b..8a381acc2bfee 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/db/jdbc/AbstractJdbcCompatibleSourceOperations.kt @@ -98,7 +98,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putBoolean( + protected open fun putBoolean( node: ObjectNode, columnName: String?, resultSet: ResultSet, @@ -156,7 +156,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putDouble( + protected open fun putDouble( node: ObjectNode, columnName: String?, resultSet: ResultSet, @@ -188,7 +188,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putBigDecimal( + protected open fun putBigDecimal( node: ObjectNode, columnName: String?, resultSet: ResultSet, @@ -208,7 +208,12 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun putDate(node: ObjectNode, columnName: String?, resultSet: ResultSet, index: Int) { + protected open fun putDate( + node: ObjectNode, + columnName: String?, + resultSet: ResultSet, + index: Int + ) { node.put(columnName, resultSet.getString(index)) } @@ -280,7 +285,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun setTimestamp( + protected open fun setTimestamp( preparedStatement: PreparedStatement, parameterIndex: Int, value: String? @@ -293,7 +298,7 @@ abstract class AbstractJdbcCompatibleSourceOperations : } @Throws(SQLException::class) - protected fun setDate( + protected open fun setDate( preparedStatement: PreparedStatement, parameterIndex: Int, value: String diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt index e8ff27ccad66a..ac01c9e1f0a5a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/JdbcConnector.kt @@ -36,6 +36,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon * @param driverClassName name of the JDBC driver * @return DataSourceBuilder class used to create dynamic fields for DataSource */ + @JvmStatic fun getConnectionTimeout( connectionProperties: Map, driverClassName: String? diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt index f064ef6d923d0..e6aed62660241 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/AirbyteTraceMessageUtility.kt @@ -33,6 +33,7 @@ object AirbyteTraceMessageUtility { ) } + @JvmStatic fun emitEstimateTrace( byteEstimate: Long, type: AirbyteEstimateTraceMessage.Type?, @@ -55,10 +56,12 @@ object AirbyteTraceMessageUtility { ) } + @JvmStatic fun emitAnalyticsTrace(airbyteAnalyticsTraceMessage: AirbyteAnalyticsTraceMessage) { emitMessage(makeAnalyticsTraceAirbyteMessage(airbyteAnalyticsTraceMessage)) } + @JvmStatic fun emitErrorTrace( e: Throwable, displayMessage: String?, diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshHelpers.kt b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshHelpers.kt index aa5ace8399367..04cfae5c28e6d 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshHelpers.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/kotlin/io/airbyte/cdk/integrations/base/ssh/SshHelpers.kt @@ -17,6 +17,7 @@ object SshHelpers { get() = getSpecAndInjectSsh(Optional.empty()) @Throws(IOException::class) + @JvmStatic fun getSpecAndInjectSsh(group: Optional): ConnectorSpecification? { val originalSpec = Jsons.deserialize( @@ -28,6 +29,7 @@ object SshHelpers { @JvmOverloads @Throws(IOException::class) + @JvmStatic fun injectSshIntoSpec( connectorSpecification: ConnectorSpecification, group: Optional = Optional.empty() diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties index 4d445b590b010..7a4f9d280d6ea 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties +++ b/airbyte-cdk/java/airbyte-cdk/core/src/main/resources/version.properties @@ -1 +1 @@ -version=0.28.19 +version=0.28.20 diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.kt index 84c1ab31344dd..941bcdc25588a 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/integrations/base/ssh/SshBastionContainer.kt @@ -134,6 +134,7 @@ class SshBastionContainer : AutoCloseable { private val SSH_USER: String? = "sshuser" private val SSH_PASSWORD: String? = "secret" + @JvmStatic /** * Returns the inner docker network ip address and port of a container. This can be used to * reach a container from another container running on the same network @@ -157,6 +158,7 @@ class SshBastionContainer : AutoCloseable { ) } + @JvmStatic /** * Returns the outer docker network ip address and port of a container. This can be used to * reach a container from the host machine diff --git a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt index aaace4f3b3274..e587de3b26f94 100644 --- a/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt +++ b/airbyte-cdk/java/airbyte-cdk/core/src/testFixtures/kotlin/io/airbyte/cdk/testutils/TestDatabase.kt @@ -302,10 +302,7 @@ protected constructor(val container: C) : AutoCloseable { } companion object { - private val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration = Duration.ofSeconds(5) - fun getDefaultCdcReplicationInitialWait(): Duration { - return DEFAULT_CDC_REPLICATION_INITIAL_WAIT - } + @JvmField val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration? = Duration.ofSeconds(5) } } diff --git a/airbyte-cdk/java/airbyte-cdk/datastore-postgres/src/main/kotlin/io/airbyte/cdk/db/PgLsn.kt b/airbyte-cdk/java/airbyte-cdk/datastore-postgres/src/main/kotlin/io/airbyte/cdk/db/PgLsn.kt index b2155323ba92f..28be30badbee5 100644 --- a/airbyte-cdk/java/airbyte-cdk/datastore-postgres/src/main/kotlin/io/airbyte/cdk/db/PgLsn.kt +++ b/airbyte-cdk/java/airbyte-cdk/datastore-postgres/src/main/kotlin/io/airbyte/cdk/db/PgLsn.kt @@ -35,6 +35,7 @@ class PgLsn private constructor(private val lsn: Long) : Comparable { return PgLsn(lsn) } + @JvmStatic fun fromPgString(lsn: String): PgLsn { return PgLsn(lsnToLong(lsn)) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt index c23ad4c0b73ee..96f4c815a9df4 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/AirbyteDebeziumHandler.kt @@ -154,6 +154,7 @@ class AirbyteDebeziumHandler( */ const val QUEUE_CAPACITY: Int = 10000 + @JvmStatic fun isAnyStreamIncrementalSyncMode(catalog: ConfiguredAirbyteCatalog): Boolean { return catalog.streams .stream() diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt index 0930a2ef37901..ca8c65f5fa566 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/CdcTargetPosition.kt @@ -70,7 +70,7 @@ interface CdcTargetPosition { * @return Returns `true` if both offsets are at the same position. Otherwise, it returns * `false` */ - fun isSameOffset(offsetA: Map<*, *>, offsetB: Map): Boolean { + fun isSameOffset(offsetA: Map, offsetB: Map): Boolean { return false } } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumIteratorConstants.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumIteratorConstants.kt index f143e084e32a3..95108db4aec55 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumIteratorConstants.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/DebeziumIteratorConstants.kt @@ -12,6 +12,6 @@ object DebeziumIteratorConstants { // TODO: Move these variables to a separate class IteratorConstants, as they will be used in // state // iterators for non debezium cases too. - val SYNC_CHECKPOINT_DURATION: Duration = Duration.ofMinutes(15) + @JvmField val SYNC_CHECKPOINT_DURATION: Duration = Duration.ofMinutes(15) const val SYNC_CHECKPOINT_RECORDS: Int = 10000 } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt index ce9fa8cd00358..2e3749e336c4f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/AirbyteFileOffsetBackingStore.kt @@ -193,6 +193,7 @@ class AirbyteFileOffsetBackingStore( return ByteBuffer.wrap(s!!.toByteArray(StandardCharsets.UTF_8)) } + @JvmStatic fun initializeState( cdcState: JsonNode?, dbName: Optional diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumConverterUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumConverterUtils.kt index 6c87fb88a35a4..a2044acb064f2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumConverterUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumConverterUtils.kt @@ -24,6 +24,7 @@ class DebeziumConverterUtils private constructor() { private val LOGGER: Logger = LoggerFactory.getLogger(DebeziumConverterUtils::class.java) /** TODO : Replace usage of this method with [io.airbyte.cdk.db.jdbc.DateTimeConverter] */ + @JvmStatic fun convertDate(input: Any): String { /** * While building this custom converter we were not sure what type debezium could return @@ -61,6 +62,7 @@ class DebeziumConverterUtils private constructor() { return input.toString() } + @JvmStatic fun convertDefaultValue(field: RelationalColumn): Any? { if (field.isOptional) { return null diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt index 8c4bc32ae2fc3..4ebfb06937114 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/debezium/internals/DebeziumMessageProducer.kt @@ -38,7 +38,7 @@ class DebeziumMessageProducer( * generating an unneeded usage of networking and processing. */ private val initialOffset: HashMap - private val previousCheckpointOffset: HashMap + private val previousCheckpointOffset: HashMap private val offsetManager: AirbyteFileOffsetBackingStore? private val targetPosition: CdcTargetPosition private val schemaHistoryManager: Optional @@ -55,7 +55,7 @@ class DebeziumMessageProducer( throw RuntimeException("Offset manager cannot be null") } this.schemaHistoryManager = schemaHistoryManager - this.previousCheckpointOffset = offsetManager.read() as HashMap + this.previousCheckpointOffset = offsetManager.read() as HashMap this.initialOffset = HashMap(this.previousCheckpointOffset) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt index b73d956d499b3..5bf04dfbf25b2 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/AbstractJdbcSource.kt @@ -83,13 +83,13 @@ import org.slf4j.LoggerFactory @SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE") abstract class AbstractJdbcSource( driverClass: String, - protected val streamingQueryConfigProvider: Supplier, + @JvmField val streamingQueryConfigProvider: Supplier, sourceOperations: JdbcCompatibleSourceOperations ) : AbstractDbSource(driverClass), Source { - protected val sourceOperations: JdbcCompatibleSourceOperations + @JvmField val sourceOperations: JdbcCompatibleSourceOperations override var quoteString: String? = null - protected var dataSources: MutableCollection = ArrayList() + @JvmField val dataSources: MutableCollection = ArrayList() init { this.sourceOperations = sourceOperations @@ -304,7 +304,10 @@ abstract class AbstractJdbcSource( // needs to override isNotInternalSchema for connectors that override // getPrivilegesTableForCurrentUser() - protected fun isNotInternalSchema(jsonNode: JsonNode, internalSchemas: Set): Boolean { + protected open fun isNotInternalSchema( + jsonNode: JsonNode, + internalSchemas: Set + ): Boolean { return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText()) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcDataSourceUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcDataSourceUtils.kt index eda6d797635aa..a5e06aefbb489 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcDataSourceUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcDataSourceUtils.kt @@ -18,6 +18,7 @@ object JdbcDataSourceUtils { * @param defaultParameters connection properties map as specified by each Jdbc source * @throws IllegalArgumentException */ + @JvmStatic fun assertCustomParametersDontOverwriteDefaultParameters( customParameters: Map, defaultParameters: Map @@ -57,6 +58,7 @@ object JdbcDataSourceUtils { * @param config A configuration used to check Jdbc connection * @return A mapping of the default connection properties */ + @JvmStatic fun getDefaultConnectionProperties(config: JsonNode): Map { // NOTE that Postgres returns an empty map for some reason? return parseJdbcParameters( diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.kt index 8752397229912..8578ecd42f1a3 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/JdbcSSLConnectionUtils.kt @@ -71,6 +71,7 @@ class JdbcSSLConnectionUtils { * @return map containing relevant parsed values including location of keystore or an empty * map */ + @JvmStatic fun parseSSLConfig(config: JsonNode): Map { LOGGER.debug("source config: {}", config) @@ -143,6 +144,7 @@ class JdbcSSLConnectionUtils { return additionalParameters } + @JvmStatic fun prepareCACertificateKeyStore(config: JsonNode): Pair? { // if config available // if has CA cert - make keystore @@ -266,6 +268,7 @@ class JdbcSSLConnectionUtils { return clientCertKeyStorePair } + @JvmStatic fun fileFromCertPem(certPem: String?): Path { try { val path = Files.createTempFile(null, ".crt") diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/dto/JdbcPrivilegeDto.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/dto/JdbcPrivilegeDto.kt index 0e689e819b3ae..2f9f9be1f4065 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/dto/JdbcPrivilegeDto.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/jdbc/dto/JdbcPrivilegeDto.kt @@ -79,6 +79,7 @@ class JdbcPrivilegeDto( } companion object { + @JvmStatic fun builder(): JdbcPrivilegeDtoBuilder { return JdbcPrivilegeDtoBuilder() } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt index 4b5d6014c5c6e..201a845d76350 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/AbstractDbSource.kt @@ -52,7 +52,7 @@ abstract class AbstractDbSource protected constructor(driverClassName: String) : JdbcConnector(driverClassName), Source, AutoCloseable { // TODO: Remove when the flag is not use anymore - protected var featureFlags: FeatureFlags = EnvVariableFeatureFlags() + var featureFlags: FeatureFlags = EnvVariableFeatureFlags() @Trace(operationName = CHECK_TRACE_OPERATION_NAME) @Throws(Exception::class) @@ -270,7 +270,7 @@ protected constructor(driverClassName: String) : * @throws SQLException exception */ @Throws(SQLException::class) - protected fun verifyCursorColumnValues( + protected open fun verifyCursorColumnValues( database: Database, schema: String?, tableName: String?, @@ -286,7 +286,7 @@ protected constructor(driverClassName: String) : * * @param database database */ - protected fun estimateFullRefreshSyncSize( + protected open fun estimateFullRefreshSyncSize( database: Database, configuredAirbyteStream: ConfiguredAirbyteStream? ) { @@ -327,7 +327,7 @@ protected constructor(driverClassName: String) : ) } - protected fun getIncrementalIterators( + protected open fun getIncrementalIterators( database: Database, catalog: ConfiguredAirbyteCatalog, tableNameToTable: Map>>, @@ -601,7 +601,7 @@ protected constructor(driverClassName: String) : * Oracle DB - the schema is the user, you cannot REVOKE a privilege on a table from its owner). */ @Throws(SQLException::class) - protected fun getPrivilegesTableForCurrentUser( + protected open fun getPrivilegesTableForCurrentUser( database: JdbcDatabase?, schema: String? ): Set { @@ -670,7 +670,7 @@ protected constructor(driverClassName: String) : */ get - protected val excludedViews: Set + protected open val excludedViews: Set /** * Get list of system views in order to exclude them from the `discover` result list. * @@ -765,7 +765,7 @@ protected constructor(driverClassName: String) : cursorFieldType: DataType ): AutoCloseableIterator - protected val stateEmissionFrequency: Int + protected open val stateEmissionFrequency: Int /** * When larger than 0, the incremental iterator will emit intermediate state for every N * records. Please note that if intermediate state emission is enabled, the incremental diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt index 4bf46677fd3b5..1e656231152ba 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/DbSourceDiscoverUtil.kt @@ -147,6 +147,7 @@ object DbSourceDiscoverUtil { return AirbyteCatalog().withStreams(streams) } + @JvmStatic fun getFullyQualifiedTableName(nameSpace: String?, tableName: String): String { return if (nameSpace != null) "$nameSpace.$tableName" else tableName } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt index bd164a44486a2..e0b55663dcd0b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/RelationalDbQueryUtils.kt @@ -28,6 +28,7 @@ object RelationalDbQueryUtils { } } + @JvmStatic fun enquoteIdentifierList(identifiers: List, quoteString: String): String { val joiner = StringJoiner(",") for (identifier in identifiers) { @@ -37,6 +38,7 @@ object RelationalDbQueryUtils { } /** @return fully qualified table name with the schema (if a schema exists) in quotes. */ + @JvmStatic fun getFullyQualifiedTableNameWithQuoting( nameSpace: String?, tableName: String, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt index 777792a4f6edf..be452fd41360b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/SourceStateIterator.kt @@ -54,7 +54,7 @@ open class SourceStateIterator( try { val message = messageIterator.next() val processedMessage = - sourceStateMessageProducer.processRecordMessage(stream!!, message) + sourceStateMessageProducer.processRecordMessage(stream, message) recordCount++ return processedMessage } catch (e: Exception) { @@ -63,7 +63,7 @@ open class SourceStateIterator( } else if (!hasEmittedFinalState) { hasEmittedFinalState = true val finalStateMessageForStream = - sourceStateMessageProducer.createFinalStateMessage(stream!!) + sourceStateMessageProducer.createFinalStateMessage(stream) finalStateMessageForStream!!.withSourceStats( AirbyteStateStats().withRecordCount(recordCount.toDouble()) ) diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManagerFactory.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManagerFactory.kt index 2d34be63c3b87..30816a665a94e 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManagerFactory.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StateManagerFactory.kt @@ -26,6 +26,7 @@ object StateManagerFactory { * manager. * @return A newly created [StateManager] implementation based on the provided state. */ + @JvmStatic fun createStateManager( supportedStateType: AirbyteStateMessage.AirbyteStateType?, initialState: List?, diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt index e09c7d90d03bf..d2d7997747612 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/main/kotlin/io/airbyte/cdk/integrations/source/relationaldb/state/StreamStateManager.kt @@ -21,7 +21,7 @@ import org.slf4j.LoggerFactory * This implementation generates a state object for each stream detected in catalog/map of known * streams to cursor information stored in this manager. */ -class StreamStateManager +open class StreamStateManager /** * Constructs a new [StreamStateManager] that is seeded with the provided [AirbyteStateMessage]. * diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt index 9afef4bfcd9fd..812b38ea29ff6 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debezium/CdcSourceTest.kt @@ -94,20 +94,20 @@ abstract class CdcSourceTest> { protected abstract fun addCdcDefaultCursorField(stream: AirbyteStream?) - protected abstract fun assertExpectedStateMessages(stateMessages: List?) + protected abstract fun assertExpectedStateMessages(stateMessages: List) // TODO: this assertion should be added into test cases in this class, we will need to implement // corresponding iterator for other connectors before // doing so. - protected fun assertExpectedStateMessageCountMatches( - stateMessages: List?, + protected open fun assertExpectedStateMessageCountMatches( + stateMessages: List, totalCount: Long ) { // Do nothing. } @BeforeEach - protected fun setup() { + protected open fun setup() { testdb = createTestDatabase() createTables() populateTables() @@ -381,7 +381,7 @@ abstract class CdcSourceTest> { assertExpectedStateMessageCountMatches(stateMessages, MODEL_RECORDS.size.toLong()) } - protected fun compareTargetPositionFromTheRecordsWithTargetPostionGeneratedBeforeSync( + protected open fun compareTargetPositionFromTheRecordsWithTargetPostionGeneratedBeforeSync( targetPosition: CdcTargetPosition<*>?, record: AirbyteRecordMessage ) { @@ -412,8 +412,8 @@ abstract class CdcSourceTest> { assertCdcMetaData(recordMessages2[0].data, false) } - protected fun assertExpectedStateMessagesFromIncrementalSync( - stateMessages: List? + protected open fun assertExpectedStateMessagesFromIncrementalSync( + stateMessages: List ) { assertExpectedStateMessages(stateMessages) } @@ -529,8 +529,8 @@ abstract class CdcSourceTest> { ) } - protected fun assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync( - stateAfterFirstBatch: List? + protected open fun assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync( + stateAfterFirstBatch: List ) { assertExpectedStateMessages(stateAfterFirstBatch) } @@ -650,7 +650,9 @@ abstract class CdcSourceTest> { assertExpectedStateMessageCountMatches(stateMessages, 0) } - protected fun assertExpectedStateMessagesForNoData(stateMessages: List?) { + protected open fun assertExpectedStateMessagesForNoData( + stateMessages: List + ) { assertExpectedStateMessages(stateMessages) } @@ -703,7 +705,7 @@ abstract class CdcSourceTest> { @Test @Throws(Exception::class) - fun newTableSnapshotTest() { + open fun newTableSnapshotTest() { val firstBatchIterator = source().read(config()!!, configuredCatalog, null) val dataFromFirstBatch = AutoCloseableIterators.toListAndClose(firstBatchIterator) val recordsFromFirstBatch = extractRecordMessages(dataFromFirstBatch) @@ -920,7 +922,7 @@ abstract class CdcSourceTest> { ) } - protected fun assertStateMessagesForNewTableSnapshotTest( + protected open fun assertStateMessagesForNewTableSnapshotTest( stateMessages: List, stateMessageEmittedAfterFirstSyncCompletion: AirbyteStateMessage ) { @@ -1042,13 +1044,14 @@ abstract class CdcSourceTest> { companion object { private val LOGGER: Logger = LoggerFactory.getLogger(CdcSourceTest::class.java) - protected const val MODELS_STREAM_NAME: String = "models" - protected val STREAM_NAMES: Set = java.util.Set.of(MODELS_STREAM_NAME) + const val MODELS_STREAM_NAME: String = "models" + @JvmField val STREAM_NAMES: Set = java.util.Set.of(MODELS_STREAM_NAME) protected const val COL_ID: String = "id" protected const val COL_MAKE_ID: String = "make_id" protected const val COL_MODEL: String = "model" - protected val MODEL_RECORDS: List = + @JvmField + val MODEL_RECORDS: List = ImmutableList.of( Jsons.jsonNode(ImmutableMap.of(COL_ID, 11, COL_MAKE_ID, 1, COL_MODEL, "Fiesta")), Jsons.jsonNode(ImmutableMap.of(COL_ID, 12, COL_MAKE_ID, 1, COL_MODEL, "Focus")), diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debug/DebugUtil.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debug/DebugUtil.kt index d04a8ea5e0146..e0d59e0a2fe4b 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debug/DebugUtil.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/debug/DebugUtil.kt @@ -20,6 +20,7 @@ import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog object DebugUtil { @Suppress("deprecation") @Throws(Exception::class) + @JvmStatic fun debug(debugSource: Source) { val debugConfig = config val configuredAirbyteCatalog = catalog diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt index ed86b6ac48f81..131822f1052ba 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.kt @@ -110,7 +110,7 @@ abstract class JdbcSourceAcceptanceTest> { @BeforeEach @Throws(Exception::class) - fun setup() { + open fun setup() { testdb = createTestDatabase() if (supportsSchemas()) { createSchemas() @@ -178,7 +178,7 @@ abstract class JdbcSourceAcceptanceTest> { ) } - protected fun maybeSetShorterConnectionTimeout(config: JsonNode?) { + protected open fun maybeSetShorterConnectionTimeout(config: JsonNode?) { // Optionally implement this to speed up test cases which will result in a connection // timeout. } @@ -419,7 +419,7 @@ abstract class JdbcSourceAcceptanceTest> { Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) } - protected val airbyteMessagesReadOneColumn: List + protected open val airbyteMessagesReadOneColumn: List get() { val expectedMessages = testMessages @@ -472,7 +472,7 @@ abstract class JdbcSourceAcceptanceTest> { Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) } - protected fun getAirbyteMessagesSecondSync(streamName: String?): List { + protected open fun getAirbyteMessagesSecondSync(streamName: String?): List { return testMessages .stream() .map { `object`: AirbyteMessage -> Jsons.clone(`object`) } @@ -513,7 +513,7 @@ abstract class JdbcSourceAcceptanceTest> { Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) } - protected fun getAirbyteMessagesForTablesWithQuoting( + protected open fun getAirbyteMessagesForTablesWithQuoting( streamForTableWithSpaces: ConfiguredAirbyteStream ): List { return testMessages @@ -586,7 +586,7 @@ abstract class JdbcSourceAcceptanceTest> { ) } - protected fun getAirbyteMessagesCheckCursorSpaceInColumnName( + protected open fun getAirbyteMessagesCheckCursorSpaceInColumnName( streamWithSpaces: ConfiguredAirbyteStream ): List { val firstMessage = testMessages[0] @@ -615,7 +615,7 @@ abstract class JdbcSourceAcceptanceTest> { } @Throws(Exception::class) - protected fun incrementalDateCheck() { + protected open fun incrementalDateCheck() { incrementalCursorCheck( COL_UPDATED_AT, "2005-10-18", @@ -698,7 +698,7 @@ abstract class JdbcSourceAcceptanceTest> { Assertions.assertTrue(actualMessagesSecondSync.containsAll(expectedMessages)) } - protected fun executeStatementReadIncrementallyTwice() { + protected open fun executeStatementReadIncrementallyTwice() { testdb .with( "INSERT INTO %s (id, name, updated_at) VALUES (4, 'riker', '2006-10-19')", @@ -710,7 +710,9 @@ abstract class JdbcSourceAcceptanceTest> { ) } - protected fun getExpectedAirbyteMessagesSecondSync(namespace: String?): List { + protected open fun getExpectedAirbyteMessagesSecondSync( + namespace: String? + ): List { val expectedMessages: MutableList = ArrayList() expectedMessages.add( AirbyteMessage() @@ -767,7 +769,7 @@ abstract class JdbcSourceAcceptanceTest> { @Test @Throws(Exception::class) - protected fun testReadMultipleTablesIncrementally() { + protected open fun testReadMultipleTablesIncrementally() { val tableName2 = TABLE_NAME + 2 val streamName2 = streamName() + 2 val fqTableName2 = getFullyQualifiedTableName(tableName2) @@ -865,7 +867,7 @@ abstract class JdbcSourceAcceptanceTest> { Assertions.assertTrue(actualMessagesFirstSync.containsAll(expectedMessagesFirstSync)) } - protected fun getAirbyteMessagesSecondStreamWithNamespace( + protected open fun getAirbyteMessagesSecondStreamWithNamespace( streamName2: String? ): List { return testMessages @@ -1105,7 +1107,7 @@ abstract class JdbcSourceAcceptanceTest> { } } - protected fun getStateData(airbyteMessage: AirbyteMessage, streamName: String): JsonNode { + protected open fun getStateData(airbyteMessage: AirbyteMessage, streamName: String): JsonNode { for (stream in airbyteMessage.state.data["streams"]) { if (stream["stream_name"].asText() == streamName) { return stream @@ -1174,7 +1176,7 @@ abstract class JdbcSourceAcceptanceTest> { Assertions.assertTrue(actualMessages.containsAll(expectedMessages)) } - protected fun buildStreamState( + protected open fun buildStreamState( configuredAirbyteStream: ConfiguredAirbyteStream, cursorField: String?, cursorValue: String? @@ -1202,7 +1204,7 @@ abstract class JdbcSourceAcceptanceTest> { return catalog } - protected fun getCatalog(defaultNamespace: String?): AirbyteCatalog { + protected open fun getCatalog(defaultNamespace: String?): AirbyteCatalog { return AirbyteCatalog() .withStreams( java.util.List.of( @@ -1248,7 +1250,7 @@ abstract class JdbcSourceAcceptanceTest> { ) } - protected val testMessages: List + protected open val testMessages: List get() = java.util.List.of( AirbyteMessage() @@ -1603,53 +1605,61 @@ abstract class JdbcSourceAcceptanceTest> { } companion object { - @JvmStatic protected var SCHEMA_NAME: String = "jdbc_integration_test1" - protected var SCHEMA_NAME2: String = "jdbc_integration_test2" - protected var TEST_SCHEMAS: Set = java.util.Set.of(SCHEMA_NAME, SCHEMA_NAME2) + @JvmField val SCHEMA_NAME: String = "jdbc_integration_test1" + @JvmField val SCHEMA_NAME2: String = "jdbc_integration_test2" + @JvmField val TEST_SCHEMAS: Set = java.util.Set.of(SCHEMA_NAME, SCHEMA_NAME2) - protected var TABLE_NAME: String = "id_and_name" - protected var TABLE_NAME_WITH_SPACES: String = "id and name" - protected var TABLE_NAME_WITHOUT_PK: String = "id_and_name_without_pk" - protected var TABLE_NAME_COMPOSITE_PK: String = "full_name_composite_pk" - protected var TABLE_NAME_WITHOUT_CURSOR_TYPE: String = "table_without_cursor_type" - protected var TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE: String = "table_with_null_cursor_type" + @JvmField val TABLE_NAME: String = "id_and_name" + @JvmField val TABLE_NAME_WITH_SPACES: String = "id and name" + @JvmField val TABLE_NAME_WITHOUT_PK: String = "id_and_name_without_pk" + @JvmField val TABLE_NAME_COMPOSITE_PK: String = "full_name_composite_pk" + @JvmField val TABLE_NAME_WITHOUT_CURSOR_TYPE: String = "table_without_cursor_type" + @JvmField val TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE: String = "table_with_null_cursor_type" // this table is used in testing incremental sync with concurrent insertions - protected var TABLE_NAME_AND_TIMESTAMP: String = "name_and_timestamp" - - protected var COL_ID: String = "id" - protected var COL_NAME: String = "name" - protected var COL_UPDATED_AT: String = "updated_at" - protected var COL_FIRST_NAME: String = "first_name" - protected var COL_LAST_NAME: String = "last_name" - protected var COL_LAST_NAME_WITH_SPACE: String = "last name" - protected var COL_CURSOR: String = "cursor_field" - protected var COL_TIMESTAMP: String = "timestamp" - protected var COL_TIMESTAMP_TYPE: String = "TIMESTAMP" - protected var ID_VALUE_1: Number = 1 - protected var ID_VALUE_2: Number = 2 - protected var ID_VALUE_3: Number = 3 - protected var ID_VALUE_4: Number = 4 - protected var ID_VALUE_5: Number = 5 - - protected var DROP_SCHEMA_QUERY: String = "DROP SCHEMA IF EXISTS %s CASCADE" + @JvmField val TABLE_NAME_AND_TIMESTAMP: String = "name_and_timestamp" + + @JvmField val COL_ID: String = "id" + @JvmField val COL_NAME: String = "name" + @JvmField val COL_UPDATED_AT: String = "updated_at" + @JvmField val COL_FIRST_NAME: String = "first_name" + @JvmField val COL_LAST_NAME: String = "last_name" + @JvmField val COL_LAST_NAME_WITH_SPACE: String = "last name" + @JvmField val COL_CURSOR: String = "cursor_field" + @JvmField val COL_TIMESTAMP: String = "timestamp" + @JvmField val ID_VALUE_1: Number = 1 + @JvmField val ID_VALUE_2: Number = 2 + @JvmField val ID_VALUE_3: Number = 3 + @JvmField val ID_VALUE_4: Number = 4 + @JvmField val ID_VALUE_5: Number = 5 + + @JvmField val DROP_SCHEMA_QUERY: String = "DROP SCHEMA IF EXISTS %s CASCADE" + @JvmField + val CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY: String = + "CREATE TABLE %s (%s VARCHAR(20));" + @JvmField + val INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY: String = + "INSERT INTO %s VALUES('Hello world :)');" + @JvmField + val INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY: String = + "INSERT INTO %s (name, timestamp) VALUES ('%s', '%s')" + + @JvmField protected var COL_TIMESTAMP_TYPE: String = "TIMESTAMP" + @JvmField protected var COLUMN_CLAUSE_WITH_PK: String = "id INTEGER, name VARCHAR(200) NOT NULL, updated_at DATE NOT NULL" + @JvmField protected var COLUMN_CLAUSE_WITHOUT_PK: String = "id INTEGER, name VARCHAR(200) NOT NULL, updated_at DATE NOT NULL" + @JvmField protected var COLUMN_CLAUSE_WITH_COMPOSITE_PK: String = "first_name VARCHAR(200) NOT NULL, last_name VARCHAR(200) NOT NULL, updated_at DATE NOT NULL" @JvmField var CREATE_TABLE_WITHOUT_CURSOR_TYPE_QUERY: String = "CREATE TABLE %s (%s bit NOT NULL);" @JvmField var INSERT_TABLE_WITHOUT_CURSOR_TYPE_QUERY: String = "INSERT INTO %s VALUES(0);" - protected var CREATE_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY: String = - "CREATE TABLE %s (%s VARCHAR(20));" - protected var INSERT_TABLE_WITH_NULLABLE_CURSOR_TYPE_QUERY: String = - "INSERT INTO %s VALUES('Hello world :)');" - protected var INSERT_TABLE_NAME_AND_TIMESTAMP_QUERY: String = - "INSERT INTO %s (name, timestamp) VALUES ('%s', '%s')" + @JvmStatic protected fun setEmittedAtToNull(messages: Iterable) { for (actualMessage in messages) { if (actualMessage.record != null) { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt index 64204de398788..cf33d04325b62 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceConnectorTest.kt @@ -138,7 +138,7 @@ abstract class AbstractSourceConnectorTest { * Override this method if you want to do any per-test setup that depends on being able to e.g. * [.runRead]. */ - @Throws(Exception::class) protected fun postSetup() {} + @Throws(Exception::class) protected open fun postSetup() {} @AfterEach @Throws(Exception::class) @@ -146,7 +146,7 @@ abstract class AbstractSourceConnectorTest { tearDown(environment) } - protected fun featureFlags(): FeatureFlags { + protected open fun featureFlags(): FeatureFlags { return EnvVariableFeatureFlags() } @@ -253,7 +253,7 @@ abstract class AbstractSourceConnectorTest { } @Throws(Exception::class) - protected fun runRead(configuredCatalog: ConfiguredAirbyteCatalog?): List { + protected open fun runRead(configuredCatalog: ConfiguredAirbyteCatalog?): List { return runRead(configuredCatalog, null) } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt index 573b8852d4fa1..7938f41825062 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/AbstractSourceDatabaseTypeTest.kt @@ -83,7 +83,7 @@ abstract class AbstractSourceDatabaseTypeTest : AbstractSourceConnectorTest() { * Test the 'discover' command. TODO (liren): Some existing databases may fail testDataTypes(), * so it is turned off by default. It should be enabled for all databases eventually. */ - protected fun testCatalog(): Boolean { + protected open fun testCatalog(): Boolean { return false } diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt index 8045d5377a097..6c35a829ee71f 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/SourceAcceptanceTest.kt @@ -444,6 +444,7 @@ abstract class SourceAcceptanceTest : AbstractSourceConnectorTest() { private val LOGGER: Logger = LoggerFactory.getLogger(SourceAcceptanceTest::class.java) + @JvmStatic protected fun filterRecords( messages: Collection? ): List { diff --git a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt index c14e9f7e33a47..e0f137b2547f9 100644 --- a/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt +++ b/airbyte-cdk/java/airbyte-cdk/db-sources/src/testFixtures/kotlin/io/airbyte/cdk/integrations/standardtest/source/TestDataHolder.kt @@ -211,6 +211,7 @@ internal constructor( * * @return builder for setup comprehensive test */ + @JvmStatic fun builder(): TestDataHolderBuilder { return TestDataHolderBuilder() } diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt index d671ee9d2c2eb..e32b0e5533f84 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/features/FeatureFlagsWrapper.kt @@ -38,6 +38,7 @@ open class FeatureFlagsWrapper(private val wrapped: FeatureFlags) : FeatureFlags companion object { /** Overrides the [FeatureFlags.deploymentMode] method in the feature flags. */ + @JvmStatic fun overridingDeploymentMode(wrapped: FeatureFlags, deploymentMode: String?): FeatureFlags { return object : FeatureFlagsWrapper(wrapped) { override fun deploymentMode(): String? { diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt index be4ee585ce3d6..33bcfce0ede8c 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/io/IOs.kt @@ -72,6 +72,7 @@ object IOs { } @Throws(IOException::class) + @JvmStatic fun getTail(numLines: Int, path: Path?): List { if (path == null) { return emptyList() diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt index 5c5eb575d7310..44f6af02fc641 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/json/Jsons.kt @@ -188,6 +188,7 @@ object Jsons { return jsonNode(emptyMap()) } + @JvmStatic fun arrayNode(): ArrayNode { return OBJECT_MAPPER.createArrayNode() } @@ -343,6 +344,7 @@ object Jsons { * provided for backward compatibility. */ @JvmOverloads + @JvmStatic fun flatten(node: JsonNode, applyFlattenToArray: Boolean = false): Map { if (node.isObject) { val output: MutableMap = HashMap() diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/map/MoreMaps.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/map/MoreMaps.kt index 9f3af6610df97..6aa4db06d2b57 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/map/MoreMaps.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/map/MoreMaps.kt @@ -7,6 +7,7 @@ import com.google.common.base.Preconditions object MoreMaps { @SafeVarargs + @JvmStatic fun merge(vararg maps: Map): Map { val outputMap: MutableMap = HashMap() diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/stream/AirbyteStreamUtils.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/stream/AirbyteStreamUtils.kt index 72f8d6f862a57..1d7d9df582f5a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/stream/AirbyteStreamUtils.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/stream/AirbyteStreamUtils.kt @@ -28,6 +28,7 @@ object AirbyteStreamUtils { * @param namespace The namespace of the stream. * @return The [AirbyteStreamNameNamespacePair]. */ + @JvmStatic fun convertFromNameAndNamespace( name: String?, namespace: String? diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/AutoCloseableIterators.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/AutoCloseableIterators.kt index ea531295e6271..da852d8ae571a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/AutoCloseableIterators.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/AutoCloseableIterators.kt @@ -20,6 +20,7 @@ object AutoCloseableIterators { * @param type * @return closeable iterator */ + @JvmStatic fun fromIterator(iterator: Iterator): AutoCloseableIterator { return DefaultAutoCloseableIterator(iterator, VoidCallable.NOOP, null) } @@ -31,6 +32,7 @@ object AutoCloseableIterators { * @param type * @return closeable iterator */ + @JvmStatic fun fromIterator( iterator: Iterator, airbyteStream: AirbyteStreamNameNamespacePair? @@ -47,6 +49,7 @@ object AutoCloseableIterators { * @param type * @return new autocloseable iterator with the close function appended */ + @JvmStatic fun fromIterator( iterator: Iterator, onClose: VoidCallable, @@ -63,6 +66,7 @@ object AutoCloseableIterators { * @param type * @return autocloseable iterator */ + @JvmStatic fun fromStream( stream: Stream, airbyteStream: AirbyteStreamNameNamespacePair? @@ -72,6 +76,7 @@ object AutoCloseableIterators { /** Consumes entire iterator and collect it into a list. Then it closes the iterator. */ @Throws(Exception::class) + @JvmStatic fun toListAndClose(iterator: AutoCloseableIterator): List { iterator.use { return MoreIterators.toList(iterator) @@ -88,6 +93,7 @@ object AutoCloseableIterators { * @param type * @return autocloseable iterator */ + @JvmStatic fun lazyIterator( iteratorSupplier: Supplier>, airbyteStream: AirbyteStreamNameNamespacePair? @@ -149,6 +155,7 @@ object AutoCloseableIterators { * @param output type * @return mapped autocloseable iterator */ + @JvmStatic fun transform( fromIterator: AutoCloseableIterator, function: Function @@ -166,6 +173,7 @@ object AutoCloseableIterators { * @param output type * @return mapped autocloseable iterator */ + @JvmStatic fun transform( fromIterator: AutoCloseableIterator, airbyteStream: AirbyteStreamNameNamespacePair?, @@ -189,6 +197,7 @@ object AutoCloseableIterators { * @return autocloseable iterator that still has the close functionality of the original input * iterator but is transformed by the iterator output by the iteratorCreator */ + @JvmStatic fun transform( iteratorCreator: Function, Iterator>, autoCloseableIterator: AutoCloseableIterator, @@ -201,6 +210,7 @@ object AutoCloseableIterators { ) } + @JvmStatic fun transformIterator( iteratorCreator: Function, Iterator>, autoCloseableIterator: AutoCloseableIterator, diff --git a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/MoreIterators.kt b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/MoreIterators.kt index f41f7aaa39413..2692721a3bb4a 100644 --- a/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/MoreIterators.kt +++ b/airbyte-cdk/java/airbyte-cdk/dependencies/src/main/kotlin/io/airbyte/commons/util/MoreIterators.kt @@ -16,6 +16,7 @@ object MoreIterators { * @return iterator with all elements */ @SafeVarargs + @JvmStatic fun of(vararg elements: T): Iterator { return Arrays.asList(*elements).iterator() } @@ -27,6 +28,7 @@ object MoreIterators { * @param type * @return list */ + @JvmStatic fun toList(iterator: Iterator): List { val list: MutableList = ArrayList() while (iterator.hasNext()) { @@ -42,6 +44,7 @@ object MoreIterators { * @param type * @return set */ + @JvmStatic fun toSet(iterator: Iterator): Set { val set: MutableSet = HashSet() while (iterator.hasNext()) { diff --git a/airbyte-integrations/connectors/source-postgres/build.gradle b/airbyte-integrations/connectors/source-postgres/build.gradle index 2bc702444aced..660dfdfb71c73 100644 --- a/airbyte-integrations/connectors/source-postgres/build.gradle +++ b/airbyte-integrations/connectors/source-postgres/build.gradle @@ -12,9 +12,9 @@ java { } airbyteJavaConnector { - cdkVersionRequired = '0.27.6' + cdkVersionRequired = '0.28.20' features = ['db-sources', 'datastore-postgres'] - useLocalCdk = false + useLocalCdk = true } application { diff --git a/airbyte-integrations/connectors/source-postgres/metadata.yaml b/airbyte-integrations/connectors/source-postgres/metadata.yaml index d13eace36e09e..18e2389892075 100644 --- a/airbyte-integrations/connectors/source-postgres/metadata.yaml +++ b/airbyte-integrations/connectors/source-postgres/metadata.yaml @@ -9,7 +9,7 @@ data: connectorSubtype: database connectorType: source definitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 - dockerImageTag: 3.3.20 + dockerImageTag: 3.3.21 dockerRepository: airbyte/source-postgres documentationUrl: https://docs.airbyte.com/integrations/sources/postgres githubIssueLabel: source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 98519ba3f651d..ebc43eed54b6d 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -327,7 +327,7 @@ public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLExcept sourceOperations, streamingQueryConfigProvider); - quoteString = (quoteString == null ? database.getMetaData().getIdentifierQuoteString() : quoteString); + setQuoteString((getQuoteString() == null ? database.getMetaData().getIdentifierQuoteString() : getQuoteString())); database.setSourceConfig(sourceConfig); database.setDatabaseConfig(jdbcConfig); @@ -843,7 +843,7 @@ private List getFullTableEstimate(final JdbcDatabase database, } private boolean cloudDeploymentMode() { - return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(featureFlags.deploymentMode()); + return AdaptiveSourceRunner.CLOUD_MODE.equalsIgnoreCase(getFeatureFlags().deploymentMode()); } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java index db8552a90e096..df37dd58f477e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/AbstractSshPostgresSourceAcceptanceTest.java @@ -13,7 +13,7 @@ import io.airbyte.cdk.integrations.base.ssh.SshBastionContainer; import io.airbyte.cdk.integrations.base.ssh.SshTunnel; import io.airbyte.cdk.integrations.standardtest.source.TestDestinationEnv; -import io.airbyte.commons.functional.CheckedFunction; +import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.postgres.PostgresTestDatabase; import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage; @@ -50,7 +50,7 @@ private void populateDatabaseTestData() throws Exception { outerConfig, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY, - (CheckedFunction, Exception>) mangledConfig -> getDatabaseFromConfig(mangledConfig) + (CheckedConsumer) mangledConfig -> getDatabaseFromConfig(mangledConfig) .query(ctx -> { ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java index c099d9bce9300..4222f6f7e3195 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/CdcWalLogsPostgresSourceDatatypeTest.java @@ -36,7 +36,7 @@ protected List runRead(final ConfiguredAirbyteCatalog configured @Override protected void postSetup() throws Exception { final Database database = setupDatabase(); - for (final TestDataHolder test : testDataHolders) { + for (final TestDataHolder test : getTestDataHolders()) { database.query(ctx -> { ctx.fetch(test.getCreateSqlQuery()); return null; @@ -56,7 +56,7 @@ protected void postSetup() throws Exception { if (stateAfterFirstSync == null) { throw new RuntimeException("stateAfterFirstSync should not be null"); } - for (final TestDataHolder test : testDataHolders) { + for (final TestDataHolder test : getTestDataHolders()) { database.query(ctx -> { test.getInsertSqlQueries().forEach(ctx::fetch); return null; diff --git a/airbyte-integrations/connectors/source-postgres/src/test-performance/java/io/airbyte/integrations/source/postgres/PostgresRdsSourcePerformanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test-performance/java/io/airbyte/integrations/source/postgres/PostgresRdsSourcePerformanceTest.java index e0d551e25c4df..52cbb03e7a44e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-performance/java/io/airbyte/integrations/source/postgres/PostgresRdsSourcePerformanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-performance/java/io/airbyte/integrations/source/postgres/PostgresRdsSourcePerformanceTest.java @@ -34,7 +34,7 @@ protected void setupDatabase(final String dbName) { .put("method", "Standard") .build()); - config = Jsons.jsonNode(ImmutableMap.builder() + setConfig(Jsons.jsonNode(ImmutableMap.builder() .put(JdbcUtils.HOST_KEY, plainConfig.get(JdbcUtils.HOST_KEY)) .put(JdbcUtils.PORT_KEY, plainConfig.get(JdbcUtils.PORT_KEY)) .put(JdbcUtils.DATABASE_KEY, dbName) @@ -43,7 +43,7 @@ protected void setupDatabase(final String dbName) { .put(JdbcUtils.PASSWORD_KEY, plainConfig.get(JdbcUtils.PASSWORD_KEY)) .put(JdbcUtils.SSL_KEY, true) .put("replication_method", replicationMethod) - .build()); + .build())); } /** diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java index ba26f8093a26b..30d11722452db 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/CdcPostgresSourceTest.java @@ -83,7 +83,7 @@ protected void setBaseImage() { } @Override - protected void assertExpectedStateMessageCountMatches(final List stateMessages, long totalCount) { + protected void assertExpectedStateMessageCountMatches(final List stateMessages, long totalCount) { AtomicLong count = new AtomicLong(0L); stateMessages.stream().forEach(stateMessage -> count.addAndGet(stateMessage.getSourceStats().getRecordCount().longValue())); assertEquals(totalCount, count.get()); @@ -201,18 +201,18 @@ void testCheckWithoutReplicationSlot() throws Exception { } @Override - protected void assertExpectedStateMessages(final List stateMessages) { + protected void assertExpectedStateMessages(final List stateMessages) { assertEquals(7, stateMessages.size()); assertStateTypes(stateMessages, 4); } @Override - protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(final List stateAfterFirstBatch) { + protected void assertExpectedStateMessagesForRecordsProducedDuringAndAfterSync(final List stateAfterFirstBatch) { assertEquals(27, stateAfterFirstBatch.size()); assertStateTypes(stateAfterFirstBatch, 24); } - private void assertStateTypes(final List stateMessages, final int indexTillWhichExpectCtidState) { + private void assertStateTypes(final List stateMessages, final int indexTillWhichExpectCtidState) { JsonNode sharedState = null; for (int i = 0; i < stateMessages.size(); i++) { final AirbyteStateMessage stateMessage = stateMessages.get(i); @@ -241,7 +241,7 @@ private void assertStateTypes(final List stateMessages, fin } @Override - protected void assertStateMessagesForNewTableSnapshotTest(final List stateMessages, + protected void assertStateMessagesForNewTableSnapshotTest(final List stateMessages, final AirbyteStateMessage stateMessageEmittedAfterFirstSyncCompletion) { assertEquals(7, stateMessages.size(), stateMessages.toString()); for (int i = 0; i <= 4; i++) { @@ -451,12 +451,12 @@ public void testTwoStreamSync() throws Exception { } @Override - protected void assertExpectedStateMessagesForNoData(final List stateMessages) { + protected void assertExpectedStateMessagesForNoData(final List stateMessages) { assertEquals(2, stateMessages.size()); } @Override - protected void assertExpectedStateMessagesFromIncrementalSync(final List stateMessages) { + protected void assertExpectedStateMessagesFromIncrementalSync(final List stateMessages) { assertEquals(1, stateMessages.size()); assertNotNull(stateMessages.get(0).getData()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 705cf416fdc4d..d257eef9358bb 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -21,7 +21,6 @@ import io.airbyte.cdk.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.cdk.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreIterators; import io.airbyte.integrations.source.postgres.PostgresTestDatabase.BaseImage; import io.airbyte.integrations.source.postgres.internal.models.CursorBasedStatus; @@ -37,7 +36,6 @@ import io.airbyte.protocol.models.v0.CatalogHelpers; import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.v0.ConfiguredAirbyteStream; -import io.airbyte.protocol.models.v0.ConnectorSpecification; import io.airbyte.protocol.models.v0.DestinationSyncMode; import io.airbyte.protocol.models.v0.SyncMode; import java.util.ArrayList; @@ -234,14 +232,6 @@ public boolean supportsSchemas() { return true; } - @Test - void testSpec() throws Exception { - final ConnectorSpecification actual = source().spec(); - final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class); - - assertEquals(expected, actual); - } - @Override protected List getTestMessages() { return getTestMessages(streamName()); diff --git a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java index 374ed7dceede1..8f82aa2a14c73 100644 --- a/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java +++ b/airbyte-integrations/connectors/source-postgres/src/testFixtures/java/io/airbyte/integrations/source/postgres/PostgresTestDatabase.java @@ -186,8 +186,8 @@ public PostgresConfigBuilder withCdcReplication(String LsnCommitBehaviour, Strin .with("is_test", true) .with("replication_method", Jsons.jsonNode(ImmutableMap.builder() .put("method", "CDC") - .put("replication_slot", testDatabase.getReplicationSlotName()) - .put("publication", testDatabase.getPublicationName()) + .put("replication_slot", getTestDatabase().getReplicationSlotName()) + .put("publication", getTestDatabase().getPublicationName()) .put("initial_waiting_seconds", DEFAULT_CDC_REPLICATION_INITIAL_WAIT.getSeconds()) .put("lsn_commit_behaviour", LsnCommitBehaviour) .put(INVALID_CDC_CURSOR_POSITION_PROPERTY, cdcCursorFailBehaviour) diff --git a/docs/integrations/sources/postgres.md b/docs/integrations/sources/postgres.md index e0bc5f56b11f1..f0c46d6c1ee28 100644 --- a/docs/integrations/sources/postgres.md +++ b/docs/integrations/sources/postgres.md @@ -292,6 +292,7 @@ According to Postgres [documentation](https://www.postgresql.org/docs/14/datatyp | Version | Date | Pull Request | Subject | |---------|------------|----------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 3.3.21 | 2024-03-25 | [36584](https://github.com/airbytehq/airbyte/pull/36584) | Adopt Kotlin CDK. | | 3.3.20 | 2024-03-25 | [36432](https://github.com/airbytehq/airbyte/pull/36432) | Failure to serialize values from Postgres DB shouldn't fail sync. | | 3.3.19 | 2024-03-12 | [36333](https://github.com/airbytehq/airbyte/pull/36333) | Use newest CDK - deprecate dbz iterator | | 3.3.18 | 2024-03-12 | [35599](https://github.com/airbytehq/airbyte/pull/35599) | Use newest CDK |