18
18
19
19
import com .google .api .core .ApiFuture ;
20
20
import com .google .api .core .ApiFutures ;
21
- import com .google .api .core .ListenableFutureToApiFuture ;
22
21
import com .google .api .core .SettableApiFuture ;
23
22
import com .google .api .gax .core .ExecutorProvider ;
24
23
import com .google .cloud .spanner .AbstractReadContext .ListenableAsyncResultSet ;
29
28
import com .google .common .collect .ImmutableList ;
30
29
import com .google .common .util .concurrent .ListeningScheduledExecutorService ;
31
30
import com .google .common .util .concurrent .MoreExecutors ;
31
+ import com .google .spanner .v1 .PartialResultSet ;
32
32
import com .google .spanner .v1 .ResultSetMetadata ;
33
33
import com .google .spanner .v1 .ResultSetStats ;
34
34
import java .util .Collection ;
35
35
import java .util .LinkedList ;
36
36
import java .util .List ;
37
- import java .util .concurrent .BlockingDeque ;
38
- import java .util .concurrent .Callable ;
39
- import java .util .concurrent .CountDownLatch ;
40
- import java .util .concurrent .ExecutionException ;
41
- import java .util .concurrent .Executor ;
42
- import java .util .concurrent .Future ;
43
- import java .util .concurrent .LinkedBlockingDeque ;
37
+ import java .util .concurrent .*;
44
38
import java .util .logging .Level ;
45
39
import java .util .logging .Logger ;
46
40
47
41
/** Default implementation for {@link AsyncResultSet}. */
48
- class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet {
42
+ class AsyncResultSetImpl extends ForwardingStructReader
43
+ implements ListenableAsyncResultSet , AsyncResultSet .StreamMessageListener {
49
44
private static final Logger log = Logger .getLogger (AsyncResultSetImpl .class .getName ());
50
45
51
46
/** State of an {@link AsyncResultSetImpl}. */
52
47
private enum State {
53
48
INITIALIZED ,
49
+ IN_PROGRESS ,
54
50
/** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
55
51
SYNC ,
56
52
CONSUMING ,
@@ -115,12 +111,15 @@ private enum State {
115
111
116
112
private State state = State .INITIALIZED ;
117
113
114
+ /** This variable indicates that produce rows thread is initiated */
115
+ private volatile boolean produceRowsInitiated ;
116
+
118
117
/**
119
118
* This variable indicates whether all the results from the underlying result set have been read.
120
119
*/
121
120
private volatile boolean finished ;
122
121
123
- private volatile ApiFuture <Void > result ;
122
+ private volatile SettableApiFuture <Void > result ;
124
123
125
124
/**
126
125
* This variable indicates whether {@link #tryNext()} has returned {@link CursorState#DONE} or a
@@ -329,12 +328,12 @@ public void run() {
329
328
private final CallbackRunnable callbackRunnable = new CallbackRunnable ();
330
329
331
330
/**
332
- * {@link ProduceRowsCallable } reads data from the underlying {@link ResultSet}, places these in
331
+ * {@link ProduceRowsRunnable } reads data from the underlying {@link ResultSet}, places these in
333
332
* the buffer and dispatches the {@link CallbackRunnable} when data is ready to be consumed.
334
333
*/
335
- private class ProduceRowsCallable implements Callable < Void > {
334
+ private class ProduceRowsRunnable implements Runnable {
336
335
@ Override
337
- public Void call () throws Exception {
336
+ public void run () {
338
337
boolean stop = false ;
339
338
boolean hasNext = false ;
340
339
try {
@@ -393,12 +392,17 @@ public Void call() throws Exception {
393
392
}
394
393
// Call the callback if there are still rows in the buffer that need to be processed.
395
394
while (!stop ) {
396
- waitIfPaused ();
397
- startCallbackIfNecessary ();
398
- // Make sure we wait until the callback runner has actually finished.
399
- consumingLatch .await ();
400
- synchronized (monitor ) {
401
- stop = cursorReturnedDoneOrException ;
395
+ try {
396
+ waitIfPaused ();
397
+ startCallbackIfNecessary ();
398
+ // Make sure we wait until the callback runner has actually finished.
399
+ consumingLatch .await ();
400
+ synchronized (monitor ) {
401
+ stop = cursorReturnedDoneOrException ;
402
+ }
403
+ } catch (InterruptedException e ) {
404
+ result .setException (e );
405
+ return ;
402
406
}
403
407
}
404
408
} finally {
@@ -410,14 +414,16 @@ public Void call() throws Exception {
410
414
}
411
415
synchronized (monitor ) {
412
416
if (executionException != null ) {
417
+ result .setException (executionException );
413
418
throw executionException ;
414
419
}
415
420
if (state == State .CANCELLED ) {
421
+ result .setException (CANCELLED_EXCEPTION );
416
422
throw CANCELLED_EXCEPTION ;
417
423
}
418
424
}
419
425
}
420
- return null ;
426
+ result . set ( null ) ;
421
427
}
422
428
423
429
private void waitIfPaused () throws InterruptedException {
@@ -449,6 +455,21 @@ private void startCallbackWithBufferLatchIfNecessary(int bufferLatch) {
449
455
}
450
456
}
451
457
458
+ private class InitiateStreamingRunnable implements Runnable {
459
+
460
+ @ Override
461
+ public void run () {
462
+ try {
463
+ if (!initiateStreaming (AsyncResultSetImpl .this )) {
464
+ initiateProduceRows ();
465
+ }
466
+ } catch (SpannerException e ) {
467
+ executionException = e ;
468
+ initiateProduceRows ();
469
+ }
470
+ }
471
+ }
472
+
452
473
/** Sets the callback for this {@link AsyncResultSet}. */
453
474
@ Override
454
475
public ApiFuture <Void > setCallback (Executor exec , ReadyCallback cb ) {
@@ -458,16 +479,24 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
458
479
this .state == State .INITIALIZED , "callback may not be set multiple times" );
459
480
460
481
// Start to fetch data and buffer these.
461
- this .result =
462
- new ListenableFutureToApiFuture <>(this .service .submit (new ProduceRowsCallable ()));
482
+ this .result = SettableApiFuture .create ();
483
+ this .state = State .IN_PROGRESS ;
484
+ this .service .execute (new InitiateStreamingRunnable ());
463
485
this .executor = MoreExecutors .newSequentialExecutor (Preconditions .checkNotNull (exec ));
464
486
this .callback = Preconditions .checkNotNull (cb );
465
- this .state = State .RUNNING ;
466
487
pausedLatch .countDown ();
467
488
return result ;
468
489
}
469
490
}
470
491
492
+ private void initiateProduceRows () {
493
+ this .service .execute (new ProduceRowsRunnable ());
494
+ if (this .state == State .IN_PROGRESS ) {
495
+ this .state = State .RUNNING ;
496
+ }
497
+ produceRowsInitiated = true ;
498
+ }
499
+
471
500
Future <Void > getResult () {
472
501
return result ;
473
502
}
@@ -578,6 +607,11 @@ public ResultSetMetadata getMetadata() {
578
607
return delegateResultSet .get ().getMetadata ();
579
608
}
580
609
610
+ @ Override
611
+ public boolean initiateStreaming (StreamMessageListener streamMessageListener ) {
612
+ return delegateResultSet .get ().initiateStreaming (streamMessageListener );
613
+ }
614
+
581
615
@ Override
582
616
protected void checkValidState () {
583
617
synchronized (monitor ) {
@@ -593,4 +627,28 @@ public Struct getCurrentRowAsStruct() {
593
627
checkValidState ();
594
628
return currentRow ;
595
629
}
630
+
631
+ @ Override
632
+ public void onStreamMessage (
633
+ PartialResultSet partialResultSet ,
634
+ int prefetchChunks ,
635
+ int currentBufferSize ,
636
+ StreamMessageRequestor streamMessageRequestor ) {
637
+ synchronized (monitor ) {
638
+ if (produceRowsInitiated ) {
639
+ return ;
640
+ }
641
+ // if PartialResultSet contains resume token or buffer size is more than configured size or
642
+ // we have reached end of stream, we can start the thread
643
+ boolean startJobThread =
644
+ !partialResultSet .getResumeToken ().isEmpty ()
645
+ || currentBufferSize >= prefetchChunks
646
+ || partialResultSet == GrpcStreamIterator .END_OF_STREAM ;
647
+ if (startJobThread || state != State .IN_PROGRESS ) {
648
+ initiateProduceRows ();
649
+ } else {
650
+ streamMessageRequestor .requestMessages (1 );
651
+ }
652
+ }
653
+ }
596
654
}
0 commit comments