Skip to content

Commit 6f6c7ba

Browse files
authored
expose more context in DoFns, fix spotify#3579 (spotify#3583)
- `BundleFinalizer` is not supported by `SimpleDoFnRunner` in `DirectRunner`
1 parent 9ac78f2 commit 6f6c7ba

File tree

5 files changed

+13
-13
lines changed

5 files changed

+13
-13
lines changed

scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,14 @@ public abstract class BaseAsyncDoFn<InputT, OutputT, ResourceT, FutureT>
4343
private final ConcurrentLinkedQueue<Throwable> errors = new ConcurrentLinkedQueue<>();
4444

4545
@StartBundle
46-
public void startBundle() {
46+
public void startBundle(StartBundleContext context) {
4747
futures.clear();
4848
results.clear();
4949
errors.clear();
5050
}
5151

5252
@FinishBundle
53-
public void finishBundle(FinishBundleContext c) {
53+
public void finishBundle(FinishBundleContext context) {
5454
if (!futures.isEmpty()) {
5555
try {
5656
waitForFutures(futures.values());
@@ -63,7 +63,7 @@ public void finishBundle(FinishBundleContext c) {
6363
throw new RuntimeException("Failed to process futures", e);
6464
}
6565
}
66-
flush(c);
66+
flush(context);
6767
}
6868

6969
@ProcessElement

scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncLookupDoFn.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ public void setup() {
111111
}
112112

113113
@StartBundle
114-
public void startBundle() {
114+
public void startBundle(StartBundleContext context) {
115115
futures.clear();
116116
results.clear();
117117
requestCount = 0;
@@ -178,7 +178,7 @@ public void processElement(ProcessContext c, BoundedWindow window) {
178178
}
179179

180180
@FinishBundle
181-
public void finishBundle(FinishBundleContext c) {
181+
public void finishBundle(FinishBundleContext context) {
182182
if (!futures.isEmpty()) {
183183
try {
184184
// Block until all pending futures are complete
@@ -191,7 +191,7 @@ public void finishBundle(FinishBundleContext c) {
191191
LOG.error("Failed to process futures", e);
192192
}
193193
}
194-
flush(r -> c.output(KV.of(r.input, r.output), r.timestamp, r.window));
194+
flush(r -> context.output(KV.of(r.input, r.output), r.timestamp, r.window));
195195

196196
// Make sure all requests are processed
197197
Preconditions.checkState(

scio-core/src/main/java/com/spotify/scio/transforms/FileDownloadDoFn.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ public FileDownloadDoFn(
7575
}
7676

7777
@StartBundle
78-
public void startBundle() {
78+
public void startBundle(StartBundleContext context) {
7979
this.batch.clear();
8080
}
8181

@@ -88,8 +88,8 @@ public void processElement(ProcessContext c, BoundedWindow window) {
8888
}
8989

9090
@FinishBundle
91-
public void finishBundle(FinishBundleContext c) {
92-
processBatch(c);
91+
public void finishBundle(FinishBundleContext context) {
92+
processBatch(context);
9393
}
9494

9595
@Override

scio-core/src/main/java/com/spotify/scio/transforms/PipeDoFn.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -186,12 +186,12 @@ private Optional<RuntimeException> runCommands(String stage, List<String[]> comm
186186
}
187187

188188
@StartBundle
189-
public void startBundle() {
189+
public void startBundle(StartBundleContext context) {
190190
isNewBundle = true;
191191
}
192192

193193
@FinishBundle
194-
public void finishBundle() {
194+
public void finishBundle(FinishBundleContext context) {
195195
try {
196196
stdIn.close();
197197
int exitCode = pipeProcess.waitFor();

scio-test/src/test/scala/com/spotify/scio/transforms/AsyncDoFnSpec.scala

+2-2
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ abstract class AsyncDoFnTester[P[_], F[_]] extends BaseDoFnTester {
182182
private val fn = {
183183
val f = newDoFn
184184
f.setup()
185-
f.startBundle()
185+
f.startBundle(null)
186186
f
187187
}
188188

@@ -238,7 +238,7 @@ abstract class AsyncDoFnTester[P[_], F[_]] extends BaseDoFnTester {
238238
nextElement = 0
239239
pending.clear()
240240
outputBuffer.clear()
241-
fn.startBundle()
241+
fn.startBundle(null)
242242

243243
result
244244
}

0 commit comments

Comments
 (0)