Skip to content

Commit c1dac83

Browse files
authored
fix: fix interrupt spiral in grpc ReadObject drainQueue (#2850)
If our thread is interrupted while attempting to drainQueue poll will throw an InterruptedException, instead of setting the flag back on the thread immediately we need to defer setting it until we complete our draining. If we don't defer setting it, we can never actually drain our queue.
1 parent ca6e153 commit c1dac83

File tree

1 file changed

+32
-25
lines changed

1 file changed

+32
-25
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

+32-25
Original file line numberDiff line numberDiff line change
@@ -234,35 +234,42 @@ public void close() throws IOException {
234234
}
235235

236236
private void drainQueue() throws IOException {
237-
IOException ioException = null;
238-
while (queue.nonEmpty()) {
239-
try {
240-
java.lang.Object queueValue = queue.poll();
241-
if (queueValue instanceof ReadObjectResponse) {
242-
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
243-
ResponseContentLifecycleHandle handle = rclm.get(resp);
244-
handle.close();
245-
} else if (queueValue == EOF_MARKER || queueValue instanceof Throwable) {
246-
break;
247-
}
248-
} catch (IOException e) {
249-
if (ioException == null) {
250-
ioException = e;
251-
} else if (ioException != e) {
252-
ioException.addSuppressed(e);
237+
boolean shouldInterupt = false;
238+
try {
239+
IOException ioException = null;
240+
while (queue.nonEmpty()) {
241+
try {
242+
java.lang.Object queueValue = queue.poll();
243+
if (queueValue instanceof ReadObjectResponse) {
244+
ReadObjectResponse resp = (ReadObjectResponse) queueValue;
245+
ResponseContentLifecycleHandle handle = rclm.get(resp);
246+
handle.close();
247+
} else if (queueValue == EOF_MARKER || queueValue instanceof Throwable) {
248+
break;
249+
}
250+
} catch (IOException e) {
251+
if (ioException == null) {
252+
ioException = e;
253+
} else if (ioException != e) {
254+
ioException.addSuppressed(e);
255+
}
256+
} catch (InterruptedException e) {
257+
shouldInterupt = true;
258+
if (ioException == null) {
259+
ioException = new InterruptedIOException();
260+
} else {
261+
ioException.addSuppressed(e);
262+
}
253263
}
254-
} catch (InterruptedException e) {
264+
}
265+
if (ioException != null) {
266+
throw ioException;
267+
}
268+
} finally {
269+
if (shouldInterupt) {
255270
Thread.currentThread().interrupt();
256-
if (ioException == null) {
257-
ioException = new InterruptedIOException();
258-
} else {
259-
ioException.addSuppressed(e);
260-
}
261271
}
262272
}
263-
if (ioException != null) {
264-
throw ioException;
265-
}
266273
}
267274

268275
ApiFuture<Object> getResult() {

0 commit comments

Comments
 (0)