Skip to content

Commit 56e6ba9

Browse files
Addressed comments
1 parent 2ceff50 commit 56e6ba9

11 files changed

+105
-57
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+3-7
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@
758758
<className>com/google/cloud/spanner/connection/Connection</className>
759759
<method>boolean isKeepTransactionAlive()</method>
760760
</difference>
761-
761+
762762
<!-- Automatic DML batching -->
763763
<difference>
764764
<differenceType>7012</differenceType>
@@ -790,9 +790,5 @@
790790
<className>com/google/cloud/spanner/connection/Connection</className>
791791
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
792792
</difference>
793-
<difference>
794-
<differenceType>7012</differenceType>
795-
<className>com/google/cloud/spanner/ResultSet</className>
796-
<method>boolean initiateStreaming(com.google.cloud.spanner.AsyncResultSet$StreamMessageListener)</method>
797-
</difference>
798-
</differences>
793+
794+
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -232,8 +232,7 @@ interface ReadyCallback {
232232
interface StreamMessageListener {
233233
void onStreamMessage(
234234
PartialResultSet partialResultSet,
235-
int prefetchChunks,
236-
int currentBufferSize,
235+
boolean bufferIsFull,
237236
StreamMessageRequestor streamMessageRequestor);
238237
}
239238

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

+24-17
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@
3434
import java.util.Collection;
3535
import java.util.LinkedList;
3636
import java.util.List;
37-
import java.util.concurrent.*;
37+
import java.util.concurrent.BlockingDeque;
38+
import java.util.concurrent.CountDownLatch;
39+
import java.util.concurrent.ExecutionException;
40+
import java.util.concurrent.Executor;
41+
import java.util.concurrent.Future;
42+
import java.util.concurrent.LinkedBlockingDeque;
3843
import java.util.logging.Level;
3944
import java.util.logging.Logger;
4045

@@ -46,7 +51,7 @@ class AsyncResultSetImpl extends ForwardingStructReader
4651
/** State of an {@link AsyncResultSetImpl}. */
4752
private enum State {
4853
INITIALIZED,
49-
IN_PROGRESS,
54+
STREAMING_INITIALIZED,
5055
/** SYNC indicates that the {@link ResultSet} is used in sync pattern. */
5156
SYNC,
5257
CONSUMING,
@@ -400,7 +405,7 @@ public void run() {
400405
synchronized (monitor) {
401406
stop = cursorReturnedDoneOrException;
402407
}
403-
} catch (InterruptedException e) {
408+
} catch (Throwable e) {
404409
result.setException(e);
405410
return;
406411
}
@@ -415,15 +420,13 @@ public void run() {
415420
synchronized (monitor) {
416421
if (executionException != null) {
417422
result.setException(executionException);
418-
throw executionException;
419-
}
420-
if (state == State.CANCELLED) {
423+
} else if (state == State.CANCELLED) {
421424
result.setException(CANCELLED_EXCEPTION);
422-
throw CANCELLED_EXCEPTION;
425+
} else {
426+
result.set(null);
423427
}
424428
}
425429
}
426-
result.set(null);
427430
}
428431

429432
private void waitIfPaused() throws InterruptedException {
@@ -460,6 +463,12 @@ private class InitiateStreamingRunnable implements Runnable {
460463
@Override
461464
public void run() {
462465
try {
466+
// This method returns true if the underlying result set is a streaming result set (e.g. a
467+
// GrpcResultSet).
468+
// Those result sets will trigger initiateProduceRows() when the first results are received.
469+
// Non-streaming result sets do not trigger this callback, and for those result sets, we
470+
// need to eagerly
471+
// start the ProduceRowsRunnable.
463472
if (!initiateStreaming(AsyncResultSetImpl.this)) {
464473
initiateProduceRows();
465474
}
@@ -480,7 +489,7 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
480489

481490
// Start to fetch data and buffer these.
482491
this.result = SettableApiFuture.create();
483-
this.state = State.IN_PROGRESS;
492+
this.state = State.STREAMING_INITIALIZED;
484493
this.service.execute(new InitiateStreamingRunnable());
485494
this.executor = MoreExecutors.newSequentialExecutor(Preconditions.checkNotNull(exec));
486495
this.callback = Preconditions.checkNotNull(cb);
@@ -491,7 +500,7 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
491500

492501
private void initiateProduceRows() {
493502
this.service.execute(new ProduceRowsRunnable());
494-
if (this.state == State.IN_PROGRESS) {
503+
if (this.state == State.STREAMING_INITIALIZED) {
495504
this.state = State.RUNNING;
496505
}
497506
produceRowsInitiated = true;
@@ -607,9 +616,8 @@ public ResultSetMetadata getMetadata() {
607616
return delegateResultSet.get().getMetadata();
608617
}
609618

610-
@Override
611619
public boolean initiateStreaming(StreamMessageListener streamMessageListener) {
612-
return delegateResultSet.get().initiateStreaming(streamMessageListener);
620+
return StreamingUtil.initiateStreaming(delegateResultSet.get(), streamMessageListener);
613621
}
614622

615623
@Override
@@ -631,20 +639,19 @@ public Struct getCurrentRowAsStruct() {
631639
@Override
632640
public void onStreamMessage(
633641
PartialResultSet partialResultSet,
634-
int prefetchChunks,
635-
int currentBufferSize,
642+
boolean bufferIsFull,
636643
StreamMessageRequestor streamMessageRequestor) {
637644
synchronized (monitor) {
638645
if (produceRowsInitiated) {
639646
return;
640647
}
641-
// if PartialResultSet contains resume token or buffer size is more than configured size or
648+
// if PartialResultSet contains resume token or buffer size is ful or
642649
// we have reached end of stream, we can start the thread
643650
boolean startJobThread =
644651
!partialResultSet.getResumeToken().isEmpty()
645-
|| currentBufferSize >= prefetchChunks
652+
|| bufferIsFull
646653
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
647-
if (startJobThread || state != State.IN_PROGRESS) {
654+
if (startJobThread || state != State.STREAMING_INITIALIZED) {
648655
initiateProduceRows();
649656
} else {
650657
streamMessageRequestor.requestMessages(1);

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ForwardingResultSet.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
import com.google.spanner.v1.ResultSetStats;
2424

2525
/** Forwarding implementation of ResultSet that forwards all calls to a delegate. */
26-
public class ForwardingResultSet extends ForwardingStructReader implements ProtobufResultSet {
26+
public class ForwardingResultSet extends ForwardingStructReader
27+
implements ProtobufResultSet, StreamingResultSet {
2728

2829
private Supplier<? extends ResultSet> delegate;
2930

@@ -105,6 +106,6 @@ public ResultSetMetadata getMetadata() {
105106

106107
@Override
107108
public boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
108-
return delegate.get().initiateStreaming(streamMessageListener);
109+
return StreamingUtil.initiateStreaming(delegate.get(), streamMessageListener);
109110
}
110111
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
import javax.annotation.Nullable;
3131

3232
@VisibleForTesting
33-
class GrpcResultSet extends AbstractResultSet<List<Object>> implements ProtobufResultSet {
33+
class GrpcResultSet extends AbstractResultSet<List<Object>>
34+
implements ProtobufResultSet, StreamingResultSet {
3435
private final GrpcValueIterator iterator;
3536
private final Listener listener;
3637
private final DecodeMode decodeMode;

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ public boolean cancelQueryWhenClientIsClosed() {
199199

200200
private void onStreamMessage(PartialResultSet partialResultSet) {
201201
Optional.ofNullable(streamMessageListener)
202-
.ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks, stream.size(), this));
202+
.ifPresent(
203+
sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size(), this));
203204
}
204205
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/ResultSet.java

-8
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,4 @@ public interface ResultSet extends AutoCloseable, StructReader {
8282
default ResultSetMetadata getMetadata() {
8383
throw new UnsupportedOperationException("Method should be overridden");
8484
}
85-
86-
/**
87-
* Returns the {@link boolean} for this {@link ResultSet}. This method will be used by
88-
* AsyncResultSet to initiate gRPC streaming
89-
*/
90-
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
91-
return false;
92-
}
9385
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

+4-5
Original file line numberDiff line numberDiff line change
@@ -292,18 +292,17 @@ public boolean next() throws SpannerException {
292292
public boolean initiateStreaming(
293293
AsyncResultSet.StreamMessageListener streamMessageListener) {
294294
try {
295-
boolean ret = super.initiateStreaming(streamMessageListener);
296-
if (beforeFirst) {
295+
boolean streamInitiated = super.initiateStreaming(streamMessageListener);
296+
if (!streamInitiated) {
297297
synchronized (lock) {
298298
session.get().markUsed();
299-
beforeFirst = false;
300299
sessionUsedForQuery = true;
301300
}
302301
}
303-
if (!ret && isSingleUse) {
302+
if (!streamInitiated && isSingleUse) {
304303
close();
305304
}
306-
return ret;
305+
return streamInitiated;
307306
} catch (SessionNotFoundException e) {
308307
throw e;
309308
} catch (SpannerException e) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
/** Streaming implementation of ResultSet that supports streaming of chunks */
20+
public interface StreamingResultSet extends ResultSet {
21+
22+
/**
23+
* Returns the {@link boolean} for this {@link ResultSet}. This method will be used by
24+
* AsyncResultSet to initiate gRPC streaming
25+
*/
26+
boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener);
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.spanner;
18+
19+
public class StreamingUtil {
20+
public static boolean initiateStreaming(
21+
ResultSet resultSet, AsyncResultSet.StreamMessageListener streamMessageListener) {
22+
if (resultSet instanceof StreamingResultSet) {
23+
return ((StreamingResultSet) resultSet).initiateStreaming(streamMessageListener);
24+
}
25+
return false;
26+
}
27+
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java

+12-14
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import static org.junit.Assert.assertNull;
2424
import static org.junit.Assert.assertThrows;
2525
import static org.mockito.ArgumentMatchers.any;
26-
import static org.mockito.Mockito.*;
26+
import static org.mockito.Mockito.mock;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.when;
2729

2830
import com.google.api.core.ApiFuture;
2931
import com.google.api.gax.core.ExecutorProvider;
@@ -385,7 +387,7 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
385387
public void testCallbackIsNotCalledWhilePausedAndCanceled()
386388
throws InterruptedException, ExecutionException {
387389
Executor executor = Executors.newSingleThreadExecutor();
388-
ResultSet delegate = mock(ResultSet.class);
390+
StreamingResultSet delegate = mock(StreamingResultSet.class);
389391

390392
final AtomicInteger callbackCounter = new AtomicInteger();
391393
ApiFuture<Void> callbackResult;
@@ -396,7 +398,7 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()
396398
when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class)))
397399
.thenAnswer(
398400
answer -> {
399-
rs.onStreamMessage(PartialResultSet.newBuilder().build(), 4, 1, null);
401+
rs.onStreamMessage(PartialResultSet.newBuilder().build(), false, null);
400402
return null;
401403
});
402404
callbackResult =
@@ -512,7 +514,7 @@ public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception
512514

513515
@Test
514516
public void testOnStreamMessageWhenResumeTokenIsPresent() {
515-
ResultSet delegate = mock(ResultSet.class);
517+
StreamingResultSet delegate = mock(StreamingResultSet.class);
516518
try (AsyncResultSetImpl rs =
517519
new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
518520
AsyncResultSet.StreamMessageRequestor streamMessageRequestor =
@@ -525,15 +527,13 @@ public void testOnStreamMessageWhenResumeTokenIsPresent() {
525527
rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
526528
rs.onStreamMessage(
527529
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(),
528-
4,
529-
1,
530+
false,
530531
streamMessageRequestor);
531532
Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1));
532533

533534
rs.onStreamMessage(
534535
PartialResultSet.newBuilder().setResumeToken(ByteString.copyFromUtf8("test")).build(),
535-
4,
536-
2,
536+
false,
537537
streamMessageRequestor);
538538
Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
539539
Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1));
@@ -542,7 +542,7 @@ public void testOnStreamMessageWhenResumeTokenIsPresent() {
542542

543543
@Test
544544
public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() {
545-
ResultSet delegate = mock(ResultSet.class);
545+
StreamingResultSet delegate = mock(StreamingResultSet.class);
546546
try (AsyncResultSetImpl rs =
547547
new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
548548
AsyncResultSet.StreamMessageRequestor streamMessageRequestor =
@@ -555,8 +555,7 @@ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() {
555555
rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
556556
rs.onStreamMessage(
557557
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(),
558-
4,
559-
4,
558+
true,
560559
streamMessageRequestor);
561560
Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
562561
Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1));
@@ -565,7 +564,7 @@ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() {
565564

566565
@Test
567566
public void testOnStreamMessageWhenAsyncResultIsCancelled() {
568-
ResultSet delegate = mock(ResultSet.class);
567+
StreamingResultSet delegate = mock(StreamingResultSet.class);
569568
try (AsyncResultSetImpl rs =
570569
new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
571570
AsyncResultSet.StreamMessageRequestor streamMessageRequestor =
@@ -579,8 +578,7 @@ public void testOnStreamMessageWhenAsyncResultIsCancelled() {
579578
rs.cancel();
580579
rs.onStreamMessage(
581580
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(),
582-
1,
583-
4,
581+
false,
584582
streamMessageRequestor);
585583
Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
586584
Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1));

0 commit comments

Comments
 (0)