Skip to content

Commit 2870be2

Browse files
committed
mysql dv2 raw table impl
1 parent 2067931 commit 2870be2

File tree

58 files changed

+742
-95
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

58 files changed

+742
-95
lines changed

airbyte-integrations/connectors/destination-mysql-strict-encrypt/build.gradle

+1-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
airbyteJavaConnector {
77
cdkVersionRequired = '0.30.2'
88
features = ['db-destinations', 'typing-deduping']
9-
useLocalCdk = false
9+
useLocalCdk = true
1010
}
1111

1212
//remove once upgrading the CDK version to 0.4.x or later
@@ -31,9 +31,3 @@ dependencies {
3131
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-mysql')
3232
integrationTestJavaImplementation libs.testcontainers.mysql
3333
}
34-
35-
configurations.all {
36-
resolutionStrategy {
37-
force libs.jooq
38-
}
39-
}

airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/mysql/MySQLStrictEncryptDestinationAcceptanceTest.java

+15-16
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import org.junit.jupiter.api.Test;
4040
import org.testcontainers.containers.MySQLContainer;
4141

42+
@Disabled
4243
public class MySQLStrictEncryptDestinationAcceptanceTest extends JdbcDestinationAcceptanceTest {
4344

4445
private MySQLContainer<?> db;
@@ -113,23 +114,22 @@ protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
113114
}
114115

115116
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
116-
try (final DSLContext dslContext = DSLContextFactory.create(
117+
final DSLContext dslContext = DSLContextFactory.create(
117118
db.getUsername(),
118119
db.getPassword(),
119120
db.getDriverClassName(),
120121
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
121122
db.getHost(),
122123
db.getFirstMappedPort(),
123124
db.getDatabaseName()),
124-
SQLDialect.MYSQL)) {
125-
return new Database(dslContext).query(
126-
ctx -> ctx
127-
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
128-
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
129-
.stream()
130-
.map(this::getJsonFromRecord)
131-
.collect(Collectors.toList()));
132-
}
125+
SQLDialect.MYSQL);
126+
return new Database(dslContext).query(
127+
ctx -> ctx
128+
.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName,
129+
JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
130+
.stream()
131+
.map(this::getJsonFromRecord)
132+
.collect(Collectors.toList()));
133133
}
134134

135135
@Override
@@ -162,19 +162,18 @@ private void grantCorrectPermissions() {
162162
}
163163

164164
private void executeQuery(final String query) {
165-
try (final DSLContext dslContext = DSLContextFactory.create(
165+
final DSLContext dslContext = DSLContextFactory.create(
166166
"root",
167167
"test",
168168
db.getDriverClassName(),
169169
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
170170
db.getHost(),
171171
db.getFirstMappedPort(),
172172
db.getDatabaseName()),
173-
SQLDialect.MYSQL)) {
174-
new Database(dslContext).query(
175-
ctx -> ctx
176-
.execute(query));
177-
} catch (final SQLException e) {
173+
SQLDialect.MYSQL);
174+
try {
175+
new Database(dslContext).query(ctx -> ctx.execute(query));
176+
} catch (SQLException e) {
178177
throw new RuntimeException(e);
179178
}
180179
}

airbyte-integrations/connectors/destination-mysql-strict-encrypt/src/test/resources/expected_spec.json

+7-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/mysql",
33
"supportsIncremental": true,
4-
"supportsNormalization": true,
4+
"supportsNormalization": false,
55
"supportsDBT": true,
66
"supported_destination_sync_modes": ["overwrite", "append"],
77
"connectionSpecification": {
@@ -165,6 +165,12 @@
165165
}
166166
}
167167
]
168+
},
169+
"raw_data_schema": {
170+
"type": "string",
171+
"description": "The database to write raw tables into",
172+
"title": "Raw table database (defaults to airbyte_internal)",
173+
"order": 7
168174
}
169175
}
170176
}

airbyte-integrations/connectors/destination-mysql/build.gradle

+2-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
airbyteJavaConnector {
77
cdkVersionRequired = '0.30.2'
88
features = ['db-destinations', 'typing-deduping']
9-
useLocalCdk = false
9+
useLocalCdk = true
1010
}
1111

1212
//remove once upgrading the CDK version to 0.4.x or later
@@ -26,10 +26,5 @@ application {
2626
dependencies {
2727
implementation 'mysql:mysql-connector-java:8.0.22'
2828
integrationTestJavaImplementation libs.testcontainers.mysql
29-
}
30-
31-
configurations.all {
32-
resolutionStrategy {
33-
force libs.jooq
34-
}
29+
testFixturesApi libs.testcontainers.mysql
3530
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
# our testcontainer has issues with too much concurrency.
2+
# 4 threads seems to be the sweet spot.
3+
testExecutionConcurrency=4
4+
JunitMethodExecutionTimeout=15 m

airbyte-integrations/connectors/destination-mysql/metadata.yaml

-4
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@ data:
88
icon: mysql.svg
99
license: ELv2
1010
name: MySQL
11-
normalizationConfig:
12-
normalizationIntegrationType: mysql
13-
normalizationRepository: airbyte/normalization-mysql
14-
normalizationTag: 0.4.3
1511
registries:
1612
cloud:
1713
dockerImageTag: 0.2.0

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
2222
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
2323
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
24+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
2425
import io.airbyte.commons.exceptions.ConnectionErrorException;
2526
import io.airbyte.commons.json.Jsons;
2627
import io.airbyte.commons.map.MoreMaps;
@@ -30,13 +31,15 @@
3031
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
3132
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
3233
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
34+
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator;
3335
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
3436
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
3537
import java.util.Collections;
3638
import java.util.List;
3739
import java.util.Map;
3840
import javax.sql.DataSource;
3941
import org.jetbrains.annotations.NotNull;
42+
import org.jooq.SQLDialect;
4043
import org.slf4j.Logger;
4144
import org.slf4j.LoggerFactory;
4245

@@ -141,14 +144,24 @@ public JsonNode toJdbcConfig(final JsonNode config) {
141144

142145
@Override
143146
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
144-
throw new UnsupportedOperationException("mysql does not yet support DV2");
147+
return new MysqlSqlGenerator();
145148
}
146149

147150
@Override
148151
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
149152
return new PropertyNameSimplifyingDataTransformer();
150153
}
151154

155+
@Override
156+
public boolean isV2Destination() {
157+
return true;
158+
}
159+
160+
@Override
161+
protected boolean shouldAlwaysDisableTypeDedupe() {
162+
return true;
163+
}
164+
152165
public static void main(final String[] args) throws Exception {
153166
final Destination destination = MySQLDestination.sshWrappedDestination();
154167
LOGGER.info("starting destination: {}", MySQLDestination.class);
@@ -161,7 +174,7 @@ public static void main(final String[] args) throws Exception {
161174
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
162175
@NotNull JdbcDatabase database,
163176
@NotNull String rawTableSchema) {
164-
throw new UnsupportedOperationException("Mysql does not yet support DV2");
177+
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
165178
}
166179

167180
@NotNull

0 commit comments

Comments
 (0)