Skip to content

Commit b70edc3

Browse files
Addressed comments
1 parent 56e6ba9 commit b70edc3

File tree

4 files changed

+9
-48
lines changed

4 files changed

+9
-48
lines changed

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSet.java

+1-12
Original file line numberDiff line numberDiff line change
@@ -230,17 +230,6 @@ interface ReadyCallback {
230230
* chunk is received from gRPC streaming call.
231231
*/
232232
interface StreamMessageListener {
233-
void onStreamMessage(
234-
PartialResultSet partialResultSet,
235-
boolean bufferIsFull,
236-
StreamMessageRequestor streamMessageRequestor);
237-
}
238-
239-
/**
240-
* An interface to request more messages from the gRPC streaming call. It will be implemented by
241-
* the class which has access to SpannerRpc.StreamingCall object
242-
*/
243-
interface StreamMessageRequestor {
244-
void requestMessages(int numOfMessages);
233+
void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull);
245234
}
246235
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

+1-6
Original file line numberDiff line numberDiff line change
@@ -637,10 +637,7 @@ public Struct getCurrentRowAsStruct() {
637637
}
638638

639639
@Override
640-
public void onStreamMessage(
641-
PartialResultSet partialResultSet,
642-
boolean bufferIsFull,
643-
StreamMessageRequestor streamMessageRequestor) {
640+
public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsFull) {
644641
synchronized (monitor) {
645642
if (produceRowsInitiated) {
646643
return;
@@ -653,8 +650,6 @@ public void onStreamMessage(
653650
|| partialResultSet == GrpcStreamIterator.END_OF_STREAM;
654651
if (startJobThread || state != State.STREAMING_INITIALIZED) {
655652
initiateProduceRows();
656-
} else {
657-
streamMessageRequestor.requestMessages(1);
658653
}
659654
}
660655
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@
3535
/** Adapts a streaming read/query call into an iterator over partial result sets. */
3636
@VisibleForTesting
3737
class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
38-
implements CloseableIterator<PartialResultSet>, AsyncResultSet.StreamMessageRequestor {
38+
implements CloseableIterator<PartialResultSet> {
3939
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
4040
public static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
4141
private AsyncResultSet.StreamMessageListener streamMessageListener;
@@ -146,11 +146,6 @@ private void addToStream(PartialResultSet results) {
146146
onStreamMessage(results);
147147
}
148148

149-
@Override
150-
public void requestMessages(int numOfMessages) {
151-
call.request(numOfMessages);
152-
}
153-
154149
private class ConsumerImpl implements SpannerRpc.ResultStreamConsumer {
155150
private final boolean cancelQueryWhenClientIsClosed;
156151

@@ -199,7 +194,6 @@ public boolean cancelQueryWhenClientIsClosed() {
199194

200195
private void onStreamMessage(PartialResultSet partialResultSet) {
201196
Optional.ofNullable(streamMessageListener)
202-
.ifPresent(
203-
sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size(), this));
197+
.ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size()));
204198
}
205199
}

google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncResultSetImplTest.java

+5-22
Original file line numberDiff line numberDiff line change
@@ -398,7 +398,7 @@ public void testCallbackIsNotCalledWhilePausedAndCanceled()
398398
when(delegate.initiateStreaming(any(AsyncResultSet.StreamMessageListener.class)))
399399
.thenAnswer(
400400
answer -> {
401-
rs.onStreamMessage(PartialResultSet.newBuilder().build(), false, null);
401+
rs.onStreamMessage(PartialResultSet.newBuilder().build(), false);
402402
return null;
403403
});
404404
callbackResult =
@@ -517,26 +517,19 @@ public void testOnStreamMessageWhenResumeTokenIsPresent() {
517517
StreamingResultSet delegate = mock(StreamingResultSet.class);
518518
try (AsyncResultSetImpl rs =
519519
new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
520-
AsyncResultSet.StreamMessageRequestor streamMessageRequestor =
521-
Mockito.mock(AsyncResultSet.StreamMessageRequestor.class);
522520
// Marking Streaming as supported
523521
Mockito.when(
524522
delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class)))
525523
.thenReturn(true);
526524

527525
rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
528526
rs.onStreamMessage(
529-
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(),
530-
false,
531-
streamMessageRequestor);
532-
Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1));
527+
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false);
533528

534529
rs.onStreamMessage(
535530
PartialResultSet.newBuilder().setResumeToken(ByteString.copyFromUtf8("test")).build(),
536-
false,
537-
streamMessageRequestor);
531+
false);
538532
Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
539-
Mockito.verify(streamMessageRequestor, times(1)).requestMessages(Mockito.eq(1));
540533
}
541534
}
542535

@@ -545,20 +538,15 @@ public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() {
545538
StreamingResultSet delegate = mock(StreamingResultSet.class);
546539
try (AsyncResultSetImpl rs =
547540
new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
548-
AsyncResultSet.StreamMessageRequestor streamMessageRequestor =
549-
Mockito.mock(AsyncResultSet.StreamMessageRequestor.class);
550541
// Marking Streaming as supported
551542
Mockito.when(
552543
delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class)))
553544
.thenReturn(true);
554545

555546
rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
556547
rs.onStreamMessage(
557-
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(),
558-
true,
559-
streamMessageRequestor);
548+
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), true);
560549
Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
561-
Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1));
562550
}
563551
}
564552

@@ -567,8 +555,6 @@ public void testOnStreamMessageWhenAsyncResultIsCancelled() {
567555
StreamingResultSet delegate = mock(StreamingResultSet.class);
568556
try (AsyncResultSetImpl rs =
569557
new AsyncResultSetImpl(mockedProvider, delegate, AsyncResultSetImpl.DEFAULT_BUFFER_SIZE)) {
570-
AsyncResultSet.StreamMessageRequestor streamMessageRequestor =
571-
Mockito.mock(AsyncResultSet.StreamMessageRequestor.class);
572558
// Marking Streaming as supported
573559
Mockito.when(
574560
delegate.initiateStreaming(Mockito.any(AsyncResultSet.StreamMessageListener.class)))
@@ -577,11 +563,8 @@ public void testOnStreamMessageWhenAsyncResultIsCancelled() {
577563
rs.setCallback(Executors.newSingleThreadExecutor(), ignored -> CallbackResponse.DONE);
578564
rs.cancel();
579565
rs.onStreamMessage(
580-
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(),
581-
false,
582-
streamMessageRequestor);
566+
PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false);
583567
Mockito.verify(mockedProvider.getExecutor(), times(2)).execute(Mockito.any());
584-
Mockito.verify(streamMessageRequestor, times(0)).requestMessages(Mockito.eq(1));
585568
}
586569
}
587570
}

0 commit comments

Comments
 (0)