Skip to content

Commit

Permalink
Fix race condition (#34058)
Browse files Browse the repository at this point in the history
* Update JdbcIO.java

* Fix vars

* Fix typing problems

* Use lineage check so that we maintain safety

* Remove dupe assignment

* Assign correctly inside lock to avoid race

* Add other getConnection

* Fix bad merge

* Remove unneeded logs

* Shrink critical section

* Cleanup

* Fix bad refactor

* typo
  • Loading branch information
damccorm authored Mar 1, 2025
1 parent 228f028 commit 468001b
Showing 1 changed file with 18 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -1609,6 +1611,7 @@ private static class ReadFn<ParameterT, OutputT> extends DoFn<ParameterT, Output
private final int fetchSize;
private final boolean disableAutoCommit;

private Lock connectionLock = new ReentrantLock();
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable KV<@Nullable String, String> reportedLineage;
Expand Down Expand Up @@ -1637,8 +1640,13 @@ private Connection getConnection() throws SQLException {
Connection connection = this.connection;
if (connection == null) {
DataSource validSource = checkStateNotNull(this.dataSource);
connection = checkStateNotNull(validSource).getConnection();
this.connection = connection;
connectionLock.lock();
try {
connection = validSource.getConnection();
this.connection = connection;
} finally {
connectionLock.unlock();
}

// report Lineage if not haven't done so
KV<@Nullable String, String> schemaWithTable =
Expand Down Expand Up @@ -2663,6 +2671,7 @@ abstract Builder<T, V> setMaxBatchBufferingDuration(
Metrics.distribution(WriteFn.class, "milliseconds_per_batch");

private final WriteFnSpec<T, V> spec;
private Lock connectionLock = new ReentrantLock();
private @Nullable DataSource dataSource;
private @Nullable Connection connection;
private @Nullable PreparedStatement preparedStatement;
Expand Down Expand Up @@ -2700,7 +2709,13 @@ private Connection getConnection() throws SQLException {
Connection connection = this.connection;
if (connection == null) {
DataSource validSource = checkStateNotNull(dataSource);
connection = validSource.getConnection();
connectionLock.lock();
try {
connection = validSource.getConnection();
} finally {
connectionLock.unlock();
}

connection.setAutoCommit(false);
preparedStatement =
connection.prepareStatement(checkStateNotNull(spec.getStatement()).get());
Expand Down

0 comments on commit 468001b

Please sign in to comment.