Skip to content

Commit e8e85eb

Browse files
authored
Destination Mysql: DV2 (#36936)
1 parent 4894ad2 commit e8e85eb

File tree

66 files changed

+1597
-131
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1597
-131
lines changed

airbyte-integrations/connectors/destination-mysql-strict-encrypt/build.gradle

+1-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.30.2'
7+
cdkVersionRequired = '0.33.0'
88
features = ['db-destinations', 'typing-deduping']
99
useLocalCdk = false
1010
}
@@ -31,9 +31,3 @@ dependencies {
3131
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
3232
integrationTestJavaImplementation libs.testcontainers.mysql
3333
}
34-
35-
configurations.all {
36-
resolutionStrategy {
37-
force libs.jooq
38-
}
39-
}

airbyte-integrations/connectors/destination-mysql-strict-encrypt/metadata.yaml

+20-5
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,34 @@ data:
77
connectorSubtype: database
88
connectorType: destination
99
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
10-
dockerImageTag: 0.3.1
10+
dockerImageTag: 1.0.0
1111
dockerRepository: airbyte/destination-mysql-strict-encrypt
1212
githubIssueLabel: destination-mysql
1313
icon: mysql.svg
1414
license: ELv2
1515
name: MySQL
16-
normalizationConfig:
17-
normalizationIntegrationType: mysql
18-
normalizationRepository: airbyte/normalization-mysql
19-
normalizationTag: 0.4.1
2016
releaseStage: alpha
2117
documentationUrl: https://docs.airbyte.com/integrations/destinations/mysql
2218
supportsDbt: true
2319
tags:
2420
- language:java
21+
releases:
22+
breakingChanges:
23+
1.0.0:
24+
message:
25+
"**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.
26+
27+
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
28+
which provides better error handling, incremental delivery of data for large
29+
syncs, and improved final table structures. To review the breaking changes,
30+
and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
31+
These changes will likely require updates to downstream dbt / SQL models,
32+
which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
33+
34+
Selecting `Upgrade` will upgrade **all** connections using this destination
35+
at their next sync. You can manually sync existing connections prior to
36+
the next scheduled sync to start the upgrade early.
37+
38+
"
39+
upgradeDeadline: "2024-05-15"
2540
metadataSpecVersion: "1.0"

airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java

+15-16
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.junit.jupiter.api.Test;
4040
import org.testcontainers.containers.MySQLContainer;
4141

42+
@Disabled
4243
public class MySQLStrictEncryptDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {
4344

4445
private MySQLContainer<?> db;
@@ -113,23 +114,22 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
113114
}
114115

115116
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
116-
try (final DSLContext dslContext = DSLContextFactory.create(
117+
final DSLContext dslContext = DSLContextFactory.create(
117118
db.getUsername(),
118119
db.getPassword(),
119120
db.getDriverClassName(),
120121
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
121122
db.getHost(),
122123
db.getFirstMappedPort(),
123124
db.getDatabaseName()),
124-
SQLDialect.MYSQL)) {
125-
return new Database(dslContext).query(
126-
ctx -> ctx
127-
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
128-
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
129-
.stream()
130-
.map(this::getJsonFromRecord)
131-
.collect(Collectors.toList()));
132-
}
125+
SQLDialect.MYSQL);
126+
return new Database(dslContext).query(
127+
ctx -> ctx
128+
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
129+
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
130+
.stream()
131+
.map(this::getJsonFromRecord)
132+
.collect(Collectors.toList()));
133133
}
134134

135135
@Override
@@ -162,19 +162,18 @@ private void grantCorrectPermissions() {
162162
}
163163

164164
private void executeQuery(final String query) {
165-
try (final DSLContext dslContext = DSLContextFactory.create(
165+
final DSLContext dslContext = DSLContextFactory.create(
166166
"root",
167167
"test",
168168
db.getDriverClassName(),
169169
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
170170
db.getHost(),
171171
db.getFirstMappedPort(),
172172
db.getDatabaseName()),
173-
SQLDialect.MYSQL)) {
174-
new Database(dslContext).query(
175-
ctx -> ctx
176-
.execute(query));
177-
} catch (final SQLException e) {
173+
SQLDialect.MYSQL);
174+
try {
175+
new Database(dslContext).query(ctx -> ctx.execute(query));
176+
} catch (SQLException e) {
178177
throw new RuntimeException(e);
179178
}
180179
}

airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test/resources/expected_spec.json

+15-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
{
22
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/mysql",
33
"supportsIncremental": true,
4-
"supportsNormalization": true,
4+
"supportsNormalization": false,
55
"supportsDBT": true,
6-
"supported_destination_sync_modes": ["overwrite", "append"],
6+
"supported_destination_sync_modes": ["overwrite", "append", "append_dedup"],
77
"connectionSpecification": {
88
"$schema": "http://json-schema.org/draft-07/schema#",
99
"title": "MySQL Destination Spec",
@@ -165,6 +165,19 @@
165165
}
166166
}
167167
]
168+
},
169+
"raw_data_schema": {
170+
"type": "string",
171+
"description": "The database to write raw tables into",
172+
"title": "Raw table database (defaults to airbyte_internal)",
173+
"order": 7
174+
},
175+
"disable_type_dedupe": {
176+
"type": "boolean",
177+
"default": false,
178+
"description": "Disable Writing Final Tables. WARNING! The data format in _airbyte_data is likely stable but there are no guarantees that other metadata columns will remain the same in future versions",
179+
"title": "Disable Final Tables. (WARNING! Unstable option; Columns in raw table schema might change between versions)",
180+
"order": 8
168181
}
169182
}
170183
}

airbyte-integrations/connectors/destination-mysql/build.gradle

+2-7
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ plugins {
44
}
55

66
airbyteJavaConnector {
7-
cdkVersionRequired = '0.30.2'
7+
cdkVersionRequired = '0.33.0'
88
features = ['db-destinations', 'typing-deduping']
99
useLocalCdk = false
1010
}
@@ -26,10 +26,5 @@ application {
2626
dependencies {
2727
implementation 'mysql:mysql-connector-java:8.0.22'
2828
integrationTestJavaImplementation libs.testcontainers.mysql
29-
}
30-
31-
configurations.all {
32-
resolutionStrategy {
33-
force libs.jooq
34-
}
29+
testFixturesApi libs.testcontainers.mysql
3530
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# our testcontainer has issues with too much concurrency.
2+
# 4 threads seems to be the sweet spot.
3+
testExecutionConcurrency=4
4+
JunitMethodExecutionTimeout=15 m

airbyte-integrations/connectors/destination-mysql/metadata.yaml

+17-7
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,17 @@ data:
22
connectorSubtype: database
33
connectorType: destination
44
definitionId: ca81ee7c-3163-4246-af40-094cc31e5e42
5-
dockerImageTag: 0.3.1
5+
dockerImageTag: 1.0.0
66
dockerRepository: airbyte/destination-mysql
77
githubIssueLabel: destination-mysql
88
icon: mysql.svg
99
license: ELv2
1010
name: MySQL
11-
normalizationConfig:
12-
normalizationIntegrationType: mysql
13-
normalizationRepository: airbyte/normalization-mysql
14-
normalizationTag: 0.4.3
1511
registries:
1612
cloud:
17-
dockerImageTag: 0.2.0
1813
dockerRepository: airbyte/destination-mysql-strict-encrypt
1914
enabled: true
2015
oss:
21-
dockerImageTag: 0.2.0
2216
enabled: true
2317
releaseStage: alpha
2418
documentationUrl: https://docs.airbyte.com/integrations/destinations/mysql
@@ -29,4 +23,20 @@ data:
2923
sl: 100
3024
ql: 200
3125
supportLevel: community
26+
releases:
27+
breakingChanges:
28+
1.0.0:
29+
message:
30+
"**Do not upgrade until you have run a test upgrade as outlined [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#testing-destinations-v2-for-a-single-connection)**.
31+
32+
This version introduces [Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2),
33+
which provides better error handling and improved final table structures. To review the breaking changes,
34+
and how to upgrade, see [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#quick-start-to-upgrading).
35+
These changes will likely require updates to downstream dbt / SQL models,
36+
which we walk through [here](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#updating-downstream-transformations).
37+
38+
Selecting `Upgrade` will upgrade **all** connections using this destination
39+
at their next sync. You can manually sync existing connections prior to
40+
the next scheduled sync to start the upgrade early."
41+
upgradeDeadline: "2024-06-05"
3242
metadataSpecVersion: "1.0"

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java

+36-17
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,15 @@
2525
import io.airbyte.commons.json.Jsons;
2626
import io.airbyte.commons.map.MoreMaps;
2727
import io.airbyte.integrations.base.destination.typing_deduping.DestinationHandler;
28+
import io.airbyte.integrations.base.destination.typing_deduping.DestinationV1V2Migrator;
2829
import io.airbyte.integrations.base.destination.typing_deduping.ParsedCatalog;
2930
import io.airbyte.integrations.base.destination.typing_deduping.SqlGenerator;
3031
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
3132
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
3233
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
34+
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlDestinationHandler;
35+
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator;
36+
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlV1V2Migrator;
3337
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
3438
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
3539
import java.util.Collections;
@@ -60,6 +64,12 @@ public class MySQLDestination extends AbstractJdbcDestination<MinimumDestination
6064
"verifyServerCertificate", "false"),
6165
DEFAULT_JDBC_PARAMETERS);
6266

67+
@Override
68+
@NotNull
69+
protected String getConfigSchemaKey() {
70+
return JdbcUtils.DATABASE_KEY;
71+
}
72+
6373
public static Destination sshWrappedDestination() {
6474
return new SshWrappedDestination(new MySQLDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
6575
}
@@ -120,10 +130,9 @@ protected Map<String, String> getDefaultConnectionProperties(final JsonNode conf
120130

121131
@Override
122132
public JsonNode toJdbcConfig(final JsonNode config) {
123-
final String jdbcUrl = String.format("jdbc:mysql://%s:%s/%s",
133+
final String jdbcUrl = String.format("jdbc:mysql://%s:%s",
124134
config.get(JdbcUtils.HOST_KEY).asText(),
125-
config.get(JdbcUtils.PORT_KEY).asText(),
126-
config.get(JdbcUtils.DATABASE_KEY).asText());
135+
config.get(JdbcUtils.PORT_KEY).asText());
127136

128137
final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
129138
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
@@ -141,27 +150,15 @@ public JsonNode toJdbcConfig(final JsonNode config) {
141150

142151
@Override
143152
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
144-
throw new UnsupportedOperationException("mysql does not yet support DV2");
145-
}
146-
147-
@Override
148-
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
149-
return new PropertyNameSimplifyingDataTransformer();
150-
}
151-
152-
public static void main(final String[] args) throws Exception {
153-
final Destination destination = MySQLDestination.sshWrappedDestination();
154-
LOGGER.info("starting destination: {}", MySQLDestination.class);
155-
new IntegrationRunner(destination).run(args);
156-
LOGGER.info("completed destination: {}", MySQLDestination.class);
153+
return new MysqlSqlGenerator();
157154
}
158155

159156
@NotNull
160157
@Override
161158
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
162159
@NotNull JdbcDatabase database,
163160
@NotNull String rawTableSchema) {
164-
throw new UnsupportedOperationException("Mysql does not yet support DV2");
161+
return new MysqlDestinationHandler(database, rawTableSchema);
165162
}
166163

167164
@NotNull
@@ -173,4 +170,26 @@ protected List<Migration<MinimumDestinationState>> getMigrations(@NotNull JdbcDa
173170
return Collections.emptyList();
174171
}
175172

173+
@Override
174+
protected DestinationV1V2Migrator getV1V2Migrator(JdbcDatabase database, String databaseName) {
175+
return new MysqlV1V2Migrator(database);
176+
}
177+
178+
@Override
179+
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
180+
return new PropertyNameSimplifyingDataTransformer();
181+
}
182+
183+
@Override
184+
public boolean isV2Destination() {
185+
return true;
186+
}
187+
188+
public static void main(final String[] args) throws Exception {
189+
final Destination destination = MySQLDestination.sshWrappedDestination();
190+
LOGGER.info("starting destination: {}", MySQLDestination.class);
191+
new IntegrationRunner(destination).run(args);
192+
LOGGER.info("completed destination: {}", MySQLDestination.class);
193+
}
194+
176195
}

0 commit comments

Comments
 (0)