Skip to content

Commit

Permalink
address comments, improve splitting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
johnjcasey committed Feb 20, 2024
1 parent c886de1 commit f78a73f
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,18 @@ private PCollection<T> expandForDirectRead(
getProjectionPushdownApplied());
List<? extends BoundedSource<T>> sources;
try {
sources = source.split(0, input.getPipeline().getOptions());
// This splitting logic is taken from the SDF implementation of Read.
long estimatedSize = source.getEstimatedSizeBytes(bqOptions);
// Split into pieces as close as possible to the default desired bundle size, but if that would
// produce too few splits, prefer to split up to the default desired number of splits.
long desiredChunkSize;
if (estimatedSize <= 0) {
desiredChunkSize = 64 << 20; // 64mb
} else {
// 1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k shards
desiredChunkSize = Math.max(1 << 20, (long) (1000 * Math.sqrt(estimatedSize)));
}
sources = source.split(desiredChunkSize, bqOptions);
} catch (Exception e) {
throw new RuntimeException("Unable to split TableSource", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ public void testBigQueryStorageQueryWithErrorHandling1M() throws Exception {

errorHandler.close();

// Expected success count when 1 in 50 elements fails sequentially.
PAssert.thatSingleton(count).isEqualTo(10381L);
// Expected error count: the total number of elements minus the successful elements.
PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L);
p.run().waitUntilFinish();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ private void runBigQueryIOStorageReadPipelineErrorHandling() throws Exception {

errorHandler.close();

// Expected success count when 1 in 50 elements fails sequentially.
PAssert.thatSingleton(count).isEqualTo(10381L);
// Expected error count: the total number of elements minus the successful elements.
PAssert.thatSingleton(errorHandler.getOutput()).isEqualTo(10592L - 10381L);
p.run().waitUntilFinish();
}
Expand Down

0 comments on commit f78a73f

Please sign in to comment.