49
49
import com .google .api .gax .core .ExecutorProvider ;
50
50
import com .google .api .gax .rpc .ServerStream ;
51
51
import com .google .cloud .Timestamp ;
52
+ import com .google .cloud .Tuple ;
52
53
import com .google .cloud .grpc .GrpcTransportOptions ;
53
54
import com .google .cloud .grpc .GrpcTransportOptions .ExecutorFactory ;
54
55
import com .google .cloud .spanner .Options .QueryOption ;
@@ -1854,18 +1855,18 @@ private void keepAliveSessions(Instant currTime) {
1854
1855
1855
1856
// Keep chugging till there is no session that needs to be kept alive.
1856
1857
while (numSessionsToKeepAlive > 0 ) {
1857
- PooledSession sessionToKeepAlive = null ;
1858
+ Tuple < PooledSession , Integer > sessionToKeepAlive ;
1858
1859
synchronized (lock ) {
1859
1860
sessionToKeepAlive = findSessionToKeepAlive (sessions , keepAliveThreshold , 0 );
1860
1861
}
1861
1862
if (sessionToKeepAlive == null ) {
1862
1863
break ;
1863
1864
}
1864
1865
try {
1865
- logger .log (Level .FINE , "Keeping alive session " + sessionToKeepAlive .getName ());
1866
+ logger .log (Level .FINE , "Keeping alive session " + sessionToKeepAlive .x (). getName ());
1866
1867
numSessionsToKeepAlive --;
1867
- sessionToKeepAlive .keepAlive ();
1868
- releaseSession (sessionToKeepAlive , false );
1868
+ sessionToKeepAlive .x (). keepAlive ();
1869
+ releaseSession (sessionToKeepAlive );
1869
1870
} catch (SpannerException e ) {
1870
1871
handleException (e , sessionToKeepAlive );
1871
1872
}
@@ -2314,11 +2315,11 @@ private boolean isClosed() {
2314
2315
}
2315
2316
}
2316
2317
2317
- private void handleException (SpannerException e , PooledSession session ) {
2318
+ private void handleException (SpannerException e , Tuple < PooledSession , Integer > session ) {
2318
2319
if (isSessionNotFound (e )) {
2319
- invalidateSession (session );
2320
+ invalidateSession (session . x () );
2320
2321
} else {
2321
- releaseSession (session , false );
2322
+ releaseSession (session );
2322
2323
}
2323
2324
}
2324
2325
@@ -2342,7 +2343,7 @@ private void invalidateSession(PooledSession session) {
2342
2343
}
2343
2344
}
2344
2345
2345
- private PooledSession findSessionToKeepAlive (
2346
+ private Tuple < PooledSession , Integer > findSessionToKeepAlive (
2346
2347
Queue <PooledSession > queue , Instant keepAliveThreshold , int numAlreadyChecked ) {
2347
2348
int numChecked = 0 ;
2348
2349
Iterator <PooledSession > iterator = queue .iterator ();
@@ -2352,7 +2353,7 @@ private PooledSession findSessionToKeepAlive(
2352
2353
PooledSession session = iterator .next ();
2353
2354
if (session .delegate .getLastUseTime ().isBefore (keepAliveThreshold )) {
2354
2355
iterator .remove ();
2355
- return session ;
2356
+ return Tuple . of ( session , numChecked ) ;
2356
2357
}
2357
2358
numChecked ++;
2358
2359
}
@@ -2476,8 +2477,17 @@ private void maybeCreateSession() {
2476
2477
}
2477
2478
}
2478
2479
2479
- /** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */
2480
+ private void releaseSession (Tuple <PooledSession , Integer > sessionWithPosition ) {
2481
+ releaseSession (sessionWithPosition .x (), false , sessionWithPosition .y ());
2482
+ }
2483
+
2480
2484
private void releaseSession (PooledSession session , boolean isNewSession ) {
2485
+ releaseSession (session , isNewSession , null );
2486
+ }
2487
+
2488
+ /** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */
2489
+ private void releaseSession (
2490
+ PooledSession session , boolean isNewSession , @ Nullable Integer position ) {
2481
2491
Preconditions .checkNotNull (session );
2482
2492
synchronized (lock ) {
2483
2493
if (closureFuture != null ) {
@@ -2497,7 +2507,12 @@ private void releaseSession(PooledSession session, boolean isNewSession) {
2497
2507
// more efficient.
2498
2508
session .releaseToPosition = options .getReleaseToPosition ();
2499
2509
}
2500
- if (session .releaseToPosition == Position .RANDOM && !sessions .isEmpty ()) {
2510
+ if (position != null ) {
2511
+ // Make sure we use a valid position, as the number of sessions could have changed in the
2512
+ // meantime.
2513
+ int actualPosition = Math .min (position , sessions .size ());
2514
+ sessions .add (actualPosition , session );
2515
+ } else if (session .releaseToPosition == Position .RANDOM && !sessions .isEmpty ()) {
2501
2516
// A session should only be added at a random position the first time it is added to
2502
2517
// the pool or if the pool was deemed unbalanced. All following releases into the pool
2503
2518
// should normally happen at the default release position (unless the pool is again deemed
@@ -2510,6 +2525,7 @@ private void releaseSession(PooledSession session, boolean isNewSession) {
2510
2525
} else {
2511
2526
sessions .addFirst (session );
2512
2527
}
2528
+ session .releaseToPosition = options .getReleaseToPosition ();
2513
2529
} else {
2514
2530
waiters .poll ().put (session );
2515
2531
}
0 commit comments