Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mssql-source:upgrade debezium version to 1.9.6 #18732

Merged
merged 7 commits into from
Nov 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -765,7 +765,7 @@
- name: Microsoft SQL Server (MSSQL)
sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.4.24
dockerImageTag: 0.4.25
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
icon: mssql.svg
sourceType: database
Expand Down
14 changes: 13 additions & 1 deletion airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6897,7 +6897,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mssql:0.4.24"
- dockerImage: "airbyte/source-mssql:0.4.25"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/mssql"
connectionSpecification:
Expand Down Expand Up @@ -7071,6 +7071,18 @@
\ the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\"\
>snapshot isolation mode</a> on the database."
order: 2
initial_waiting_seconds:
type: "integer"
title: "Initial Waiting Time in Seconds (Advanced)"
description: "The amount of time the connector will wait when it launches\
\ to determine if there is new data to sync or not. Defaults to\
\ 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about\
\ <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\"\
>initial waiting time</a>."
default: 300
min: 120
max: 1200
order: 3
tunnel_method:
type: "object"
title: "SSH Tunnel Method"
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/bases/debezium-v1-9-6/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies {

implementation 'io.debezium:debezium-api:1.9.6.Final'
implementation 'io.debezium:debezium-embedded:1.9.6.Final'
// implementation 'io.debezium:debezium-connector-sqlserver:1.9.2.Final'
implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final'
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just wanted to make sure that what was forgotten in Postgres (TINYINT(1)?) is taken care of here.

implementation 'io.debezium:debezium-connector-mysql:1.9.6.Final'
implementation 'io.debezium:debezium-connector-postgres:1.9.6.Final'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FirstRecordWaitTimeUtil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a brief class comment here?


private static final Logger LOGGER = LoggerFactory.getLogger(FirstRecordWaitTimeUtil.class);

public static final Duration MIN_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(2);
public static final Duration MAX_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(20);
public static final Duration DEFAULT_FIRST_RECORD_WAIT_TIME = Duration.ofMinutes(5);

public static void checkFirstRecordWaitTime(final JsonNode config) {
// we need to skip the check because in tests, we set initial_waiting_seconds
// to 5 seconds for performance reasons, which is shorter than the minimum
// value allowed in production
if (config.has("is_test") && config.get("is_test").asBoolean()) {
return;
}

final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
if (firstRecordWaitSeconds.isPresent()) {
final int seconds = firstRecordWaitSeconds.get();
if (seconds < MIN_FIRST_RECORD_WAIT_TIME.getSeconds() || seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) {
throw new IllegalArgumentException(
String.format("initial_waiting_seconds must be between %d and %d seconds",
MIN_FIRST_RECORD_WAIT_TIME.getSeconds(), MAX_FIRST_RECORD_WAIT_TIME.getSeconds()));
}
}
}

public static Duration getFirstRecordWaitTime(final JsonNode config) {
final boolean isTest = config.has("is_test") && config.get("is_test").asBoolean();
Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME;

final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
if (firstRecordWaitSeconds.isPresent()) {
firstRecordWaitTime = Duration.ofSeconds(firstRecordWaitSeconds.get());
if (!isTest && firstRecordWaitTime.compareTo(MIN_FIRST_RECORD_WAIT_TIME) < 0) {
LOGGER.warn("First record waiting time is overridden to {} minutes, which is the min time allowed for safety.",
MIN_FIRST_RECORD_WAIT_TIME.toMinutes());
firstRecordWaitTime = MIN_FIRST_RECORD_WAIT_TIME;
} else if (!isTest && firstRecordWaitTime.compareTo(MAX_FIRST_RECORD_WAIT_TIME) > 0) {
LOGGER.warn("First record waiting time is overridden to {} minutes, which is the max time allowed for safety.",
MAX_FIRST_RECORD_WAIT_TIME.toMinutes());
firstRecordWaitTime = MAX_FIRST_RECORD_WAIT_TIME;
}
}

LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
return firstRecordWaitTime;
}

public static Optional<Integer> getFirstRecordWaitSeconds(final JsonNode config) {
final JsonNode replicationMethod = config.get("replication_method");
if (replicationMethod != null && replicationMethod.has("initial_waiting_seconds")) {
final int seconds = config.get("replication_method").get("initial_waiting_seconds").asInt();
return Optional.of(seconds);
}
return Optional.empty();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,21 @@

package io.airbyte.integrations.debezium.internals;

import com.microsoft.sqlserver.jdbc.Geography;
import com.microsoft.sqlserver.jdbc.Geometry;
import com.microsoft.sqlserver.jdbc.SQLServerException;
import io.airbyte.db.DataTypeUtils;
import io.debezium.spi.converter.CustomConverter;
import io.debezium.spi.converter.RelationalColumn;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import microsoft.sql.DateTimeOffset;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -20,9 +27,14 @@ public class MSSQLConverter implements CustomConverter<SchemaBuilder, Relational

private final Logger LOGGER = LoggerFactory.getLogger(MSSQLConverter.class);

private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "DATETIMEOFFSET", "SMALLDATETIME");
private final String TIME_TYPE = "TIME";
private final String SMALLMONEY_TYPE = "SMALLMONEY";
private final Set<String> DATE_TYPES = Set.of("DATE", "DATETIME", "DATETIME2", "SMALLDATETIME");
private final Set<String> BINARY = Set.of("VARBINARY", "BINARY");
private static final String DATETIMEOFFSET = "DATETIMEOFFSET";
private static final String TIME_TYPE = "TIME";
private static final String SMALLMONEY_TYPE = "SMALLMONEY";
private static final String GEOMETRY = "GEOMETRY";
private static final String GEOGRAPHY = "GEOGRAPHY";
private static final String DEBEZIUM_DATETIMEOFFSET_FORMAT = "yyyy-MM-dd HH:mm:ss XXX";

@Override
public void configure(Properties props) {}
Expand All @@ -34,11 +46,61 @@ public void converterFor(final RelationalColumn field,
registerDate(field, registration);
} else if (SMALLMONEY_TYPE.equalsIgnoreCase(field.typeName())) {
registerMoney(field, registration);
} else if (BINARY.contains(field.typeName().toUpperCase())) {
registerBinary(field, registration);
} else if (GEOMETRY.equalsIgnoreCase(field.typeName())) {
registerGeometry(field, registration);
} else if (GEOGRAPHY.equalsIgnoreCase(field.typeName())) {
registerGeography(field, registration);
} else if (TIME_TYPE.equalsIgnoreCase(field.typeName())) {
registerTime(field, registration);
} else if (DATETIMEOFFSET.equalsIgnoreCase(field.typeName())) {
registerDateTimeOffSet(field, registration);
}
}

private void registerGeometry(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}

if (input instanceof byte[]) {
try {
return Geometry.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
LOGGER.error(e.getMessage());
}
}

LOGGER.warn("Uncovered Geometry class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}

private void registerGeography(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}

if (input instanceof byte[]) {
try {
return Geography.deserialize((byte[]) input).toString();
} catch (SQLServerException e) {
LOGGER.error(e.getMessage());
}
}

LOGGER.warn("Uncovered Geography class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}

private void registerDate(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
Expand All @@ -50,6 +112,25 @@ private void registerDate(final RelationalColumn field,
});
}

private void registerDateTimeOffSet(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}

if (input instanceof DateTimeOffset) {
return DataTypeUtils.toISO8601String(
OffsetDateTime.parse(input.toString(),
DateTimeFormatter.ofPattern(DEBEZIUM_DATETIMEOFFSET_FORMAT)));
}

LOGGER.warn("Uncovered DateTimeOffSet class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}

private void registerTime(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
Expand Down Expand Up @@ -84,4 +165,21 @@ private void registerMoney(final RelationalColumn field,
});
}

private void registerBinary(final RelationalColumn field,
final ConverterRegistration<SchemaBuilder> registration) {
registration.register(SchemaBuilder.string(), input -> {
if (Objects.isNull(input)) {
return DebeziumConverterUtils.convertDefaultValue(field);
}

if (input instanceof byte[]) {
return new String((byte[]) input, Charset.defaultCharset());
}

LOGGER.warn("Uncovered binary class type '{}'. Use default converter",
input.getClass().getName());
return input.toString();
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import static io.airbyte.integrations.debezium.internals.FirstRecordWaitTimeUtil.MAX_FIRST_RECORD_WAIT_TIME;
import static io.airbyte.integrations.debezium.internals.FirstRecordWaitTimeUtil.MIN_FIRST_RECORD_WAIT_TIME;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;

public class FirstRecordWaitTimeUtilTest {

@Test
void testGetFirstRecordWaitTime() {
final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap());
assertDoesNotThrow(() -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(emptyConfig));
assertEquals(Optional.empty(), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(emptyConfig));
assertEquals(FirstRecordWaitTimeUtil.DEFAULT_FIRST_RECORD_WAIT_TIME, FirstRecordWaitTimeUtil.getFirstRecordWaitTime(emptyConfig));

final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "initial_waiting_seconds", 500)));
assertDoesNotThrow(() -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(normalConfig));
assertEquals(Optional.of(500), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(normalConfig));
assertEquals(Duration.ofSeconds(500), FirstRecordWaitTimeUtil.getFirstRecordWaitTime(normalConfig));

final int tooShortTimeout = (int) MIN_FIRST_RECORD_WAIT_TIME.getSeconds() - 1;
final JsonNode tooShortConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "initial_waiting_seconds", tooShortTimeout)));
assertThrows(IllegalArgumentException.class, () -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(tooShortConfig));
assertEquals(Optional.of(tooShortTimeout), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(tooShortConfig));
assertEquals(MIN_FIRST_RECORD_WAIT_TIME, FirstRecordWaitTimeUtil.getFirstRecordWaitTime(tooShortConfig));

final int tooLongTimeout = (int) MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 1;
final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "initial_waiting_seconds", tooLongTimeout)));
assertThrows(IllegalArgumentException.class, () -> FirstRecordWaitTimeUtil.checkFirstRecordWaitTime(tooLongConfig));
assertEquals(Optional.of(tooLongTimeout), FirstRecordWaitTimeUtil.getFirstRecordWaitSeconds(tooLongConfig));
assertEquals(MAX_FIRST_RECORD_WAIT_TIME, FirstRecordWaitTimeUtil.getFirstRecordWaitTime(tooLongConfig));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public abstract class CdcSourceTest {
protected static final String COL_ID = "id";
protected static final String COL_MAKE_ID = "make_id";
protected static final String COL_MODEL = "model";
protected static final int INITIAL_WAITING_SECONDS = 5;

protected final List<JsonNode> MODEL_RECORDS_RANDOM = ImmutableList.of(
Jsons
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.24
LABEL io.airbyte.version=0.4.25
LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@
"enum": ["Snapshot", "Read Committed"],
"description": "Existing data in the database are synced through an initial snapshot. This parameter controls the isolation level that will be used during the initial snapshotting. If you choose the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\">snapshot isolation mode</a> on the database.",
"order": 2
},
"initial_waiting_seconds": {
"type": "integer",
"title": "Initial Waiting Time in Seconds (Advanced)",
"description": "The amount of time the connector will wait when it launches to determine if there is new data to sync or not. Defaults to 300 seconds. Valid range: 120 seconds to 1200 seconds. Read about <a href=\"https://docs.airbyte.com/integrations/sources/mysql/#change-data-capture-cdc\">initial waiting time</a>.",
"default": 300,
"min": 120,
"max": 1200,
"order": 3
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.24
LABEL io.airbyte.version=0.4.25
LABEL io.airbyte.name=airbyte/source-mssql
7 changes: 4 additions & 3 deletions airbyte-integrations/connectors/source-mssql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@ dependencies {

implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-integrations:bases:debezium-v1-4-2')
implementation project(':airbyte-integrations:bases:debezium-v1-9-6')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'io.debezium:debezium-connector-sqlserver:1.4.2.Final'
implementation 'io.debezium:debezium-connector-sqlserver:1.9.6.Final'
implementation 'com.microsoft.sqlserver:mssql-jdbc:8.4.1.jre14'
implementation 'org.codehaus.plexus:plexus-utils:3.4.2'

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-4-2'))
testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6'))
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))

testImplementation 'org.apache.commons:commons-lang3:3.11'
Expand Down
Loading