Skip to content

Commit 81b0ca0

Browse files
committed
mysql dv2 raw table impl
1 parent 4e09b56 commit 81b0ca0

File tree

52 files changed

+557
-51
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+557
-51
lines changed

airbyte-integrations/connectors/destination-mysql-strict-encrypt/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
airbyteJavaConnector {
77
cdkVersionRequired = '0.30.2'
88
features = ['db-destinations', 'typing-deduping']
9-
useLocalCdk = false
9+
useLocalCdk = true
1010
}
1111

1212
//remove once upgrading the CDK version to 0.4.x or later

airbyte-integrations/connectors/destination-mysql/build.gradle

+1-7
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ plugins {
66
airbyteJavaConnector {
77
cdkVersionRequired = '0.30.2'
88
features = ['db-destinations', 'typing-deduping']
9-
useLocalCdk = false
9+
useLocalCdk = true
1010
}
1111

1212
//remove once upgrading the CDK version to 0.4.x or later
@@ -27,9 +27,3 @@ dependencies {
2727
implementation 'mysql:mysql-connector-java:8.0.22'
2828
integrationTestJavaImplementation libs.testcontainers.mysql
2929
}
30-
31-
configurations.all {
32-
resolutionStrategy {
33-
force libs.jooq
34-
}
35-
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# our testcontainer has issues with too much concurrency.
2+
# 4 threads seems to be the sweet spot.
3+
testExecutionConcurrency=4

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLDestination.java

+15-2
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
2222
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
2323
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
24+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.NoOpJdbcDestinationHandler;
2425
import io.airbyte.commons.exceptions.ConnectionErrorException;
2526
import io.airbyte.commons.json.Jsons;
2627
import io.airbyte.commons.map.MoreMaps;
@@ -30,13 +31,15 @@
3031
import io.airbyte.integrations.base.destination.typing_deduping.migrators.Migration;
3132
import io.airbyte.integrations.base.destination.typing_deduping.migrators.MinimumDestinationState;
3233
import io.airbyte.integrations.destination.mysql.MySQLSqlOperations.VersionCompatibility;
34+
import io.airbyte.integrations.destination.mysql.typing_deduping.MysqlSqlGenerator;
3335
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
3436
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
3537
import java.util.Collections;
3638
import java.util.List;
3739
import java.util.Map;
3840
import javax.sql.DataSource;
3941
import org.jetbrains.annotations.NotNull;
42+
import org.jooq.SQLDialect;
4043
import org.slf4j.Logger;
4144
import org.slf4j.LoggerFactory;
4245

@@ -141,14 +144,24 @@ public JsonNode toJdbcConfig(final JsonNode config) {
141144

142145
@Override
143146
protected JdbcSqlGenerator getSqlGenerator(final JsonNode config) {
144-
throw new UnsupportedOperationException("mysql does not yet support DV2");
147+
return new MysqlSqlGenerator();
145148
}
146149

147150
@Override
148151
protected StreamAwareDataTransformer getDataTransformer(ParsedCatalog parsedCatalog, String defaultNamespace) {
149152
return new PropertyNameSimplifyingDataTransformer();
150153
}
151154

155+
@Override
156+
public boolean isV2Destination() {
157+
return true;
158+
}
159+
160+
@Override
161+
protected boolean shouldAlwaysDisableTypeDedupe() {
162+
return true;
163+
}
164+
152165
public static void main(final String[] args) throws Exception {
153166
final Destination destination = MySQLDestination.sshWrappedDestination();
154167
LOGGER.info("starting destination: {}", MySQLDestination.class);
@@ -161,7 +174,7 @@ public static void main(final String[] args) throws Exception {
161174
protected JdbcDestinationHandler<MinimumDestinationState> getDestinationHandler(@NotNull String databaseName,
162175
@NotNull JdbcDatabase database,
163176
@NotNull String rawTableSchema) {
164-
throw new UnsupportedOperationException("Mysql does not yet support DV2");
177+
return new NoOpJdbcDestinationHandler<>(databaseName, database, rawTableSchema, SQLDialect.DEFAULT);
165178
}
166179

167180
@NotNull

airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java

+107-7
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,14 @@
44

55
package io.airbyte.integrations.destination.mysql;
66

7+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT;
8+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_ID;
9+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT;
10+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_META;
11+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_AB_RAW_ID;
12+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_DATA;
13+
import static io.airbyte.cdk.integrations.base.JavaBaseConstants.COLUMN_NAME_EMITTED_AT;
14+
715
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
816
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
917
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
@@ -14,7 +22,12 @@
1422
import java.nio.file.Files;
1523
import java.sql.SQLException;
1624
import java.sql.Statement;
25+
import java.util.Arrays;
1726
import java.util.List;
27+
import java.util.Spliterator;
28+
import java.util.stream.Collectors;
29+
import java.util.stream.IntStream;
30+
import java.util.stream.StreamSupport;
1831

1932
@SuppressFBWarnings(
2033
value = {"SQL_NONCONSTANT_STRING_PASSED_TO_EXECUTE"},
@@ -42,7 +55,15 @@ public void insertRecordsInternal(final JdbcDatabase database,
4255
try {
4356
final File tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile();
4457

45-
loadDataIntoTable(database, records, schemaName, tmpTableName, tmpFile);
58+
loadDataIntoTable(
59+
database,
60+
records,
61+
schemaName,
62+
tmpTableName,
63+
tmpFile,
64+
COLUMN_NAME_AB_ID,
65+
COLUMN_NAME_DATA,
66+
COLUMN_NAME_EMITTED_AT);
4667

4768
Files.delete(tmpFile.toPath());
4869
} catch (final IOException e) {
@@ -56,25 +77,82 @@ protected void insertRecordsInternalV2(final JdbcDatabase database,
5677
final String schemaName,
5778
final String tableName)
5879
throws Exception {
59-
throw new UnsupportedOperationException("mysql does not yet support DV2");
80+
if (records.isEmpty()) {
81+
return;
82+
}
83+
84+
verifyLocalFileEnabled(database);
85+
try {
86+
final File tmpFile = Files.createTempFile(tableName + "-", ".tmp").toFile();
87+
88+
loadDataIntoTable(
89+
database,
90+
records,
91+
schemaName,
92+
tableName,
93+
tmpFile,
94+
COLUMN_NAME_AB_RAW_ID,
95+
COLUMN_NAME_DATA,
96+
COLUMN_NAME_AB_EXTRACTED_AT,
97+
COLUMN_NAME_AB_LOADED_AT,
98+
COLUMN_NAME_AB_META);
99+
Files.delete(tmpFile.toPath());
100+
} catch (final IOException e) {
101+
throw new SQLException(e);
102+
}
60103
}
61104

62105
private void loadDataIntoTable(final JdbcDatabase database,
63106
final List<PartialAirbyteMessage> records,
64107
final String schemaName,
65108
final String tmpTableName,
66-
final File tmpFile)
109+
final File tmpFile,
110+
final String... columnNames)
67111
throws SQLException {
68112
database.execute(connection -> {
69113
try {
70114
writeBatchToFile(tmpFile, records);
71115

72116
final String absoluteFile = "'" + tmpFile.getAbsolutePath() + "'";
73117

74-
final String query = String.format(
75-
"LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s FIELDS TERMINATED BY ',' ENCLOSED BY '\"' ESCAPED BY '\\\"' LINES TERMINATED BY '\\r\\n'",
76-
absoluteFile, schemaName, tmpTableName);
118+
/*
119+
We want to generate a query like:
120+
121+
LOAD DATA LOCAL INFILE '/a/b/c' INTO TABLE foo.bar
122+
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\"'
123+
LINES TERMINATED BY '\r\n'
124+
(@c0, @c1, @c2, @c3, @c4)
125+
SET
126+
_airybte_raw_id = NULLIT(@c0, ''),
127+
_airbyte_data = NULLIT(@c1, ''),
128+
_airbyte_extracted_at = NULLIT(@c2, ''),
129+
_airbyte_loaded_at = NULLIT(@c3, ''),
130+
_airbyte_meta = NULLIT(@c4, '')
77131
132+
This is to avoid weird default values (e.g. 0000-00-00 00:00:00) when the value should be NULL.
133+
*/
134+
135+
final String colVarDecls = "("
136+
+ IntStream.range(0, columnNames.length).mapToObj(i -> "@c" + i).collect(Collectors.joining(","))
137+
+ ")";
138+
final String colAssignments = IntStream.range(0, columnNames.length)
139+
.mapToObj(i -> columnNames[i] + " = NULLIF(@c" + i + ", '')")
140+
.collect(Collectors.joining(","));
141+
142+
final String query = String.format(
143+
"""
144+
LOAD DATA LOCAL INFILE %s INTO TABLE %s.%s
145+
FIELDS TERMINATED BY ',' ENCLOSED BY '"' ESCAPED BY '\\"'
146+
LINES TERMINATED BY '\\r\\n'
147+
%s
148+
SET
149+
%s
150+
""",
151+
absoluteFile,
152+
schemaName,
153+
tmpTableName,
154+
colVarDecls,
155+
colAssignments);
78156
try (final Statement stmt = connection.createStatement()) {
79157
stmt.execute(query);
80158
}
@@ -129,7 +207,7 @@ private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQ
129207
}
130208

131209
@Override
132-
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
210+
protected String createTableQueryV1(String schemaName, String tableName) {
133211
// MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column,
134212
// 256 is enough
135213
return String.format(
@@ -141,6 +219,28 @@ public String createTableQuery(final JdbcDatabase database, final String schemaN
141219
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA, JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
142220
}
143221

222+
protected String createTableQueryV2(String schemaName, String tableName) {
223+
// MySQL requires byte information with VARCHAR. Since we are using uuid as value for the column,
224+
// 256 is enough
225+
return String.format(
226+
"""
227+
CREATE TABLE IF NOT EXISTS %s.%s (\s
228+
%s VARCHAR(256) PRIMARY KEY,
229+
%s JSON,
230+
%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
231+
%s TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
232+
%s JSON
233+
);
234+
""",
235+
schemaName,
236+
tableName,
237+
JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
238+
JavaBaseConstants.COLUMN_NAME_DATA,
239+
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
240+
JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
241+
JavaBaseConstants.COLUMN_NAME_AB_META);
242+
}
243+
144244
public static class VersionCompatibility {
145245

146246
private final double version;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package io.airbyte.integrations.destination.mysql.typing_deduping
2+
3+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator
4+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId
5+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId.Companion.concatenateRawTableName
6+
import io.airbyte.integrations.destination.mysql.MySQLNameTransformer
7+
8+
class MysqlSqlGenerator : RawOnlySqlGenerator(MySQLNameTransformer()) {
9+
10+
override fun buildStreamId(
11+
namespace: String,
12+
name: String,
13+
rawNamespaceOverride: String
14+
): StreamId {
15+
return StreamId(
16+
namingTransformer.getNamespace(namespace),
17+
namingTransformer.convertStreamName(name),
18+
namingTransformer.getNamespace(rawNamespaceOverride),
19+
// The default implementation is just convertStreamName(concatenate()).
20+
// Wrap in getIdentifier to also truncate.
21+
// This is probably only necessary because the mysql name transformer
22+
// doesn't call convertStreamName in getIdentifier (probably a bug?).
23+
// But that entire NameTransformer interface is a hot mess anyway.
24+
namingTransformer.getIdentifier(
25+
namingTransformer.convertStreamName(
26+
concatenateRawTableName(namespace, name),
27+
),
28+
),
29+
namespace,
30+
name,
31+
)
32+
}
33+
}

airbyte-integrations/connectors/destination-mysql/src/main/resources/spec.json

+6
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,12 @@
5858
"title": "JDBC URL Params",
5959
"type": "string",
6060
"order": 6
61+
},
62+
"raw_table_database": {
63+
"type": "string",
64+
"description": "The database to write raw tables into",
65+
"title": "Raw table database (defaults to airbyte_internal)",
66+
"order": 7
6167
}
6268
}
6369
}

0 commit comments

Comments
 (0)