Skip to content

Commit df60f3e

Browse files
committed
mysql stuff
1 parent 55c1931 commit df60f3e

File tree

34 files changed

+866
-12
lines changed

34 files changed

+866
-12
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package io.airbyte.integrations.destination.mysql.typing_deduping;
2+
3+
import static org.jooq.impl.DSL.field;
4+
import static org.jooq.impl.DSL.name;
5+
6+
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
7+
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcDestinationHandler;
8+
import io.airbyte.commons.exceptions.SQLRuntimeException;
9+
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
10+
import java.sql.ResultSet;
11+
import java.sql.SQLException;
12+
import java.sql.Timestamp;
13+
import java.time.Instant;
14+
import java.time.temporal.ChronoUnit;
15+
import java.util.Objects;
16+
import java.util.Optional;
17+
import java.util.stream.Stream;
18+
import javax.annotation.Nullable;
19+
import org.jooq.SQLDialect;
20+
import org.jooq.impl.DSL;
21+
import org.slf4j.Logger;
22+
import org.slf4j.LoggerFactory;
23+
24+
public class MysqlDestinationHandler extends JdbcDestinationHandler {
25+
private static final Logger LOGGER = LoggerFactory.getLogger(MysqlDestinationHandler.class);
26+
27+
public MysqlDestinationHandler(final String databaseName, final JdbcDatabase jdbcDatabase) {
28+
super(databaseName, jdbcDatabase, SQLDialect.MYSQL);
29+
}
30+
31+
// mysql's ResultSet#getTimestamp() throws errors like
32+
// `java.sql.SQLDataException: Cannot convert string '2023-01-01T00:00:00Z' to java.sql.Timestamp value`
33+
// so we override the method and replace all of those calls with Instant.parse(rs.getString())
34+
// yes, this is dumb.
35+
@Override
36+
public InitialRawTableState getInitialRawTableState(final StreamId id) throws Exception {
37+
final boolean tableExists = jdbcDatabase.executeMetadataQuery(dbmetadata -> {
38+
LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.rawNamespace(), id.rawName());
39+
try (final ResultSet table = dbmetadata.getTables(databaseName, id.rawNamespace(), id.rawName(), null)) {
40+
return table.next();
41+
} catch (final SQLException e) {
42+
LOGGER.error("Failed to retrieve table info from metadata", e);
43+
throw new SQLRuntimeException(e);
44+
}
45+
});
46+
if (!tableExists) {
47+
// There's no raw table at all. Therefore there are no unprocessed raw records, and this sync
48+
// should not filter raw records by timestamp.
49+
return new InitialRawTableState(false, Optional.empty());
50+
}
51+
// And use two explicit queries because COALESCE might not short-circuit evaluation.
52+
// This first query tries to find the oldest raw record with loaded_at = NULL.
53+
// Unsafe query requires us to explicitly close the Stream, which is inconvenient,
54+
// but it's also the only method in the JdbcDatabase interface to return non-string/int types
55+
try (final Stream<Instant> timestampStream = jdbcDatabase.unsafeQuery(
56+
conn -> conn.prepareStatement(
57+
getDslContext().select(field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
58+
.from(name(id.rawNamespace(), id.rawName()))
59+
.where(DSL.condition("_airbyte_loaded_at IS NULL"))
60+
.getSQL()),
61+
record -> parseInstant(record.getString("min_timestamp")))) {
62+
// Filter for nonNull values in case the query returned NULL (i.e. no unloaded records).
63+
final Optional<Instant> minUnloadedTimestamp = timestampStream.filter(Objects::nonNull).findFirst();
64+
if (minUnloadedTimestamp.isPresent()) {
65+
// Decrement by 1 second since timestamp precision varies between databases.
66+
final Optional<Instant> ts = minUnloadedTimestamp
67+
.map(i -> i.minus(1, ChronoUnit.SECONDS));
68+
return new InitialRawTableState(true, ts);
69+
}
70+
}
71+
// If there are no unloaded raw records, then we can safely skip all existing raw records.
72+
// This second query just finds the newest raw record.
73+
try (final Stream<Instant> timestampStream = jdbcDatabase.unsafeQuery(
74+
conn -> conn.prepareStatement(
75+
getDslContext().select(field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
76+
.from(name(id.rawNamespace(), id.rawName()))
77+
.getSQL()),
78+
record -> parseInstant(record.getString("min_timestamp")))) {
79+
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at all).
80+
final Optional<Instant> minUnloadedTimestamp = timestampStream.filter(Objects::nonNull).findFirst();
81+
return new InitialRawTableState(false, minUnloadedTimestamp);
82+
}
83+
}
84+
85+
private static Instant parseInstant(final String ts) {
86+
// Instant.parse requires nonnull input.
87+
if (ts == null) {
88+
return null;
89+
}
90+
return Instant.parse(ts);
91+
}
92+
}

0 commit comments

Comments
 (0)