Skip to content

Commit 3089e0b

Browse files
fix: Avoid blocking thread in AsyncResultSet
1 parent 7096899 commit 3089e0b

11 files changed

+87
-55
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+5-1
Original file line numberDiff line numberDiff line change
@@ -790,5 +790,9 @@
790790
<className>com/google/cloud/spanner/connection/Connection</className>
791791
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
792792
</difference>
793-
793+
<difference>
794+
<differenceType>7012</differenceType>
795+
<className>com/google/cloud/spanner/ResultSet</className>
796+
<method>boolean initiateStreaming(com.google.cloud.spanner.AsyncResultSet$StreamMessageListener)</method>
797+
</difference>
794798
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -768,8 +768,9 @@ ResultSet executeQueryInternalWithOptions(
768768
rpc.getExecuteQueryRetrySettings(),
769769
rpc.getExecuteQueryRetryableCodes()) {
770770
@Override
771-
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
772-
AsyncResultSet.StreamMessageListener streamListener) {
771+
CloseableIterator<PartialResultSet> startStream(
772+
@Nullable ByteString resumeToken,
773+
AsyncResultSet.StreamMessageListener streamListener) {
773774
GrpcStreamIterator stream =
774775
new GrpcStreamIterator(statement, prefetchChunks, cancelQueryWhenClientIsClosed);
775776
stream.registerListener(streamListener);
@@ -961,8 +962,9 @@ ResultSet readInternalWithOptions(
961962
rpc.getReadRetrySettings(),
962963
rpc.getReadRetryableCodes()) {
963964
@Override
964-
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
965-
AsyncResultSet.StreamMessageListener streamListener) {
965+
CloseableIterator<PartialResultSet> startStream(
966+
@Nullable ByteString resumeToken,
967+
AsyncResultSet.StreamMessageListener streamListener) {
966968
GrpcStreamIterator stream =
967969
new GrpcStreamIterator(prefetchChunks, cancelQueryWhenClientIsClosed);
968970
stream.registerListener(streamListener);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java

+9-6
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import com.google.api.core.ApiFuture;
2020
import com.google.common.base.Function;
2121
import com.google.spanner.v1.PartialResultSet;
22-
2322
import java.util.List;
2423
import java.util.concurrent.ExecutionException;
2524
import java.util.concurrent.Executor;
@@ -227,16 +226,20 @@ interface ReadyCallback {
227226
<T> List<T> toList(Function<StructReader, T> transformer) throws SpannerException;
228227

229228
/**
230-
* An interface to register the listener for streaming gRPC request. It will be called when a chunk is received
231-
* from gRPC streaming call.
229+
* An interface to register the listener for streaming gRPC request. It will be called when a
230+
* chunk is received from gRPC streaming call.
232231
*/
233232
interface StreamMessageListener {
234-
void onStreamMessage(PartialResultSet partialResultSet, int prefetchChunks, int currentBufferSize, StreamMessageRequestor streamMessageRequestor);
233+
void onStreamMessage(
234+
PartialResultSet partialResultSet,
235+
int prefetchChunks,
236+
int currentBufferSize,
237+
StreamMessageRequestor streamMessageRequestor);
235238
}
236239

237240
/**
238-
* An interface to request more messages from the gRPC streaming call. It will be implemented by the class which has access
239-
* to SpannerRpc.StreamingCall object
241+
* An interface to request more messages from the gRPC streaming call. It will be implemented by
242+
* the class which has access to SpannerRpc.StreamingCall object
240243
*/
241244
interface StreamMessageRequestor {
242245
void requestMessages(int numOfMessages);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

+28-17
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import com.google.spanner.v1.PartialResultSet;
3232
import com.google.spanner.v1.ResultSetMetadata;
3333
import com.google.spanner.v1.ResultSetStats;
34-
3534
import java.util.Collection;
3635
import java.util.LinkedList;
3736
import java.util.List;
@@ -40,10 +39,10 @@
4039
import java.util.logging.Logger;
4140

4241
/** Default implementation for {@link AsyncResultSet}. */
43-
class AsyncResultSetImpl extends ForwardingStructReader implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
42+
class AsyncResultSetImpl extends ForwardingStructReader
43+
implements ListenableAsyncResultSet, AsyncResultSet.StreamMessageListener {
4444
private static final Logger log = Logger.getLogger(AsyncResultSetImpl.class.getName());
4545

46-
4746
/** State of an {@link AsyncResultSetImpl}. */
4847
private enum State {
4948
INITIALIZED,
@@ -112,6 +111,9 @@ private enum State {
112111

113112
private State state = State.INITIALIZED;
114113

114+
/** This variable indicates that produce rows thread is initiated */
115+
private volatile boolean produceRowsInitiated;
116+
115117
/**
116118
* This variable indicates whether all the results from the underlying result set have been read.
117119
*/
@@ -458,7 +460,7 @@ private class InitiateStreamingRunnable implements Runnable {
458460
@Override
459461
public void run() {
460462
try {
461-
if(!initiateStreaming(AsyncResultSetImpl.this)) {
463+
if (!initiateStreaming(AsyncResultSetImpl.this)) {
462464
initiateProduceRows();
463465
}
464466
} catch (SpannerException e) {
@@ -489,7 +491,10 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
489491

490492
private void initiateProduceRows() {
491493
this.service.execute(new ProduceRowsRunnable());
492-
this.state = State.RUNNING;
494+
if (this.state == State.IN_PROGRESS) {
495+
this.state = State.RUNNING;
496+
}
497+
produceRowsInitiated = true;
493498
}
494499

495500
Future<Void> getResult() {
@@ -504,7 +509,6 @@ public void cancel() {
504509
"cannot cancel a result set without a callback");
505510
state = State.CANCELLED;
506511
pausedLatch.countDown();
507-
this.result.setException(CANCELLED_EXCEPTION);
508512
}
509513
}
510514

@@ -625,18 +629,25 @@ public Struct getCurrentRowAsStruct() {
625629
}
626630

627631
@Override
628-
public void onStreamMessage(PartialResultSet partialResultSet, int prefetchChunks, int currentBufferSize, StreamMessageRequestor streamMessageRequestor) {
632+
public void onStreamMessage(
633+
PartialResultSet partialResultSet,
634+
int prefetchChunks,
635+
int currentBufferSize,
636+
StreamMessageRequestor streamMessageRequestor) {
629637
synchronized (monitor) {
630-
if (state == State.IN_PROGRESS) {
631-
// if PartialResultSet contains resume token or buffer size is more than configured size or we have reached
632-
// end of stream, we can start the thread
633-
boolean startJobThread = !partialResultSet.getResumeToken().isEmpty()
634-
|| currentBufferSize > prefetchChunks || partialResultSet == GrpcStreamIterator.END_OF_STREAM;
635-
if (startJobThread){
636-
initiateProduceRows();
637-
} else {
638-
streamMessageRequestor.requestMessages(1);
639-
}
638+
if (produceRowsInitiated) {
639+
return;
640+
}
641+
// if PartialResultSet contains resume token or buffer size is more than configured size or
642+
// we have reached end of stream, we can start the thread
643+
boolean startJobThread =
644+
!partialResultSet.getResumeToken().isEmpty()
645+
|| currentBufferSize > prefetchChunks
646+
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
647+
if (startJobThread || state != State.IN_PROGRESS) {
648+
initiateProduceRows();
649+
} else {
650+
streamMessageRequestor.requestMessages(1);
640651
}
641652
}
642653
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,6 @@ public ResultSetMetadata getMetadata() {
105105

106106
@Override
107107
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
108-
return delegate.get().initiateStreaming(streamMessageListener);
108+
return delegate.get().initiateStreaming(streamMessageListener);
109109
}
110110
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@
2323
import com.google.common.collect.AbstractIterator;
2424
import com.google.common.util.concurrent.Uninterruptibles;
2525
import com.google.spanner.v1.PartialResultSet;
26-
import org.threeten.bp.Duration;
27-
28-
import javax.annotation.Nullable;
2926
import java.util.Optional;
3027
import java.util.concurrent.BlockingQueue;
3128
import java.util.concurrent.LinkedBlockingQueue;
3229
import java.util.concurrent.TimeUnit;
3330
import java.util.logging.Level;
3431
import java.util.logging.Logger;
32+
import javax.annotation.Nullable;
33+
import org.threeten.bp.Duration;
3534

3635
/** Adapts a streaming read/query call into an iterator over partial result sets. */
3736
@VisibleForTesting
@@ -199,7 +198,7 @@ public boolean cancelQueryWhenClientIsClosed() {
199198
}
200199

201200
private void onStreamMessage(PartialResultSet partialResultSet) {
202-
Optional.ofNullable(streamMessageListener).ifPresent(
203-
sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this));
201+
Optional.ofNullable(streamMessageListener)
202+
.ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this));
204203
}
205204
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java

+4
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ default ResultSetMetadata getMetadata() {
8383
throw new UnsupportedOperationException("Method should be overridden");
8484
}
8585

86+
/**
87+
* Returns the {@link boolean} for this {@link ResultSet}. This method will be used by
88+
* AsyncResultSet to initiate gRPC streaming
89+
*/
8690
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
8791
return false;
8892
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResumableStreamIterator.java

+14-15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@
1616

1717
package com.google.cloud.spanner;
1818

19+
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
20+
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
21+
import static com.google.common.base.Preconditions.checkArgument;
22+
import static com.google.common.base.Preconditions.checkNotNull;
23+
1924
import com.google.api.client.util.BackOff;
2025
import com.google.api.client.util.ExponentialBackOff;
2126
import com.google.api.gax.grpc.GrpcStatusCode;
@@ -30,8 +35,6 @@
3035
import com.google.spanner.v1.PartialResultSet;
3136
import io.grpc.Context;
3237
import io.opentelemetry.api.common.Attributes;
33-
34-
import javax.annotation.Nullable;
3538
import java.io.IOException;
3639
import java.util.LinkedList;
3740
import java.util.Objects;
@@ -41,11 +44,7 @@
4144
import java.util.concurrent.TimeUnit;
4245
import java.util.logging.Level;
4346
import java.util.logging.Logger;
44-
45-
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerException;
46-
import static com.google.cloud.spanner.SpannerExceptionFactory.newSpannerExceptionForCancellation;
47-
import static com.google.common.base.Preconditions.checkArgument;
48-
import static com.google.common.base.Preconditions.checkNotNull;
47+
import javax.annotation.Nullable;
4948

5049
/**
5150
* Wraps an iterator over partial result sets, supporting resuming RPCs on error. This class keeps
@@ -198,8 +197,8 @@ public void execute(Runnable command) {
198197
}
199198
}
200199

201-
abstract CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken,
202-
AsyncResultSet.StreamMessageListener streamMessageListener);
200+
abstract CloseableIterator<PartialResultSet> startStream(
201+
@Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener);
203202

204203
/**
205204
* Prepares the iterator for a retry on a different gRPC channel. Returns true if that is
@@ -226,7 +225,7 @@ public boolean isWithBeginTransaction() {
226225
@Override
227226
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
228227
this.streamMessageListener = streamMessageListener;
229-
eagerStartStreaming();
228+
startGrpcStreaming();
230229
return true;
231230
}
232231

@@ -236,7 +235,7 @@ protected PartialResultSet computeNext() {
236235
Context context = Context.current();
237236
while (true) {
238237
// Eagerly start stream before consuming any buffered items.
239-
eagerStartStreaming();
238+
startGrpcStreaming();
240239
// Buffer contains items up to a resume token or has reached capacity: flush.
241240
if (!buffer.isEmpty()
242241
&& (finished || !safeToRetry || !buffer.getLast().getResumeToken().isEmpty())) {
@@ -315,12 +314,12 @@ && prepareIteratorForRetryOnDifferentGrpcChannel()) {
315314
}
316315
}
317316

318-
private void eagerStartStreaming() {
317+
private void startGrpcStreaming() {
319318
if (stream == null) {
320319
span.addAnnotation(
321-
"Starting/Resuming stream",
322-
"ResumeToken",
323-
resumeToken == null ? "null" : resumeToken.toStringUtf8());
320+
"Starting/Resuming stream",
321+
"ResumeToken",
322+
resumeToken == null ? "null" : resumeToken.toStringUtf8());
324323
try (IScope scope = tracer.withSpan(span)) {
325324
// When start a new stream set the Span as current to make the gRPC Span a child of
326325
// this Span.

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -289,7 +289,8 @@ public boolean next() throws SpannerException {
289289
}
290290

291291
@Override
292-
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
292+
public boolean initiateStreaming(
293+
AsyncResultSet.StreamMessageListener streamMessageListener) {
293294
try {
294295
boolean ret = super.initiateStreaming(streamMessageListener);
295296
if (beforeFirst) {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java

+10-3
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.assertFalse;
2323
import static org.junit.Assert.assertNull;
2424
import static org.junit.Assert.assertThrows;
25+
import static org.mockito.ArgumentMatchers.any;
2526
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.when;
2728

@@ -32,6 +33,7 @@
3233
import com.google.cloud.spanner.AsyncResultSet.ReadyCallback;
3334
import com.google.common.base.Function;
3435
import com.google.common.collect.Range;
36+
import com.google.spanner.v1.PartialResultSet;
3537
import java.util.List;
3638
import java.util.concurrent.BlockingDeque;
3739
import java.util.concurrent.CountDownLatch;
@@ -48,8 +50,6 @@
4850
import org.junit.Test;
4951
import org.junit.runner.RunWith;
5052
import org.junit.runners.JUnit4;
51-
import org.mockito.Mock;
52-
import org.mockito.Mockito;
5353
import org.mockito.invocation.InvocationOnMock;
5454
import org.mockito.stubbing.Answer;
5555

@@ -390,6 +390,13 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()
390390

391391
try (AsyncResultSetImpl rs =
392392
new AsyncResultSetImpl(simpleProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
393+
394+
when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class)))
395+
.thenAnswer(
396+
answer -> {
397+
rs.onStreamMessage(PartialResultSet.newBuilder().build(), 4, 1, null);
398+
return null;
399+
});
393400
callbackResult =
394401
rs.setCallback(
395402
executor,
@@ -402,7 +409,7 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()
402409

403410
SpannerException exception = assertThrows(SpannerException.class, () -> get(callbackResult));
404411
assertEquals(ErrorCode.CANCELLED, exception.getErrorCode());
405-
assertEquals(0, callbackCounter.get());
412+
assertEquals(1, callbackCounter.get());
406413
}
407414
}
408415

google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResumableStreamIteratorTest.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,8 @@
6464
public class ResumableStreamIteratorTest {
6565
interface Starter {
6666
AbstractResultSet.CloseableIterator<PartialResultSet> startStream(
67-
@Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener);
67+
@Nullable ByteString resumeToken,
68+
AsyncResultSet.StreamMessageListener streamMessageListener);
6869
}
6970

7071
interface ResultSetStream {
@@ -164,7 +165,8 @@ private void initWithLimit(int maxBufferSize) {
164165
SpannerStubSettings.newBuilder().executeStreamingSqlSettings().getRetryableCodes()) {
165166
@Override
166167
AbstractResultSet.CloseableIterator<PartialResultSet> startStream(
167-
@Nullable ByteString resumeToken, AsyncResultSet.StreamMessageListener streamMessageListener) {
168+
@Nullable ByteString resumeToken,
169+
AsyncResultSet.StreamMessageListener streamMessageListener) {
168170
return starter.startStream(resumeToken, null);
169171
}
170172
};

0 commit comments

Comments
 (0)