Skip to content

Commit 170a3f5

Browse files
authored
fix: update grpc WriteObject response handling to provide context when a failure happens (#2532)
1 parent 3e573f7 commit 170a3f5

File tree

5 files changed

+368
-44
lines changed

5 files changed

+368
-44
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
/**
20+
* This exception is used to preserve the caller's stacktrace when invoking an async task in a sync
21+
* context. It will be added as a suppressed exception when propagating the async exception. This
22+
* allows callers to catch ApiException thrown in an async operation, while still maintaining the
23+
* call site.
24+
*/
25+
public final class AsyncStorageTaskException extends RuntimeException {
26+
// mimic of com.google.api.gax.rpc.AsyncTaskException which doesn't have a public constructor
27+
// if that class is ever made public, make this class extend it
28+
AsyncStorageTaskException() {
29+
super("Asynchronous task failed");
30+
}
31+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedChunkedResumableWritableByteChannel.java

+45-14
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.core.SettableApiFuture;
2222
import com.google.api.gax.grpc.GrpcCallContext;
2323
import com.google.api.gax.retrying.ResultRetryAlgorithm;
24+
import com.google.api.gax.rpc.ApiException;
2425
import com.google.api.gax.rpc.ApiStreamObserver;
2526
import com.google.api.gax.rpc.ClientStreamingCallable;
2627
import com.google.api.gax.rpc.OutOfRangeException;
@@ -208,14 +209,19 @@ private void flush(
208209
deps,
209210
alg,
210211
() -> {
211-
Observer observer = new Observer(content, finalizing);
212+
Observer observer = new Observer(content, finalizing, segments, internalContext);
212213
ApiStreamObserver<WriteObjectRequest> write = callable.clientStreamingCall(observer);
213214

214215
for (WriteObjectRequest message : segments) {
215216
write.onNext(message);
216217
}
217218
write.onCompleted();
218-
observer.await();
219+
try {
220+
observer.await();
221+
} catch (Throwable t) {
222+
t.addSuppressed(new AsyncStorageTaskException());
223+
throw t;
224+
}
219225
return null;
220226
},
221227
Decoder.identity());
@@ -230,13 +236,21 @@ class Observer implements ApiStreamObserver<WriteObjectResponse> {
230236

231237
private final RewindableContent content;
232238
private final boolean finalizing;
239+
private final List<WriteObjectRequest> segments;
240+
private final GrpcCallContext context;
233241

234242
private final SettableApiFuture<Void> invocationHandle;
235243
private volatile WriteObjectResponse last;
236244

237-
Observer(@Nullable RewindableContent content, boolean finalizing) {
245+
Observer(
246+
@Nullable RewindableContent content,
247+
boolean finalizing,
248+
@NonNull List<WriteObjectRequest> segments,
249+
GrpcCallContext context) {
238250
this.content = content;
239251
this.finalizing = finalizing;
252+
this.segments = segments;
253+
this.context = context;
240254
this.invocationHandle = SettableApiFuture.create();
241255
}
242256

@@ -250,10 +264,20 @@ public void onError(Throwable t) {
250264
if (t instanceof OutOfRangeException) {
251265
OutOfRangeException oore = (OutOfRangeException) t;
252266
open = false;
253-
invocationHandle.setException(
254-
ResumableSessionFailureScenario.SCENARIO_5.toStorageException());
255-
} else {
256-
invocationHandle.setException(t);
267+
StorageException storageException =
268+
ResumableSessionFailureScenario.SCENARIO_5.toStorageException(
269+
segments, null, context, oore);
270+
invocationHandle.setException(storageException);
271+
} else if (t instanceof ApiException) {
272+
// use StorageExceptions logic to translate from ApiException to our status codes ensuring
273+
// things fall in line with our retry handlers.
274+
// This is suboptimal, as it will initialize a second exception, however this is the
275+
// unusual case, and it should not cause a significant overhead given its rarity.
276+
StorageException tmp = StorageException.asStorageException((ApiException) t);
277+
StorageException storageException =
278+
ResumableSessionFailureScenario.toStorageException(
279+
tmp.getCode(), tmp.getMessage(), tmp.getReason(), segments, null, context, t);
280+
invocationHandle.setException(storageException);
257281
}
258282
}
259283

@@ -276,7 +300,8 @@ public void onCompleted() {
276300
writeCtx.getTotalSentBytes().set(persistedSize);
277301
writeCtx.getConfirmedBytes().set(persistedSize);
278302
} else {
279-
throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException();
303+
throw ResumableSessionFailureScenario.SCENARIO_7.toStorageException(
304+
segments, last, context, null);
280305
}
281306
} else if (finalizing && last.hasResource()) {
282307
long totalSentBytes = writeCtx.getTotalSentBytes().get();
@@ -285,22 +310,28 @@ public void onCompleted() {
285310
writeCtx.getConfirmedBytes().set(finalSize);
286311
resultFuture.set(last);
287312
} else if (finalSize < totalSentBytes) {
288-
throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException();
313+
throw ResumableSessionFailureScenario.SCENARIO_4_1.toStorageException(
314+
segments, last, context, null);
289315
} else {
290-
throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException();
316+
throw ResumableSessionFailureScenario.SCENARIO_4_2.toStorageException(
317+
segments, last, context, null);
291318
}
292319
} else if (!finalizing && last.hasResource()) {
293-
throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException();
320+
throw ResumableSessionFailureScenario.SCENARIO_1.toStorageException(
321+
segments, last, context, null);
294322
} else if (finalizing && last.hasPersistedSize()) {
295323
long totalSentBytes = writeCtx.getTotalSentBytes().get();
296324
long persistedSize = last.getPersistedSize();
297325
if (persistedSize < totalSentBytes) {
298-
throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException();
326+
throw ResumableSessionFailureScenario.SCENARIO_3.toStorageException(
327+
segments, last, context, null);
299328
} else {
300-
throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException();
329+
throw ResumableSessionFailureScenario.SCENARIO_2.toStorageException(
330+
segments, last, context, null);
301331
}
302332
} else {
303-
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException();
333+
throw ResumableSessionFailureScenario.SCENARIO_0.toStorageException(
334+
segments, last, context, null);
304335
}
305336
} catch (Throwable se) {
306337
open = false;

google-cloud-storage/src/main/java/com/google/cloud/storage/ResumableSessionFailureScenario.java

+148-10
Original file line numberDiff line numberDiff line change
@@ -16,19 +16,31 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static com.google.cloud.storage.Utils.ifNonNull;
20+
1921
import com.google.api.client.http.HttpHeaders;
2022
import com.google.api.client.http.HttpResponse;
2123
import com.google.api.client.http.HttpResponseException;
24+
import com.google.api.gax.grpc.GrpcCallContext;
25+
import com.google.api.gax.rpc.ApiException;
2226
import com.google.cloud.BaseServiceException;
2327
import com.google.cloud.storage.StorageException.IOExceptionCallable;
2428
import com.google.common.io.CharStreams;
29+
import com.google.protobuf.MessageOrBuilder;
30+
import com.google.storage.v2.ChecksummedData;
31+
import com.google.storage.v2.ObjectChecksums;
32+
import com.google.storage.v2.WriteObjectRequest;
33+
import com.google.storage.v2.WriteObjectResponse;
34+
import io.grpc.StatusRuntimeException;
2535
import java.io.IOException;
2636
import java.io.InputStreamReader;
2737
import java.util.List;
2838
import java.util.Locale;
2939
import java.util.Map;
40+
import java.util.function.Consumer;
3041
import java.util.function.Predicate;
3142
import javax.annotation.ParametersAreNonnullByDefault;
43+
import org.checkerframework.checker.nullness.qual.NonNull;
3244
import org.checkerframework.checker.nullness.qual.Nullable;
3345

3446
@ParametersAreNonnullByDefault
@@ -69,6 +81,10 @@ enum ResumableSessionFailureScenario {
6981
private static final String PREFIX_I = "\t|< ";
7082
private static final String PREFIX_O = "\t|> ";
7183
private static final String PREFIX_X = "\t| ";
84+
// define some constants for tab widths that are more compressed that the literals
85+
private static final String T1 = "\t";
86+
private static final String T2 = "\t\t";
87+
private static final String T3 = "\t\t\t";
7288

7389
private static final Predicate<String> includedHeaders =
7490
matches("Content-Length")
@@ -78,6 +94,7 @@ enum ResumableSessionFailureScenario {
7894
.or(matches("Range"))
7995
.or(startsWith("X-Goog-Stored-"))
8096
.or(matches("X-Goog-GCS-Idempotency-Token"))
97+
.or(matches("X-Goog-request-params"))
8198
.or(matches("X-GUploader-UploadID"));
8299

83100
private static final Predicate<Map.Entry<String, ?>> includeHeader =
@@ -116,8 +133,12 @@ StorageException toStorageException(
116133
return toStorageException(code, message, reason, uploadId, resp, cause, contentCallable);
117134
}
118135

119-
StorageException toStorageException() {
120-
return new StorageException(code, message, reason, null);
136+
StorageException toStorageException(
137+
@NonNull List<@NonNull WriteObjectRequest> reqs,
138+
@Nullable WriteObjectResponse resp,
139+
@NonNull GrpcCallContext context,
140+
@Nullable Throwable cause) {
141+
return toStorageException(code, message, reason, reqs, resp, context, cause);
121142
}
122143

123144
static StorageException toStorageException(
@@ -136,6 +157,102 @@ static StorageException toStorageException(
136157
return se;
137158
}
138159

160+
static StorageException toStorageException(
161+
int code,
162+
String message,
163+
@Nullable String reason,
164+
@NonNull List<@NonNull WriteObjectRequest> reqs,
165+
@Nullable WriteObjectResponse resp,
166+
@NonNull GrpcCallContext context,
167+
@Nullable Throwable cause) {
168+
final StringBuilder sb = new StringBuilder();
169+
sb.append(message);
170+
// request context
171+
Map<String, List<String>> extraHeaders = context.getExtraHeaders();
172+
recordHeadersTo(extraHeaders, PREFIX_O, sb);
173+
int length = reqs.size();
174+
for (int i = 0; i < length; i++) {
175+
if (i == 0) {
176+
sb.append("\n").append(PREFIX_O).append("[");
177+
} else {
178+
sb.append(",");
179+
}
180+
WriteObjectRequest req = reqs.get(i);
181+
sb.append("\n").append(PREFIX_O).append(T1).append(req.getClass().getName()).append("{");
182+
if (req.hasUploadId()) {
183+
sb.append("\n").append(PREFIX_O).append(T2).append("upload_id: ").append(req.getUploadId());
184+
}
185+
long writeOffset = req.getWriteOffset();
186+
if (req.hasChecksummedData()) {
187+
ChecksummedData checksummedData = req.getChecksummedData();
188+
sb.append("\n").append(PREFIX_O).append(T2);
189+
sb.append(
190+
String.format(
191+
"checksummed_data: {range: [%d:%d]",
192+
writeOffset, writeOffset + checksummedData.getContent().size()));
193+
if (checksummedData.hasCrc32C()) {
194+
sb.append(", crc32c: ").append(checksummedData.getCrc32C());
195+
}
196+
sb.append("}");
197+
} else {
198+
sb.append("\n").append(PREFIX_O).append(T2).append("write_offset: ").append(writeOffset);
199+
}
200+
if (req.getFinishWrite()) {
201+
sb.append("\n").append(PREFIX_O).append(T2).append("finish_write: true");
202+
}
203+
if (req.hasObjectChecksums()) {
204+
ObjectChecksums objectChecksums = req.getObjectChecksums();
205+
sb.append("\n").append(PREFIX_O).append(T2).append("object_checksums: ").append("{");
206+
fmt(objectChecksums, PREFIX_O, T3, sb);
207+
sb.append("\n").append(PREFIX_O).append(T2).append("}");
208+
}
209+
sb.append("\n").append(PREFIX_O).append("\t}");
210+
if (i == length - 1) {
211+
sb.append("\n").append(PREFIX_O).append("]");
212+
}
213+
}
214+
215+
sb.append("\n").append(PREFIX_X);
216+
217+
// response context
218+
if (resp != null) {
219+
sb.append("\n").append(PREFIX_I).append(resp.getClass().getName()).append("{");
220+
fmt(resp, PREFIX_I, T1, sb);
221+
sb.append("\n").append(PREFIX_I).append("}");
222+
sb.append("\n").append(PREFIX_X);
223+
}
224+
225+
if (cause != null) {
226+
if (cause instanceof ApiException) {
227+
ApiException apiException = (ApiException) cause;
228+
Throwable cause1 = apiException.getCause();
229+
if (cause1 instanceof StatusRuntimeException) {
230+
StatusRuntimeException statusRuntimeException = (StatusRuntimeException) cause1;
231+
sb.append("\n").append(PREFIX_I).append(statusRuntimeException.getStatus());
232+
ifNonNull(
233+
statusRuntimeException.getTrailers(),
234+
t -> sb.append("\n").append(PREFIX_I).append(t));
235+
} else {
236+
sb.append("\n")
237+
.append(PREFIX_I)
238+
.append("code: ")
239+
.append(apiException.getStatusCode().toString());
240+
ifNonNull(
241+
apiException.getReason(),
242+
r -> sb.append("\n").append(PREFIX_I).append("reason: ").append(r));
243+
ifNonNull(
244+
apiException.getDomain(),
245+
d -> sb.append("\n").append(PREFIX_I).append("domain: ").append(d));
246+
ifNonNull(
247+
apiException.getErrorDetails(),
248+
e -> sb.append("\n").append(PREFIX_I).append("errorDetails: ").append(e));
249+
}
250+
sb.append("\n").append(PREFIX_X);
251+
}
252+
}
253+
return new StorageException(code, sb.toString(), reason, cause);
254+
}
255+
139256
static StorageException toStorageException(
140257
int overrideCode,
141258
String message,
@@ -213,14 +330,21 @@ private static Predicate<String> startsWith(String prefix) {
213330
}
214331

215332
private static void recordHeaderTo(HttpHeaders h, String prefix, StringBuilder sb) {
216-
h.entrySet().stream()
217-
.filter(includeHeader)
218-
.forEach(
219-
e -> {
220-
String key = e.getKey();
221-
String value = headerValueToString(e.getValue());
222-
sb.append("\n").append(prefix).append(key).append(": ").append(value);
223-
});
333+
h.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb));
334+
}
335+
336+
private static void recordHeadersTo(
337+
Map<String, List<String>> headers, String prefix, StringBuilder sb) {
338+
headers.entrySet().stream().filter(includeHeader).forEach(writeHeaderValue(prefix, sb));
339+
}
340+
341+
private static <V> Consumer<Map.Entry<String, V>> writeHeaderValue(
342+
String prefix, StringBuilder sb) {
343+
return e -> {
344+
String key = e.getKey();
345+
String value = headerValueToString(e.getValue());
346+
sb.append("\n").append(prefix).append(key).append(": ").append(value);
347+
};
224348
}
225349

226350
private static String headerValueToString(Object o) {
@@ -233,4 +357,18 @@ private static String headerValueToString(Object o) {
233357

234358
return o.toString();
235359
}
360+
361+
private static void fmt(
362+
MessageOrBuilder msg,
363+
@SuppressWarnings("SameParameterValue") String prefix,
364+
String indentation,
365+
StringBuilder sb) {
366+
String string = msg.toString();
367+
// drop the final new line before prefixing
368+
string = string.replaceAll("\n$", "");
369+
sb.append("\n")
370+
.append(prefix)
371+
.append(indentation)
372+
.append(string.replaceAll("\r?\n", "\n" + prefix + indentation));
373+
}
236374
}

0 commit comments

Comments
 (0)