Skip to content

Commit 0dbbfb8

Browse files
feat: introduce MaximumRequestCallbackWaitTimeExceededException (#2401)
exception
1 parent 4258af4 commit 0dbbfb8

File tree

3 files changed

+38
-5
lines changed

3 files changed

+38
-5
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java

+2-5
Original file line numberDiff line numberDiff line change
@@ -844,11 +844,8 @@ private void appendLoop() {
844844
private void throwIfWaitCallbackTooLong(Instant timeToCheck) {
845845
Duration milliSinceLastCallback = Duration.between(timeToCheck, Instant.now());
846846
if (milliSinceLastCallback.compareTo(MAXIMUM_REQUEST_CALLBACK_WAIT_TIME) > 0) {
847-
throw new RuntimeException(
848-
String.format(
849-
"Request has waited in inflight queue for %sms for writer %s, "
850-
+ "which is over maximum wait time %s",
851-
milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME.toString()));
847+
throw new Exceptions.MaximumRequestCallbackWaitTimeExceededException(
848+
milliSinceLastCallback, writerId, MAXIMUM_REQUEST_CALLBACK_WAIT_TIME);
852849
}
853850
}
854851

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java

+34
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import io.grpc.Status;
2323
import io.grpc.StatusRuntimeException;
2424
import io.grpc.protobuf.StatusProto;
25+
import java.time.Duration;
2526
import java.util.Map;
2627
import java.util.regex.Matcher;
2728
import java.util.regex.Pattern;
@@ -416,5 +417,38 @@ public String getFieldName() {
416417
}
417418
}
418419

420+
/**
421+
* The connection was shut down because a callback was not received within the maximum wait time.
422+
*/
423+
public static class MaximumRequestCallbackWaitTimeExceededException extends RuntimeException {
424+
private final Duration callbackWaitTime;
425+
private final String writerId;
426+
private final Duration callbackWaitTimeLimit;
427+
428+
public MaximumRequestCallbackWaitTimeExceededException(
429+
Duration callbackWaitTime, String writerId, Duration callbackWaitTimeLimit) {
430+
super(
431+
String.format(
432+
"Request has waited in inflight queue for %sms for writer %s, "
433+
+ "which is over maximum wait time %s",
434+
callbackWaitTime, writerId, callbackWaitTimeLimit.toString()));
435+
this.callbackWaitTime = callbackWaitTime;
436+
this.writerId = writerId;
437+
this.callbackWaitTimeLimit = callbackWaitTimeLimit;
438+
}
439+
440+
public Duration getCallbackWaitTime() {
441+
return callbackWaitTime;
442+
}
443+
444+
public String getWriterId() {
445+
return writerId;
446+
}
447+
448+
public Duration getCallbackWaitTimeLimit() {
449+
return callbackWaitTimeLimit;
450+
}
451+
}
452+
419453
private Exceptions() {}
420454
}

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -1235,6 +1235,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
12351235
() -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
12361236
if (i == 0) {
12371237
assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
1238+
assertThat(ex.getCause())
1239+
.isInstanceOf(Exceptions.MaximumRequestCallbackWaitTimeExceededException.class);
12381240
} else {
12391241
assertThat(ex.getCause())
12401242
.hasMessageThat()

0 commit comments

Comments
 (0)