18
18
19
19
import com .google .api .core .ApiFuture ;
20
20
import com .google .api .core .ApiFutures ;
21
- import com .google .api .core .ListenableFutureToApiFuture ;
22
21
import com .google .api .core .SettableApiFuture ;
23
22
import com .google .api .gax .core .ExecutorProvider ;
24
23
import com .google .cloud .spanner .AbstractReadContext .ListenableAsyncResultSet ;
29
28
import com .google .common .collect .ImmutableList ;
30
29
import com .google .common .util .concurrent .ListeningScheduledExecutorService ;
31
30
import com .google .common .util .concurrent .MoreExecutors ;
31
+ import com .google .spanner .v1 .PartialResultSet ;
32
32
import com .google .spanner .v1 .ResultSetMetadata ;
33
33
import com .google .spanner .v1 .ResultSetStats ;
34
34
import java .util .Collection ;
35
35
import java .util .LinkedList ;
36
36
import java .util .List ;
37
37
import java .util .concurrent .BlockingDeque ;
38
- import java .util .concurrent .Callable ;
39
38
import java .util .concurrent .CountDownLatch ;
40
39
import java .util .concurrent .ExecutionException ;
41
40
import java .util .concurrent .Executor ;
45
44
import java .util .logging .Logger ;
46
45
47
46
/** Default implementation for {@link AsyncResultSet}. */
48
- class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
47
+ class AsyncResultSetImpl extends ForwardingStructReader
48
+ implements ListenableAsyncResultSet , AsyncResultSet .StreamMessageListener {
49
49
private static final Logger log = Logger .getLogger (AsyncResultSetImpl .class .getName ());
50
50
51
51
/** State of an {@link AsyncResultSetImpl}. */
52
52
private enum State {
53
53
INITIALIZED ,
54
+ STREAMING_INITIALIZED ,
54
55
/** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
55
56
SYNC ,
56
57
CONSUMING ,
@@ -115,12 +116,15 @@ private enum State {
115
116
116
117
private State state = State .INITIALIZED ;
117
118
119
+ /** This variable indicates that produce rows thread is initiated */
120
+ private volatile boolean produceRowsInitiated ;
121
+
118
122
/**
119
123
* This variable indicates whether all the results from the underlying result set have been read.
120
124
*/
121
125
private volatile boolean finished ;
122
126
123
- private volatile ApiFuture <Void > result ;
127
+ private volatile SettableApiFuture <Void > result ;
124
128
125
129
/**
126
130
* This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
@@ -329,12 +333,12 @@ public void run() {
329
333
private final CallbackRunnable callbackRunnable = new CallbackRunnable ();
330
334
331
335
/**
332
- * {@link ProduceRowsCallable } reads data from the underlying {@link ResultSet}, places these in
336
+ * {@link ProduceRowsRunnable } reads data from the underlying {@link ResultSet}, places these in
333
337
* the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
334
338
*/
335
- private class ProduceRowsCallable implements Callable < Void > {
339
+ private class ProduceRowsRunnable implements Runnable {
336
340
@ Override
337
- public Void call () throws Exception {
341
+ public void run () {
338
342
boolean stop = false ;
339
343
boolean hasNext = false ;
340
344
try {
@@ -393,12 +397,17 @@ public Void call() throws Exception {
393
397
}
394
398
// Call the callback if there are still rows in the buffer that need to be processed.
395
399
while (!stop ) {
396
- waitIfPaused ();
397
- startCallbackIfNecessary ();
398
- // Make sure we wait until the callback runner has actually finished.
399
- consumingLatch .await ();
400
- synchronized (monitor ) {
401
- stop = cursorReturnedDoneOrException ;
400
+ try {
401
+ waitIfPaused ();
402
+ startCallbackIfNecessary ();
403
+ // Make sure we wait until the callback runner has actually finished.
404
+ consumingLatch .await ();
405
+ synchronized (monitor ) {
406
+ stop = cursorReturnedDoneOrException ;
407
+ }
408
+ } catch (Throwable e ) {
409
+ result .setException (e );
410
+ return ;
402
411
}
403
412
}
404
413
} finally {
@@ -410,14 +419,14 @@ public Void call() throws Exception {
410
419
}
411
420
synchronized (monitor ) {
412
421
if (executionException != null ) {
413
- throw executionException ;
414
- }
415
- if (state == State .CANCELLED ) {
416
- throw CANCELLED_EXCEPTION ;
422
+ result .setException (executionException );
423
+ } else if (state == State .CANCELLED ) {
424
+ result .setException (CANCELLED_EXCEPTION );
425
+ } else {
426
+ result .set (null );
417
427
}
418
428
}
419
429
}
420
- return null ;
421
430
}
422
431
423
432
private void waitIfPaused () throws InterruptedException {
@@ -449,6 +458,26 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
449
458
}
450
459
}
451
460
461
+ private class InitiateStreamingRunnable implements Runnable {
462
+
463
+ @ Override
464
+ public void run () {
465
+ try {
466
+ // This method returns true if the underlying result set is a streaming result set (e.g. a
467
+ // GrpcResultSet).
468
+ // Those result sets will trigger initiateProduceRows() when the first results are received.
469
+ // Non-streaming result sets do not trigger this callback, and for those result sets, we
470
+ // need to eagerly start the ProduceRowsRunnable.
471
+ if (!initiateStreaming (AsyncResultSetImpl .this )) {
472
+ initiateProduceRows ();
473
+ }
474
+ } catch (Throwable exception ) {
475
+ executionException = SpannerExceptionFactory .asSpannerException (exception );
476
+ initiateProduceRows ();
477
+ }
478
+ }
479
+ }
480
+
452
481
/** Sets the callback for this {@link AsyncResultSet}. */
453
482
@ Override
454
483
public ApiFuture <Void > setCallback (Executor exec , ReadyCallback cb ) {
@@ -458,16 +487,24 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
458
487
this .state == State .INITIALIZED , "callback may not be set multiple times" );
459
488
460
489
// Start to fetch data and buffer these.
461
- this .result =
462
- new ListenableFutureToApiFuture <>(this .service .submit (new ProduceRowsCallable ()));
490
+ this .result = SettableApiFuture .create ();
491
+ this .state = State .STREAMING_INITIALIZED ;
492
+ this .service .execute (new InitiateStreamingRunnable ());
463
493
this .executor = MoreExecutors .newSequentialExecutor (Preconditions .checkNotNull (exec ));
464
494
this .callback = Preconditions .checkNotNull (cb );
465
- this .state = State .RUNNING ;
466
495
pausedLatch .countDown ();
467
496
return result ;
468
497
}
469
498
}
470
499
500
+ private void initiateProduceRows () {
501
+ if (this .state == State .STREAMING_INITIALIZED ) {
502
+ this .state = State .RUNNING ;
503
+ }
504
+ produceRowsInitiated = true ;
505
+ this .service .execute (new ProduceRowsRunnable ());
506
+ }
507
+
471
508
Future <Void > getResult () {
472
509
return result ;
473
510
}
@@ -578,6 +615,10 @@ public ResultSetMetadata getMetadata() {
578
615
return delegateResultSet .get ().getMetadata ();
579
616
}
580
617
618
+ boolean initiateStreaming (StreamMessageListener streamMessageListener ) {
619
+ return StreamingUtil .initiateStreaming (delegateResultSet .get (), streamMessageListener );
620
+ }
621
+
581
622
@ Override
582
623
protected void checkValidState () {
583
624
synchronized (monitor ) {
@@ -593,4 +634,22 @@ public Struct getCurrentRowAsStruct() {
593
634
checkValidState ();
594
635
return currentRow ;
595
636
}
637
+
638
+ @ Override
639
+ public void onStreamMessage (PartialResultSet partialResultSet , boolean bufferIsFull ) {
640
+ synchronized (monitor ) {
641
+ if (produceRowsInitiated ) {
642
+ return ;
643
+ }
644
+ // if PartialResultSet contains a resume token or buffer size is full, or
645
+ // we have reached the end of the stream, we can start the thread.
646
+ boolean startJobThread =
647
+ !partialResultSet .getResumeToken ().isEmpty ()
648
+ || bufferIsFull
649
+ || partialResultSet == GrpcStreamIterator .END_OF_STREAM ;
650
+ if (startJobThread || state != State .STREAMING_INITIALIZED ) {
651
+ initiateProduceRows ();
652
+ }
653
+ }
654
+ }
596
655
}
0 commit comments