Skip to content

Commit 7dba13c

Browse files
authored
fix: update retry lifecycle when attempting to decompress a gzip object (#2840)
If the initial response failed with a retryable error and the error should be retried, it wasn't. It would only retry for reading bytes after the initial response had been received.
1 parent 8b1bb43 commit 7dba13c

File tree

3 files changed

+65
-31
lines changed

3 files changed

+65
-31
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java

+15-11
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@
2323
import com.google.api.client.http.HttpResponse;
2424
import com.google.api.client.http.HttpResponseException;
2525
import com.google.api.core.SettableApiFuture;
26+
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
2627
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2728
import com.google.api.services.storage.Storage;
2829
import com.google.api.services.storage.Storage.Objects;
2930
import com.google.api.services.storage.Storage.Objects.Get;
3031
import com.google.api.services.storage.model.StorageObject;
31-
import com.google.cloud.BaseServiceException;
3232
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
3333
import com.google.cloud.storage.spi.v1.StorageRpc;
3434
import com.google.common.annotations.VisibleForTesting;
@@ -84,7 +84,17 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
8484
this.storage = storage;
8585
this.result = result;
8686
this.options = options;
87-
this.resultRetryAlgorithm = resultRetryAlgorithm;
87+
this.resultRetryAlgorithm =
88+
new BasicResultRetryAlgorithm<Object>() {
89+
@Override
90+
public boolean shouldRetry(Throwable previousThrowable, Object previousResponse) {
91+
boolean shouldRetry = resultRetryAlgorithm.shouldRetry(previousThrowable, null);
92+
if (previousThrowable != null && !shouldRetry) {
93+
result.setException(previousThrowable);
94+
}
95+
return shouldRetry;
96+
}
97+
};
8898
this.open = true;
8999
this.returnEOF = false;
90100
this.position = apiaryReadRequest.getByteRangeSpec().beginOffset();
@@ -210,17 +220,11 @@ private ScatteringByteChannel open() {
210220
throw new StorageException(404, "Failure while trying to resume download", e);
211221
}
212222
}
213-
StorageException translate = StorageException.translate(e);
214-
result.setException(translate);
215-
throw translate;
223+
throw StorageException.translate(e);
216224
} catch (IOException e) {
217-
StorageException translate = StorageException.translate(e);
218-
result.setException(translate);
219-
throw translate;
225+
throw StorageException.translate(e);
220226
} catch (Throwable t) {
221-
BaseServiceException coalesce = StorageException.coalesce(t);
222-
result.setException(coalesce);
223-
throw coalesce;
227+
throw StorageException.coalesce(t);
224228
}
225229
}
226230

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

+13-4
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.api.client.http.HttpStatusCodes;
2020
import com.google.api.core.ApiFuture;
2121
import com.google.api.core.SettableApiFuture;
22+
import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
2223
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2324
import com.google.api.gax.rpc.ApiExceptions;
2425
import com.google.api.gax.rpc.ServerStreamingCallable;
@@ -87,7 +88,18 @@ final class GapicUnbufferedReadableByteChannel
8788
this.blobOffset = req.getReadOffset();
8889
this.rclm = rclm;
8990
this.retryingDeps = retryingDependencies;
90-
this.alg = alg;
91+
this.alg =
92+
new BasicResultRetryAlgorithm<java.lang.Object>() {
93+
@Override
94+
public boolean shouldRetry(
95+
Throwable previousThrowable, java.lang.Object previousResponse) {
96+
boolean shouldRetry = alg.shouldRetry(previousThrowable, null);
97+
if (previousThrowable != null && !shouldRetry) {
98+
result.setException(previousThrowable);
99+
}
100+
return shouldRetry;
101+
}
102+
};
91103
// The reasoning for 2 elements below allow for a single response and the EOF/error signal
92104
// from onComplete or onError. Same thing com.google.api.gax.rpc.QueuingResponseObserver does.
93105
this.queue = new SimpleBlockingQueue<>(2);
@@ -337,9 +349,6 @@ protected void onErrorImpl(Throwable t) {
337349
}
338350
if (!open.isDone()) {
339351
open.setException(t);
340-
if (!alg.shouldRetry(t, null)) {
341-
result.setException(StorageException.coalesce(t));
342-
}
343352
}
344353
try {
345354
queue.offer(t);

google-cloud-storage/src/test/java/com/google/cloud/storage/conformance/retry/RpcMethodMappings.java

+37-16
Original file line numberDiff line numberDiff line change
@@ -1283,20 +1283,18 @@ private static void get(ArrayList<RpcMethodMapping> a) {
12831283
(ctx, c) ->
12841284
ctx.peek(
12851285
state -> {
1286-
try {
1287-
ReadChannel reader =
1288-
ctx.getStorage().reader(ctx.getState().getBlob().getBlobId());
1289-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1286+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1287+
try (ReadChannel reader =
1288+
ctx.getStorage().reader(ctx.getState().getBlob().getBlobId())) {
12901289
WritableByteChannel write = Channels.newChannel(baos);
12911290
ByteStreams.copy(reader, write);
1292-
1293-
assertThat(xxd(baos.toByteArray()))
1294-
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
12951291
} catch (IOException e) {
12961292
if (e.getCause() instanceof BaseServiceException) {
12971293
throw e.getCause();
12981294
}
12991295
}
1296+
assertThat(xxd(baos.toByteArray()))
1297+
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
13001298
}))
13011299
.build());
13021300
a.add(
@@ -1305,23 +1303,46 @@ private static void get(ArrayList<RpcMethodMapping> a) {
13051303
(ctx, c) ->
13061304
ctx.peek(
13071305
state -> {
1308-
try {
1309-
ReadChannel reader =
1310-
ctx.getStorage()
1311-
.reader(
1312-
ctx.getState().getBlob().getBlobId().getBucket(),
1313-
ctx.getState().getBlob().getBlobId().getName());
1314-
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1306+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1307+
try (ReadChannel reader =
1308+
ctx.getStorage()
1309+
.reader(
1310+
ctx.getState().getBlob().getBlobId().getBucket(),
1311+
ctx.getState().getBlob().getBlobId().getName())) {
13151312
WritableByteChannel write = Channels.newChannel(baos);
13161313
ByteStreams.copy(reader, write);
1314+
} catch (IOException e) {
1315+
if (e.getCause() instanceof BaseServiceException) {
1316+
throw e.getCause();
1317+
}
1318+
}
13171319

1318-
assertThat(xxd(baos.toByteArray()))
1319-
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
1320+
assertThat(xxd(baos.toByteArray()))
1321+
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
1322+
}))
1323+
.build());
1324+
a.add(
1325+
RpcMethodMapping.newBuilder(250, objects.get)
1326+
.withTest(
1327+
(ctx, c) ->
1328+
ctx.peek(
1329+
state -> {
1330+
ByteArrayOutputStream baos = new ByteArrayOutputStream();
1331+
try (ReadChannel reader =
1332+
ctx.getStorage()
1333+
.reader(
1334+
ctx.getState().getBlob().getBlobId(),
1335+
BlobSourceOption.shouldReturnRawInputStream(false))) {
1336+
WritableByteChannel write = Channels.newChannel(baos);
1337+
ByteStreams.copy(reader, write);
13201338
} catch (IOException e) {
13211339
if (e.getCause() instanceof BaseServiceException) {
13221340
throw e.getCause();
13231341
}
13241342
}
1343+
1344+
assertThat(xxd(baos.toByteArray()))
1345+
.isEqualTo(xxd(c.getHelloWorldUtf8Bytes()));
13251346
}))
13261347
.build());
13271348
a.add(

0 commit comments

Comments
 (0)