Commit 782894e 1 parent cc9fdfd commit 782894e Copy full SHA for 782894e
File tree 3 files changed +14
-4
lines changed
google-cloud-bigquerystorage/src
main/java/com/google/cloud/bigquery/storage/v1
test/java/com/google/cloud/bigquery/storage/v1
3 files changed +14
-4
lines changed Original file line number Diff line number Diff line change @@ -353,7 +353,7 @@ public void run() {
353
353
} finally {
354
354
lock .unlock ();
355
355
}
356
- cleanupInflightRequests ( );
356
+ cleanup ( /* waitForDone= */ false );
357
357
});
358
358
this .appendThread .start ();
359
359
}
@@ -812,7 +812,10 @@ private void appendLoop() {
812
812
this .streamConnection .send (originalRequestBuilder .build ());
813
813
}
814
814
}
815
+ cleanup (/* waitForDone= */ true );
816
+ }
815
817
818
+ private void cleanup (boolean waitForDone ) {
816
819
log .info (
817
820
"Cleanup starts. Stream: "
818
821
+ streamName
@@ -828,7 +831,9 @@ private void appendLoop() {
828
831
// We can close the stream connection and handle the remaining inflight requests.
829
832
if (streamConnection != null ) {
830
833
this .streamConnection .close ();
831
- waitForDoneCallback (3 , TimeUnit .MINUTES );
834
+ if (waitForDone ) {
835
+ waitForDoneCallback (3 , TimeUnit .MINUTES );
836
+ }
832
837
}
833
838
834
839
// At this point, there cannot be more callback. It is safe to clean up all inflight requests.
Original file line number Diff line number Diff line change @@ -650,9 +650,9 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
650
650
null ,
651
651
client .getSettings (),
652
652
retrySettings );
653
- testBigQueryWrite .setResponseSleep (org .threeten .bp .Duration .ofSeconds (3 ));
653
+ testBigQueryWrite .setResponseSleep (org .threeten .bp .Duration .ofSeconds (2 ));
654
654
655
- long appendCount = 10 ;
655
+ long appendCount = 2 ;
656
656
for (int i = 0 ; i < appendCount ; i ++) {
657
657
testBigQueryWrite .addResponse (createAppendResponse (i ));
658
658
}
@@ -691,6 +691,8 @@ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws E
691
691
100 )
692
692
.get ());
693
693
assertThat (ex .getCause ()).hasMessageThat ().contains ("Request has waited in inflight queue" );
694
+ connectionWorker .close ();
695
+ assertTrue (connectionWorker .isUserClosed ());
694
696
}
695
697
696
698
@ Test
Original file line number Diff line number Diff line change 32
32
import com .google .cloud .bigquery .storage .v1 .*;
33
33
import com .google .cloud .bigquery .storage .v1 .AppendRowsRequest .MissingValueInterpretation ;
34
34
import com .google .cloud .bigquery .storage .v1 .Exceptions .AppendSerializationError ;
35
+ import com .google .cloud .bigquery .storage .v1 .Exceptions .MaximumRequestCallbackWaitTimeExceededException ;
35
36
import com .google .cloud .bigquery .storage .v1 .Exceptions .OffsetAlreadyExists ;
36
37
import com .google .cloud .bigquery .storage .v1 .Exceptions .OffsetOutOfRange ;
37
38
import com .google .cloud .bigquery .storage .v1 .Exceptions .SchemaMismatchedException ;
38
39
import com .google .cloud .bigquery .storage .v1 .Exceptions .StreamFinalizedException ;
40
+ import com .google .cloud .bigquery .storage .v1 .Exceptions .StreamWriterClosedException ;
39
41
import com .google .cloud .bigquery .testing .RemoteBigQueryHelper ;
40
42
import com .google .common .collect .ImmutableList ;
41
43
import com .google .protobuf .ByteString ;
49
51
import java .math .BigDecimal ;
50
52
import java .sql .Timestamp ;
51
53
import java .text .ParseException ;
54
+ import java .time .Duration ;
52
55
import java .time .Instant ;
53
56
import java .time .ZoneId ;
54
57
import java .time .temporal .ChronoUnit ;
You can’t perform that action at this time.
0 commit comments