diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 7eefa8398a060..3996c7fe6fc22 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -599,7 +599,7 @@ - name: Microsoft SQL Server (MSSQL) sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 dockerRepository: airbyte/source-mssql - dockerImageTag: 0.4.17 + dockerImageTag: 0.4.18 documentationUrl: https://docs.airbyte.io/integrations/sources/mssql icon: mssql.svg sourceType: database diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 43387628eb054..1a558151a514e 100644 --- a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -5273,7 +5273,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-mssql:0.4.17" +- dockerImage: "airbyte/source-mssql:0.4.18" spec: documentationUrl: "https://docs.airbyte.io/integrations/destinations/mssql" connectionSpecification: @@ -5374,7 +5374,7 @@ description: "Specifies the host name of the server. The value of\ \ this property must match the subject property of the certificate." order: 7 - replication: + replication_method: type: "object" title: "Replication Method" description: "The replication method used for extracting data from the database.\ @@ -5389,9 +5389,9 @@ description: "Standard replication requires no setup on the DB side but\ \ will not be able to represent deletions incrementally." required: - - "replication_type" + - "method" properties: - replication_type: + method: type: "string" const: "STANDARD" enum: @@ -5402,9 +5402,9 @@ description: "CDC uses {TBC} to detect inserts, updates, and deletes.\ \ This needs to be configured on the source database itself." required: - - "replication_type" + - "method" properties: - replication_type: + method: type: "string" const: "CDC" enum: diff --git a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile index f7c29543a3865..8c1bf75d5c428 100644 --- a/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql-strict-encrypt/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql-strict-encrypt COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.17 +LABEL io.airbyte.version=0.4.18 LABEL io.airbyte.name=airbyte/source-mssql-strict-encrypt diff --git a/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/resources/expected_spec.json index af6cc24193fd1..641e888a8b4bd 100644 --- a/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/resources/expected_spec.json +++ b/airbyte-integrations/connectors/source-mssql-strict-encrypt/src/test/resources/expected_spec.json @@ -87,7 +87,7 @@ } ] }, - "replication": { + "replication_method": { "type": "object", "title": "Replication Method", "description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", @@ -97,9 +97,9 @@ { "title": "Standard", "description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.", - "required": ["replication_type"], + "required": ["method"], "properties": { - "replication_type": { + "method": { "type": "string", "const": "STANDARD", "enum": ["STANDARD"], @@ -111,9 +111,9 @@ { "title": "Logical Replication (CDC)", "description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", - "required": ["replication_type"], + "required": ["method"], "properties": { - "replication_type": { + "method": { "type": "string", "const": "CDC", "enum": ["CDC"], diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 3522a1b3604f1..0ee5ccbc01a89 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -16,5 +16,5 @@ ENV APPLICATION source-mssql COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=0.4.17 +LABEL io.airbyte.version=0.4.18 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java index 51672a826ab82..63c814edef630 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcHelper.java @@ -17,6 +17,7 @@ public class MssqlCdcHelper { // it is an oneOf object private static final String REPLICATION_FIELD = "replication"; private static final String REPLICATION_TYPE_FIELD = "replication_type"; + private static final String METHOD_FIELD = "method"; private static final String CDC_SNAPSHOT_ISOLATION_FIELD = "snapshot_isolation"; private static final String CDC_DATA_TO_SYNC_FIELD = "data_to_sync"; @@ -91,14 +92,19 @@ public static DataToSync from(final String value) { @VisibleForTesting static boolean isCdc(final JsonNode config) { // new replication method config since version 0.4.0 - if (config.hasNonNull(REPLICATION_FIELD)) { - final JsonNode replicationConfig = config.get(REPLICATION_FIELD); - return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC; + if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) { + final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD); + return ReplicationMethod.valueOf(replicationConfig.get(METHOD_FIELD).asText()) == ReplicationMethod.CDC; } // legacy replication method config before version 0.4.0 - if (config.hasNonNull(LEGACY_REPLICATION_FIELD)) { + if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isTextual()) { return ReplicationMethod.valueOf(config.get(LEGACY_REPLICATION_FIELD).asText()) == ReplicationMethod.CDC; } + if (config.hasNonNull(REPLICATION_FIELD)) { + final JsonNode replicationConfig = config.get(REPLICATION_FIELD); + return ReplicationMethod.valueOf(replicationConfig.get(REPLICATION_TYPE_FIELD).asText()) == ReplicationMethod.CDC; + } + return false; } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json b/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json index aba59c81aa7f2..f599164e9579e 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/source-mssql/src/main/resources/spec.json @@ -100,7 +100,7 @@ } ] }, - "replication": { + "replication_method": { "type": "object", "title": "Replication Method", "description": "The replication method used for extracting data from the database. STANDARD replication requires no setup on the DB side but will not be able to represent deletions incrementally. CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", @@ -110,9 +110,9 @@ { "title": "Standard", "description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.", - "required": ["replication_type"], + "required": ["method"], "properties": { - "replication_type": { + "method": { "type": "string", "const": "STANDARD", "enum": ["STANDARD"], @@ -124,9 +124,9 @@ { "title": "Logical Replication (CDC)", "description": "CDC uses {TBC} to detect inserts, updates, and deletes. This needs to be configured on the source database itself.", - "required": ["replication_type"], + "required": ["method"], "properties": { - "replication_type": { + "method": { "type": "string", "const": "CDC", "enum": ["CDC"], diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java index 9961d5753309b..c2dd276fc26fc 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceAcceptanceTest.java @@ -97,7 +97,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Int container.start(); final JsonNode replicationConfig = Jsons.jsonNode(Map.of( - "replication_type", "CDC", + "method", "CDC", "data_to_sync", "Existing and New", "snapshot_isolation", "Snapshot")); @@ -107,7 +107,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Int .put(JdbcUtils.DATABASE_KEY, DB_NAME) .put(JdbcUtils.USERNAME_KEY, TEST_USER_NAME) .put(JdbcUtils.PASSWORD_KEY, TEST_USER_PASSWORD) - .put("replication", replicationConfig) + .put("replication_method", replicationConfig) .build()); dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceDatatypeTest.java b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceDatatypeTest.java index f232d81b503e5..0a202c36910d9 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-integration/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceDatatypeTest.java @@ -32,7 +32,7 @@ protected Database setupDatabase() throws Exception { container.start(); final JsonNode replicationConfig = Jsons.jsonNode(Map.of( - "replication_type", "CDC", + "method", "CDC", "data_to_sync", "Existing and New", "snapshot_isolation", "Snapshot")); @@ -42,7 +42,7 @@ protected Database setupDatabase() throws Exception { .put(JdbcUtils.DATABASE_KEY, DB_NAME) .put(JdbcUtils.USERNAME_KEY, container.getUsername()) .put(JdbcUtils.PASSWORD_KEY, container.getPassword()) - .put("replication", replicationConfig) + .put("replication_method", replicationConfig) .build()); dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-mssql/src/test-performance/java/io/airbyte/integrations/source/mssql/FillMsSqlTestDbScriptTest.java b/airbyte-integrations/connectors/source-mssql/src/test-performance/java/io/airbyte/integrations/source/mssql/FillMsSqlTestDbScriptTest.java index 162a6db04615d..b22020307068e 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test-performance/java/io/airbyte/integrations/source/mssql/FillMsSqlTestDbScriptTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test-performance/java/io/airbyte/integrations/source/mssql/FillMsSqlTestDbScriptTest.java @@ -40,7 +40,7 @@ protected String getImageName() { @Override protected Database setupDatabase(final String dbName) { final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() - .put("replication_type", "Standard") + .put("method", "Standard") .build()); config = Jsons.jsonNode(ImmutableMap.builder() @@ -49,7 +49,7 @@ protected Database setupDatabase(final String dbName) { .put(JdbcUtils.DATABASE_KEY, dbName) // set your db name .put(JdbcUtils.USERNAME_KEY, "your_username") .put(JdbcUtils.PASSWORD_KEY, "your_pass") - .put("replication", replicationMethod) + .put("replication_method", replicationMethod) .build()); dslContext = DSLContextFactory.create( diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index b04e49bd536ab..5badf3afcbe04 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -86,7 +86,7 @@ private void init() { source = new MssqlSource(); final JsonNode replicationConfig = Jsons.jsonNode(Map.of( - "replication_type", "CDC", + "method", "CDC", "data_to_sync", "Existing and New", "snapshot_isolation", "Snapshot")); config = Jsons.jsonNode(ImmutableMap.builder() @@ -96,7 +96,7 @@ private void init() { .put(JdbcUtils.SCHEMAS_KEY, List.of(MODELS_SCHEMA, MODELS_SCHEMA + "_random")) .put(JdbcUtils.USERNAME_KEY, TEST_USER_NAME) .put(JdbcUtils.PASSWORD_KEY, TEST_USER_PASSWORD) - .put("replication", replicationConfig) + .put("replication_method", replicationConfig) .build()); dataSource = DataSourceFactory.create( @@ -279,7 +279,7 @@ void testAssertSnapshotIsolationAllowed() { @Test void testAssertSnapshotIsolationDisabled() { final JsonNode replicationConfig = Jsons.jsonNode(ImmutableMap.builder() - .put("replication_type", "CDC") + .put("method", "CDC") .put("data_to_sync", "New Changes Only") // set snapshot_isolation level to "Read Committed" to disable snapshot .put("snapshot_isolation", "Read Committed") diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlCdcHelperTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlCdcHelperTest.java index 79193cc8169c0..eec4115076a22 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlCdcHelperTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlCdcHelperTest.java @@ -27,27 +27,30 @@ public void testIsCdc() { assertTrue(MssqlCdcHelper.isCdc(LEGACY_CDC_CONFIG)); // new replication method config since version 0.4.0 - final JsonNode newNonCdc = Jsons.jsonNode(Map.of("replication", - Jsons.jsonNode(Map.of("replication_type", "STANDARD")))); + final JsonNode newNonCdc = Jsons.jsonNode(Map.of("replication_method", + Jsons.jsonNode(Map.of("method", "STANDARD")))); assertFalse(MssqlCdcHelper.isCdc(newNonCdc)); - final JsonNode newCdc = Jsons.jsonNode(Map.of("replication", + final JsonNode newCdc = Jsons.jsonNode(Map.of("replication_method", Jsons.jsonNode(Map.of( - "replication_type", "CDC", + "method", "CDC", "data_to_sync", "Existing and New", "snapshot_isolation", "Snapshot")))); assertTrue(MssqlCdcHelper.isCdc(newCdc)); // migration from legacy to new config final JsonNode mixNonCdc = Jsons.jsonNode(Map.of( - "replication_method", "CDC", - "replication", Jsons.jsonNode(Map.of("replication_type", "STANDARD")))); + "replication_method", Jsons.jsonNode(Map.of("method", "STANDARD")), + "replication", Jsons.jsonNode(Map.of("replication_type", "CDC")))); assertFalse(MssqlCdcHelper.isCdc(mixNonCdc)); final JsonNode mixCdc = Jsons.jsonNode(Map.of( - "replication_method", "Standard", "replication", Jsons.jsonNode(Map.of( - "replication_type", "CDC", + "replication_type", "Standard", + "data_to_sync", "Existing and New", + "snapshot_isolation", "Snapshot")), + "replication_method", Jsons.jsonNode(Map.of( + "method", "CDC", "data_to_sync", "Existing and New", "snapshot_isolation", "Snapshot")))); assertTrue(MssqlCdcHelper.isCdc(mixCdc)); diff --git a/docs/integrations/sources/mssql.md b/docs/integrations/sources/mssql.md index 65052d2c57d2d..a1139f48d9a57 100644 --- a/docs/integrations/sources/mssql.md +++ b/docs/integrations/sources/mssql.md @@ -306,6 +306,7 @@ If you do not see a type in this list, assume that it is coerced into a string. | Version | Date | Pull Request | Subject | |:--------|:-----------| :----------------------------------------------------- |:-------------------------------------------------------------------------------------------------------| +| 0.4.18 | 2022-09-03 | [14910](https://github.com/airbytehq/airbyte/pull/14910) | Standardize spec for CDC replication. Replace the `replication_method` enum with a config object with a `method` enum field. | | 0.4.17 | 2022-09-01 | [16261](https://github.com/airbytehq/airbyte/pull/16261) | Emit state messages more frequently | | 0.4.16 | 2022-08-18 | [14356](https://github.com/airbytehq/airbyte/pull/14356) | DB Sources: only show a table can sync incrementally if at least one column can be used as a cursor field | | 0.4.15 | 2022-08-11 | [15538](https://github.com/airbytehq/airbyte/pull/15538) | Allow additional properties in db stream state |