Skip to content

Commit fd061e1

Browse files
Addressed comments
1 parent 6bbfb9a commit fd061e1

7 files changed

+19
-46
lines changed

google-cloud-spanner/clirr-ignored-differences.xml

+3-3
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@
758758
<className>com/google/cloud/spanner/connection/Connection</className>
759759
<method>boolean isKeepTransactionAlive()</method>
760760
</difference>
761-
761+
762762
<!-- Automatic DML batching -->
763763
<difference>
764764
<differenceType>7012</differenceType>
@@ -790,5 +790,5 @@
790790
<className>com/google/cloud/spanner/connection/Connection</className>
791791
<method>boolean isAutoBatchDmlUpdateCountVerification()</method>
792792
</difference>
793-
794-
</differences>
793+
794+
</differences>

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java

+4
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@ interface CloseableIterator<T> extends Iterator<T> {
151151

152152
boolean isWithBeginTransaction();
153153

154+
/**
155+
* @param streamMessageListener A class object which implements StreamMessageListener
156+
* @return true if streaming is supported by the iterator, otherwise false
157+
*/
154158
default boolean initiateStreaming(AsyncResultSet.StreamMessageListener streamMessageListener) {
155159
return false;
156160
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java

+6-7
Original file line numberDiff line numberDiff line change
@@ -467,13 +467,12 @@ public void run() {
467467
// GrpcResultSet).
468468
// Those result sets will trigger initiateProduceRows() when the first results are received.
469469
// Non-streaming result sets do not trigger this callback, and for those result sets, we
470-
// need to eagerly
471-
// start the ProduceRowsRunnable.
470+
// need to eagerly start the ProduceRowsRunnable.
472471
if (!initiateStreaming(AsyncResultSetImpl.this)) {
473472
initiateProduceRows();
474473
}
475-
} catch (SpannerException e) {
476-
executionException = e;
474+
} catch (Throwable exception) {
475+
executionException = SpannerExceptionFactory.asSpannerException(exception);
477476
initiateProduceRows();
478477
}
479478
}
@@ -499,11 +498,11 @@ public ApiFuture<Void> setCallback(Executor exec, ReadyCallback cb) {
499498
}
500499

501500
private void initiateProduceRows() {
502-
this.service.execute(new ProduceRowsRunnable());
503501
if (this.state == State.STREAMING_INITIALIZED) {
504502
this.state = State.RUNNING;
505503
}
506504
produceRowsInitiated = true;
505+
this.service.execute(new ProduceRowsRunnable());
507506
}
508507

509508
Future<Void> getResult() {
@@ -642,8 +641,8 @@ public void onStreamMessage(PartialResultSet partialResultSet, boolean bufferIsF
642641
if (produceRowsInitiated) {
643642
return;
644643
}
645-
// if PartialResultSet contains resume token or buffer size is ful or
646-
// we have reached end of stream, we can start the thread
644+
// if PartialResultSet contains a resume token or buffer size is full, or
645+
// we have reached the end of the stream, we can start the thread.
647646
boolean startJobThread =
648647
!partialResultSet.getResumeToken().isEmpty()
649648
|| bufferIsFull

google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcStreamIterator.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,12 @@
3737
class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
3838
implements CloseableIterator<PartialResultSet> {
3939
private static final Logger logger = Logger.getLogger(GrpcStreamIterator.class.getName());
40-
public static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
40+
static final PartialResultSet END_OF_STREAM = PartialResultSet.newBuilder().build();
4141
private AsyncResultSet.StreamMessageListener streamMessageListener;
4242

4343
private final ConsumerImpl consumer;
4444
private final BlockingQueue<PartialResultSet> stream;
4545
private final Statement statement;
46-
private final int prefetchChunks;
4746

4847
private SpannerRpc.StreamingCall call;
4948
private volatile boolean withBeginTransaction;
@@ -60,10 +59,9 @@ class GrpcStreamIterator extends AbstractIterator<PartialResultSet>
6059
GrpcStreamIterator(
6160
Statement statement, int prefetchChunks, boolean cancelQueryWhenClientIsClosed) {
6261
this.statement = statement;
63-
this.prefetchChunks = prefetchChunks;
6462
this.consumer = new ConsumerImpl(cancelQueryWhenClientIsClosed);
6563
// One extra to allow for END_OF_STREAM message.
66-
this.stream = new LinkedBlockingQueue<>((prefetchChunks * 2) + 1);
64+
this.stream = new LinkedBlockingQueue<>(prefetchChunks + 1);
6765
}
6866

6967
protected final SpannerRpc.ResultStreamConsumer consumer() {
@@ -194,6 +192,6 @@ public boolean cancelQueryWhenClientIsClosed() {
194192

195193
private void onStreamMessage(PartialResultSet partialResultSet) {
196194
Optional.ofNullable(streamMessageListener)
197-
.ifPresent(sl -> sl.onStreamMessage(partialResultSet, prefetchChunks >= stream.size()));
195+
.ifPresent(sl -> sl.onStreamMessage(partialResultSet, stream.remainingCapacity() <= 1));
198196
}
199197
}

google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionPool.java

-28
Original file line numberDiff line numberDiff line change
@@ -288,34 +288,6 @@ public boolean next() throws SpannerException {
288288
}
289289
}
290290

291-
@Override
292-
public boolean initiateStreaming(
293-
AsyncResultSet.StreamMessageListener streamMessageListener) {
294-
try {
295-
boolean streamInitiated = super.initiateStreaming(streamMessageListener);
296-
if (!streamInitiated) {
297-
synchronized (lock) {
298-
session.get().markUsed();
299-
sessionUsedForQuery = true;
300-
}
301-
}
302-
if (!streamInitiated && isSingleUse) {
303-
close();
304-
}
305-
return streamInitiated;
306-
} catch (SessionNotFoundException e) {
307-
throw e;
308-
} catch (SpannerException e) {
309-
synchronized (lock) {
310-
if (!closed && isSingleUse) {
311-
session.get().setLastException(e);
312-
AutoClosingReadContext.this.close();
313-
}
314-
}
315-
throw e;
316-
}
317-
}
318-
319291
private boolean internalNext() {
320292
try {
321293
boolean ret = super.next();

google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingResultSet.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.google.cloud.spanner;
1818

1919
/** Streaming implementation of ResultSet that supports streaming of chunks */
20-
public interface StreamingResultSet extends ResultSet {
20+
interface StreamingResultSet extends ResultSet {
2121

2222
/**
2323
* Returns the {@link boolean} for this {@link ResultSet}. This method will be used by

google-cloud-spanner/src/main/java/com/google/cloud/spanner/StreamingUtil.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616

1717
package com.google.cloud.spanner;
1818

19-
public class StreamingUtil {
20-
public static boolean initiateStreaming(
19+
class StreamingUtil {
20+
static boolean initiateStreaming(
2121
ResultSet resultSet, AsyncResultSet.StreamMessageListener streamMessageListener) {
2222
if (resultSet instanceof StreamingResultSet) {
2323
return ((StreamingResultSet) resultSet).initiateStreaming(streamMessageListener);

0 commit comments

Comments
 (0)