Commit d62a1d3 1 parent cc9fdfd commit d62a1d3 Copy full SHA for d62a1d3
File tree 2 files changed +13
-4
lines changed
google-cloud-bigquerystorage/src
main/java/com/google/cloud/bigquery/storage/v1
test/java/com/google/cloud/bigquery/storage/v1
2 files changed +13
-4
lines changed Original file line number Diff line number Diff line change @@ -353,7 +353,7 @@ public void run() {
353
353
} finally {
354
354
lock .unlock ();
355
355
}
356
- cleanupInflightRequests ( );
356
+ cleanup ( /* waitForDone= */ false );
357
357
});
358
358
this .appendThread .start ();
359
359
}
@@ -812,7 +812,10 @@ private void appendLoop() {
812
812
this .streamConnection .send (originalRequestBuilder .build ());
813
813
}
814
814
}
815
+ cleanup (/* waitForDone= */ true );
816
+ }
815
817
818
+ private void cleanup (boolean waitForDone ) {
816
819
log .info (
817
820
"Cleanup starts. Stream: "
818
821
+ streamName
@@ -828,7 +831,9 @@ private void appendLoop() {
828
831
// We can close the stream connection and handle the remaining inflight requests.
829
832
if (streamConnection != null ) {
830
833
this .streamConnection .close ();
831
- waitForDoneCallback (3 , TimeUnit .MINUTES );
834
+ if (waitForDone ) {
835
+ waitForDoneCallback (3 , TimeUnit .MINUTES );
836
+ }
832
837
}
833
838
834
839
// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
Original file line number Diff line number Diff line change @@ -650,9 +650,9 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
650
650
null ,
651
651
client .getSettings (),
652
652
retrySettings );
653
- testBigQueryWrite .setResponseSleep (org .threeten .bp .Duration .ofSeconds (3 ));
653
+ testBigQueryWrite .setResponseSleep (org .threeten .bp .Duration .ofSeconds (2 ));
654
654
655
- long appendCount = 10 ;
655
+ long appendCount = 2 ;
656
656
for (int i = 0 ; i < appendCount ; i ++) {
657
657
testBigQueryWrite .addResponse (createAppendResponse (i ));
658
658
}
@@ -691,6 +691,10 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
691
691
100 )
692
692
.get ());
693
693
assertThat (ex .getCause ()).hasMessageThat ().contains ("Request has waited in inflight queue" );
694
+
695
+ // Verify we can shutdown normally.
696
+ connectionWorker .close ();
697
+ assertTrue (connectionWorker .isUserClosed ());
694
698
}
695
699
696
700
@ Test
You can’t perform that action at this time.
0 commit comments