Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Source-postgres] : Move to new Kotlin CDK #36584

Merged
merged 22 commits into from
Apr 1, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@ import com.fasterxml.jackson.databind.JsonNode
* accidentally closing a shared resource.
*/
open class AbstractDatabase {
@JvmField var sourceConfig: JsonNode? = null
@JvmField var databaseConfig: JsonNode? = null
var sourceConfig: JsonNode? = null
var databaseConfig: JsonNode? = null
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.protocol.models.v0.AirbyteAnalyticsTraceMessage
object DbAnalyticsUtils {
const val CDC_CURSOR_INVALID_KEY: String = "db-sources-cdc-cursor-invalid"

@JvmStatic
fun cdcCursorInvalidMessage(): AirbyteAnalyticsTraceMessage {
return AirbyteAnalyticsTraceMessage().withType(CDC_CURSOR_INVALID_KEY).withValue("1")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ enum class DatabaseDriver(val driverClassName: String, val urlFormatString: Stri
* @param driverClassName The driver class name.
* @return The matching [DatabaseDriver] enumerated value or `null` if no match is found.
*/
@JvmStatic
fun findByDriverClassName(driverClassName: String?): DatabaseDriver {
lateinit var selected: DatabaseDriver

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putBoolean(
protected open fun putBoolean(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand Down Expand Up @@ -156,7 +156,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putDouble(
protected open fun putDouble(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand Down Expand Up @@ -188,7 +188,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putBigDecimal(
protected open fun putBigDecimal(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
Expand All @@ -208,7 +208,12 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun putDate(node: ObjectNode, columnName: String?, resultSet: ResultSet, index: Int) {
protected open fun putDate(
node: ObjectNode,
columnName: String?,
resultSet: ResultSet,
index: Int
) {
node.put(columnName, resultSet.getString(index))
}

Expand Down Expand Up @@ -280,7 +285,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun setTimestamp(
protected open fun setTimestamp(
preparedStatement: PreparedStatement,
parameterIndex: Int,
value: String?
Expand All @@ -293,7 +298,7 @@ abstract class AbstractJdbcCompatibleSourceOperations<Datatype> :
}

@Throws(SQLException::class)
protected fun setDate(
protected open fun setDate(
preparedStatement: PreparedStatement,
parameterIndex: Int,
value: String
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected constructor(@JvmField protected val driverClassName: String) : BaseCon
* @param driverClassName name of the JDBC driver
* @return DataSourceBuilder class used to create dynamic fields for DataSource
*/
@JvmStatic
fun getConnectionTimeout(
connectionProperties: Map<String, String>,
driverClassName: String?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object AirbyteTraceMessageUtility {
)
}

@JvmStatic
fun emitEstimateTrace(
byteEstimate: Long,
type: AirbyteEstimateTraceMessage.Type?,
Expand All @@ -55,10 +56,12 @@ object AirbyteTraceMessageUtility {
)
}

@JvmStatic
fun emitAnalyticsTrace(airbyteAnalyticsTraceMessage: AirbyteAnalyticsTraceMessage) {
emitMessage(makeAnalyticsTraceAirbyteMessage(airbyteAnalyticsTraceMessage))
}

@JvmStatic
fun emitErrorTrace(
e: Throwable,
displayMessage: String?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ object SshHelpers {
get() = getSpecAndInjectSsh(Optional.empty())

@Throws(IOException::class)
@JvmStatic
fun getSpecAndInjectSsh(group: Optional<String>): ConnectorSpecification? {
val originalSpec =
Jsons.deserialize(
Expand All @@ -28,6 +29,7 @@ object SshHelpers {

@JvmOverloads
@Throws(IOException::class)
@JvmStatic
fun injectSshIntoSpec(
connectorSpecification: ConnectorSpecification,
group: Optional<String> = Optional.empty()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class SshBastionContainer : AutoCloseable {
private val SSH_USER: String? = "sshuser"
private val SSH_PASSWORD: String? = "secret"

@JvmStatic
/**
* Returns the inner docker network ip address and port of a container. This can be used to
* reach a container from another container running on the same network
Expand All @@ -157,6 +158,7 @@ class SshBastionContainer : AutoCloseable {
)
}

@JvmStatic
/**
* Returns the outer docker network ip address and port of a container. This can be used to
* reach a container from the host machine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,10 +302,7 @@ protected constructor(val container: C) : AutoCloseable {
}

companion object {
private val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration = Duration.ofSeconds(5)
fun getDefaultCdcReplicationInitialWait(): Duration {
return DEFAULT_CDC_REPLICATION_INITIAL_WAIT
}
@JvmField val DEFAULT_CDC_REPLICATION_INITIAL_WAIT: Duration? = Duration.ofSeconds(5)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class PgLsn private constructor(private val lsn: Long) : Comparable<PgLsn> {
return PgLsn(lsn)
}

@JvmStatic
fun fromPgString(lsn: String): PgLsn {
return PgLsn(lsnToLong(lsn))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ class AirbyteDebeziumHandler<T>(
*/
const val QUEUE_CAPACITY: Int = 10000

@JvmStatic
fun isAnyStreamIncrementalSyncMode(catalog: ConfiguredAirbyteCatalog): Boolean {
return catalog.streams
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ interface CdcTargetPosition<T> {
* @return Returns `true` if both offsets are at the same position. Otherwise, it returns
* `false`
*/
fun isSameOffset(offsetA: Map<*, *>, offsetB: Map<String, String>): Boolean {
fun isSameOffset(offsetA: Map<String, String>, offsetB: Map<String, String>): Boolean {
return false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ object DebeziumIteratorConstants {
// TODO: Move these variables to a separate class IteratorConstants, as they will be used in
// state
// iterators for non debezium cases too.
val SYNC_CHECKPOINT_DURATION: Duration = Duration.ofMinutes(15)
@JvmField val SYNC_CHECKPOINT_DURATION: Duration = Duration.ofMinutes(15)
const val SYNC_CHECKPOINT_RECORDS: Int = 10000
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ class AirbyteFileOffsetBackingStore(
return ByteBuffer.wrap(s!!.toByteArray(StandardCharsets.UTF_8))
}

@JvmStatic
fun initializeState(
cdcState: JsonNode?,
dbName: Optional<String>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class DebeziumConverterUtils private constructor() {
private val LOGGER: Logger = LoggerFactory.getLogger(DebeziumConverterUtils::class.java)

/** TODO : Replace usage of this method with [io.airbyte.cdk.db.jdbc.DateTimeConverter] */
@JvmStatic
fun convertDate(input: Any): String {
/**
* While building this custom converter we were not sure what type debezium could return
Expand Down Expand Up @@ -61,6 +62,7 @@ class DebeziumConverterUtils private constructor() {
return input.toString()
}

@JvmStatic
fun convertDefaultValue(field: RelationalColumn): Any? {
if (field.isOptional) {
return null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class DebeziumMessageProducer<T>(
* generating an unneeded usage of networking and processing.
*/
private val initialOffset: HashMap<String, String>
private val previousCheckpointOffset: HashMap<String?, String?>
private val previousCheckpointOffset: HashMap<String, String>
private val offsetManager: AirbyteFileOffsetBackingStore?
private val targetPosition: CdcTargetPosition<T>
private val schemaHistoryManager: Optional<AirbyteSchemaHistoryStorage>
Expand All @@ -55,7 +55,7 @@ class DebeziumMessageProducer<T>(
throw RuntimeException("Offset manager cannot be null")
}
this.schemaHistoryManager = schemaHistoryManager
this.previousCheckpointOffset = offsetManager.read() as HashMap<String?, String?>
this.previousCheckpointOffset = offsetManager.read() as HashMap<String, String>
this.initialOffset = HashMap(this.previousCheckpointOffset)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,13 +83,13 @@ import org.slf4j.LoggerFactory
@SuppressFBWarnings("NP_PARAMETER_MUST_BE_NONNULL_BUT_MARKED_AS_NULLABLE")
abstract class AbstractJdbcSource<Datatype>(
driverClass: String,
protected val streamingQueryConfigProvider: Supplier<JdbcStreamingQueryConfig>,
@JvmField val streamingQueryConfigProvider: Supplier<JdbcStreamingQueryConfig>,
sourceOperations: JdbcCompatibleSourceOperations<Datatype>
) : AbstractDbSource<Datatype, JdbcDatabase>(driverClass), Source {
protected val sourceOperations: JdbcCompatibleSourceOperations<Datatype>
@JvmField val sourceOperations: JdbcCompatibleSourceOperations<Datatype>

override var quoteString: String? = null
protected var dataSources: MutableCollection<DataSource> = ArrayList()
@JvmField val dataSources: MutableCollection<DataSource> = ArrayList()

init {
this.sourceOperations = sourceOperations
Expand Down Expand Up @@ -304,7 +304,10 @@ abstract class AbstractJdbcSource<Datatype>(

// needs to override isNotInternalSchema for connectors that override
// getPrivilegesTableForCurrentUser()
protected fun isNotInternalSchema(jsonNode: JsonNode, internalSchemas: Set<String?>): Boolean {
protected open fun isNotInternalSchema(
jsonNode: JsonNode,
internalSchemas: Set<String?>
): Boolean {
return !internalSchemas.contains(jsonNode.get(INTERNAL_SCHEMA_NAME).asText())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ object JdbcDataSourceUtils {
* @param defaultParameters connection properties map as specified by each Jdbc source
* @throws IllegalArgumentException
*/
@JvmStatic
fun assertCustomParametersDontOverwriteDefaultParameters(
customParameters: Map<String, String>,
defaultParameters: Map<String, String>
Expand Down Expand Up @@ -57,6 +58,7 @@ object JdbcDataSourceUtils {
* @param config A configuration used to check Jdbc connection
* @return A mapping of the default connection properties
*/
@JvmStatic
fun getDefaultConnectionProperties(config: JsonNode): Map<String, String> {
// NOTE that Postgres returns an empty map for some reason?
return parseJdbcParameters(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class JdbcSSLConnectionUtils {
* @return map containing relevant parsed values including location of keystore or an empty
* map
*/
@JvmStatic
fun parseSSLConfig(config: JsonNode): Map<String, String> {
LOGGER.debug("source config: {}", config)

Expand Down Expand Up @@ -143,6 +144,7 @@ class JdbcSSLConnectionUtils {
return additionalParameters
}

@JvmStatic
fun prepareCACertificateKeyStore(config: JsonNode): Pair<URI, String>? {
// if config available
// if has CA cert - make keystore
Expand Down Expand Up @@ -266,6 +268,7 @@ class JdbcSSLConnectionUtils {
return clientCertKeyStorePair
}

@JvmStatic
fun fileFromCertPem(certPem: String?): Path {
try {
val path = Files.createTempFile(null, ".crt")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class JdbcPrivilegeDto(
}

companion object {
@JvmStatic
fun builder(): JdbcPrivilegeDtoBuilder {
return JdbcPrivilegeDtoBuilder()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ abstract class AbstractDbSource<DataType, Database : AbstractDatabase?>
protected constructor(driverClassName: String) :
JdbcConnector(driverClassName), Source, AutoCloseable {
// TODO: Remove when the flag is not use anymore
protected var featureFlags: FeatureFlags = EnvVariableFeatureFlags()
var featureFlags: FeatureFlags = EnvVariableFeatureFlags()

@Trace(operationName = CHECK_TRACE_OPERATION_NAME)
@Throws(Exception::class)
Expand Down Expand Up @@ -270,7 +270,7 @@ protected constructor(driverClassName: String) :
* @throws SQLException exception
*/
@Throws(SQLException::class)
protected fun verifyCursorColumnValues(
protected open fun verifyCursorColumnValues(
database: Database,
schema: String?,
tableName: String?,
Expand All @@ -286,7 +286,7 @@ protected constructor(driverClassName: String) :
*
* @param database database
*/
protected fun estimateFullRefreshSyncSize(
protected open fun estimateFullRefreshSyncSize(
database: Database,
configuredAirbyteStream: ConfiguredAirbyteStream?
) {
Expand Down Expand Up @@ -327,7 +327,7 @@ protected constructor(driverClassName: String) :
)
}

protected fun getIncrementalIterators(
protected open fun getIncrementalIterators(
database: Database,
catalog: ConfiguredAirbyteCatalog,
tableNameToTable: Map<String?, TableInfo<CommonField<DataType>>>,
Expand Down Expand Up @@ -601,7 +601,7 @@ protected constructor(driverClassName: String) :
* Oracle DB - the schema is the user, you cannot REVOKE a privilege on a table from its owner).
*/
@Throws(SQLException::class)
protected fun <T> getPrivilegesTableForCurrentUser(
protected open fun <T> getPrivilegesTableForCurrentUser(
database: JdbcDatabase?,
schema: String?
): Set<T> {
Expand Down Expand Up @@ -670,7 +670,7 @@ protected constructor(driverClassName: String) :
*/
get

protected val excludedViews: Set<String>
protected open val excludedViews: Set<String>
/**
* Get list of system views in order to exclude them from the `discover` result list.
*
Expand Down Expand Up @@ -765,7 +765,7 @@ protected constructor(driverClassName: String) :
cursorFieldType: DataType
): AutoCloseableIterator<AirbyteRecordData>

protected val stateEmissionFrequency: Int
protected open val stateEmissionFrequency: Int
/**
* When larger than 0, the incremental iterator will emit intermediate state for every N
* records. Please note that if intermediate state emission is enabled, the incremental
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ object DbSourceDiscoverUtil {
return AirbyteCatalog().withStreams(streams)
}

@JvmStatic
fun getFullyQualifiedTableName(nameSpace: String?, tableName: String): String {
return if (nameSpace != null) "$nameSpace.$tableName" else tableName
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ object RelationalDbQueryUtils {
}
}

@JvmStatic
fun enquoteIdentifierList(identifiers: List<String>, quoteString: String): String {
val joiner = StringJoiner(",")
for (identifier in identifiers) {
Expand All @@ -37,6 +38,7 @@ object RelationalDbQueryUtils {
}

/** @return fully qualified table name with the schema (if a schema exists) in quotes. */
@JvmStatic
fun getFullyQualifiedTableNameWithQuoting(
nameSpace: String?,
tableName: String,
Expand Down
Loading
Loading