54
54
import com .google .cloud .spanner .SessionPoolOptions .InactiveTransactionRemovalOptions ;
55
55
import com .google .cloud .spanner .SpannerException .ResourceNotFoundException ;
56
56
import com .google .cloud .spanner .SpannerImpl .ClosedException ;
57
+ import com .google .cloud .spanner .spi .v1 .SpannerRpc ;
57
58
import com .google .common .annotations .VisibleForTesting ;
58
59
import com .google .common .base .Function ;
59
60
import com .google .common .base .MoreObjects ;
@@ -1366,12 +1367,19 @@ PooledSession get(final boolean eligibleForLongRunning) {
1366
1367
}
1367
1368
}
1368
1369
1369
- final class PooledSession implements Session {
1370
+ class PooledSession implements Session {
1370
1371
@ VisibleForTesting SessionImpl delegate ;
1371
1372
private volatile Instant lastUseTime ;
1372
1373
private volatile SpannerException lastException ;
1373
1374
private volatile boolean allowReplacing = true ;
1374
1375
1376
+ /**
1377
+ * This ensures that the session is added at a random position in the pool the first time it is
1378
+ * actually added to the pool.
1379
+ */
1380
+ @ GuardedBy ("lock" )
1381
+ private Position releaseToPosition = initialReleasePosition ;
1382
+
1375
1383
/**
1376
1384
* Property to mark if the session is eligible to be long-running. This can only be true if the
1377
1385
* session is executing certain types of transactions (for ex - Partitioned DML) which can be
@@ -1403,6 +1411,13 @@ private PooledSession(SessionImpl delegate) {
1403
1411
this .lastUseTime = clock .instant ();
1404
1412
}
1405
1413
1414
+ int getChannel () {
1415
+ Long channelHint = (Long ) delegate .getOptions ().get (SpannerRpc .Option .CHANNEL_HINT );
1416
+ return channelHint == null
1417
+ ? 0
1418
+ : (int ) (channelHint % sessionClient .getSpanner ().getOptions ().getNumChannels ());
1419
+ }
1420
+
1406
1421
@ Override
1407
1422
public String toString () {
1408
1423
return getName ();
@@ -1536,7 +1551,7 @@ public void close() {
1536
1551
if (state != SessionState .CLOSING ) {
1537
1552
state = SessionState .AVAILABLE ;
1538
1553
}
1539
- releaseSession (this , Position . FIRST );
1554
+ releaseSession (this , false );
1540
1555
}
1541
1556
}
1542
1557
@@ -1576,7 +1591,7 @@ private void determineDialectAsync(final SettableFuture<Dialect> dialect) {
1576
1591
// in the database dialect, and there's nothing sensible that we can do with it here.
1577
1592
dialect .setException (t );
1578
1593
} finally {
1579
- releaseSession (this , Position . FIRST );
1594
+ releaseSession (this , false );
1580
1595
}
1581
1596
});
1582
1597
}
@@ -1830,7 +1845,7 @@ private void keepAliveSessions(Instant currTime) {
1830
1845
logger .log (Level .FINE , "Keeping alive session " + sessionToKeepAlive .getName ());
1831
1846
numSessionsToKeepAlive --;
1832
1847
sessionToKeepAlive .keepAlive ();
1833
- releaseSession (sessionToKeepAlive , Position . FIRST );
1848
+ releaseSession (sessionToKeepAlive , false );
1834
1849
} catch (SpannerException e ) {
1835
1850
handleException (e , sessionToKeepAlive );
1836
1851
}
@@ -1929,7 +1944,7 @@ private void removeLongRunningSessions(
1929
1944
}
1930
1945
}
1931
1946
1932
- private enum Position {
1947
+ enum Position {
1933
1948
FIRST ,
1934
1949
RANDOM
1935
1950
}
@@ -1962,6 +1977,15 @@ private enum Position {
1962
1977
1963
1978
final PoolMaintainer poolMaintainer ;
1964
1979
private final Clock clock ;
1980
+ /**
1981
+ * initialReleasePosition determines where in the pool sessions are added when they are released
1982
+ * into the pool the first time. This is always RANDOM in production, but some tests use FIRST to
1983
+ * be able to verify the order of sessions in the pool. Using RANDOM ensures that we do not get an
1984
+ * unbalanced session pool where all sessions belonging to one gRPC channel are added to the same
1985
+ * region in the pool.
1986
+ */
1987
+ private final Position initialReleasePosition ;
1988
+
1965
1989
private final Object lock = new Object ();
1966
1990
private final Random random = new Random ();
1967
1991
@@ -2045,6 +2069,7 @@ static SessionPool createPool(
2045
2069
((GrpcTransportOptions ) spannerOptions .getTransportOptions ()).getExecutorFactory (),
2046
2070
sessionClient ,
2047
2071
poolMaintainerClock == null ? new Clock () : poolMaintainerClock ,
2072
+ Position .RANDOM ,
2048
2073
Metrics .getMetricRegistry (),
2049
2074
labelValues );
2050
2075
}
@@ -2053,20 +2078,22 @@ static SessionPool createPool(
2053
2078
SessionPoolOptions poolOptions ,
2054
2079
ExecutorFactory <ScheduledExecutorService > executorFactory ,
2055
2080
SessionClient sessionClient ) {
2056
- return createPool (poolOptions , executorFactory , sessionClient , new Clock ());
2081
+ return createPool (poolOptions , executorFactory , sessionClient , new Clock (), Position . RANDOM );
2057
2082
}
2058
2083
2059
2084
static SessionPool createPool (
2060
2085
SessionPoolOptions poolOptions ,
2061
2086
ExecutorFactory <ScheduledExecutorService > executorFactory ,
2062
2087
SessionClient sessionClient ,
2063
- Clock clock ) {
2088
+ Clock clock ,
2089
+ Position initialReleasePosition ) {
2064
2090
return createPool (
2065
2091
poolOptions ,
2066
2092
null ,
2067
2093
executorFactory ,
2068
2094
sessionClient ,
2069
2095
clock ,
2096
+ initialReleasePosition ,
2070
2097
Metrics .getMetricRegistry (),
2071
2098
SPANNER_DEFAULT_LABEL_VALUES );
2072
2099
}
@@ -2077,6 +2104,7 @@ static SessionPool createPool(
2077
2104
ExecutorFactory <ScheduledExecutorService > executorFactory ,
2078
2105
SessionClient sessionClient ,
2079
2106
Clock clock ,
2107
+ Position initialReleasePosition ,
2080
2108
MetricRegistry metricRegistry ,
2081
2109
List <LabelValue > labelValues ) {
2082
2110
SessionPool pool =
@@ -2087,6 +2115,7 @@ static SessionPool createPool(
2087
2115
executorFactory .get (),
2088
2116
sessionClient ,
2089
2117
clock ,
2118
+ initialReleasePosition ,
2090
2119
metricRegistry ,
2091
2120
labelValues );
2092
2121
pool .initPool ();
@@ -2100,6 +2129,7 @@ private SessionPool(
2100
2129
ScheduledExecutorService executor ,
2101
2130
SessionClient sessionClient ,
2102
2131
Clock clock ,
2132
+ Position initialReleasePosition ,
2103
2133
MetricRegistry metricRegistry ,
2104
2134
List <LabelValue > labelValues ) {
2105
2135
this .options = options ;
@@ -2108,6 +2138,7 @@ private SessionPool(
2108
2138
this .executor = executor ;
2109
2139
this .sessionClient = sessionClient ;
2110
2140
this .clock = clock ;
2141
+ this .initialReleasePosition = initialReleasePosition ;
2111
2142
this .poolMaintainer = new PoolMaintainer ();
2112
2143
this .initMetricsCollection (metricRegistry , labelValues );
2113
2144
this .waitOnMinSessionsLatch =
@@ -2233,7 +2264,7 @@ private void handleException(SpannerException e, PooledSession session) {
2233
2264
if (isSessionNotFound (e )) {
2234
2265
invalidateSession (session );
2235
2266
} else {
2236
- releaseSession (session , Position . FIRST );
2267
+ releaseSession (session , false );
2237
2268
}
2238
2269
}
2239
2270
@@ -2396,33 +2427,128 @@ private void maybeCreateSession() {
2396
2427
}
2397
2428
}
2398
2429
}
2430
+
2399
2431
/** Releases a session back to the pool. This might cause one of the waiters to be unblocked. */
2400
- private void releaseSession (PooledSession session , Position position ) {
2432
+ private void releaseSession (PooledSession session , boolean isNewSession ) {
2401
2433
Preconditions .checkNotNull (session );
2402
2434
synchronized (lock ) {
2403
2435
if (closureFuture != null ) {
2404
2436
return ;
2405
2437
}
2406
2438
if (waiters .size () == 0 ) {
2407
- // No pending waiters
2408
- switch (position ) {
2409
- case RANDOM :
2410
- if (!sessions .isEmpty ()) {
2411
- int pos = random .nextInt (sessions .size () + 1 );
2412
- sessions .add (pos , session );
2413
- break ;
2414
- }
2415
- // fallthrough
2416
- case FIRST :
2417
- default :
2418
- sessions .addFirst (session );
2439
+ // There are no pending waiters.
2440
+ // Add to a random position if the head of the session pool already contains many sessions
2441
+ // with the same channel as this one.
2442
+ if (session .releaseToPosition == Position .FIRST && isUnbalanced (session )) {
2443
+ session .releaseToPosition = Position .RANDOM ;
2444
+ } else if (session .releaseToPosition == Position .RANDOM
2445
+ && !isNewSession
2446
+ && checkedOutSessions .size () <= 2 ) {
2447
+ // Do not randomize if there are few other sessions checked out and this session has been
2448
+ // used. This ensures that this session will be re-used for the next transaction, which is
2449
+ // more efficient.
2450
+ session .releaseToPosition = Position .FIRST ;
2451
+ }
2452
+ if (session .releaseToPosition == Position .RANDOM && !sessions .isEmpty ()) {
2453
+ // A session should only be added at a random position the first time it is added to
2454
+ // the pool or if the pool was deemed unbalanced. All following releases into the pool
2455
+ // should normally happen at the front of the pool (unless the pool is again deemed to be
2456
+ // unbalanced).
2457
+ session .releaseToPosition = Position .FIRST ;
2458
+ int pos = random .nextInt (sessions .size () + 1 );
2459
+ sessions .add (pos , session );
2460
+ } else {
2461
+ sessions .addFirst (session );
2419
2462
}
2420
2463
} else {
2421
2464
waiters .poll ().put (session );
2422
2465
}
2423
2466
}
2424
2467
}
2425
2468
2469
+ private boolean isUnbalanced (PooledSession session ) {
2470
+ int channel = session .getChannel ();
2471
+ int numChannels = sessionClient .getSpanner ().getOptions ().getNumChannels ();
2472
+ return isUnbalanced (channel , this .sessions , this .checkedOutSessions , numChannels );
2473
+ }
2474
+
2475
+ /**
2476
+ * Returns true if the given list of sessions is considered unbalanced when compared to the
2477
+ * sessionChannel that is about to be added to the pool.
2478
+ *
2479
+ * <p>The method returns true if all the following is true:
2480
+ *
2481
+ * <ol>
2482
+ * <li>The list of sessions is not empty.
2483
+ * <li>The number of checked out sessions is > 2.
2484
+ * <li>The number of channels being used by the pool is > 1.
2485
+ * <li>And at least one of the following is true:
2486
+ * <ol>
2487
+ * <li>The first numChannels sessions in the list of sessions contains more than 2
2488
+ * sessions that use the same channel as the one being added.
2489
+ * <li>The list of currently checked out sessions contains more than 2 times the the
2490
+ * number of sessions with the same channel as the one being added than it should in
2491
+ * order for it to be perfectly balanced. Perfectly balanced in this case means that
2492
+ * the list should preferably contain size/numChannels sessions of each channel.
2493
+ * </ol>
2494
+ * </ol>
2495
+ *
2496
+ * @param channelOfSessionBeingAdded the channel number being used by the session that is about to
2497
+ * be released into the pool
2498
+ * @param sessions the list of all sessions in the pool
2499
+ * @param checkedOutSessions the currently checked out sessions of the pool
2500
+ * @param numChannels the number of channels in use
2501
+ * @return true if the pool is considered unbalanced, and false otherwise
2502
+ */
2503
+ @ VisibleForTesting
2504
+ static boolean isUnbalanced (
2505
+ int channelOfSessionBeingAdded ,
2506
+ List <PooledSession > sessions ,
2507
+ Set <PooledSessionFuture > checkedOutSessions ,
2508
+ int numChannels ) {
2509
+ // Do not re-balance the pool if the number of checked out sessions is low, as it is
2510
+ // better to re-use sessions as much as possible in a low-QPS scenario.
2511
+ if (sessions .isEmpty () || checkedOutSessions .size () <= 2 ) {
2512
+ return false ;
2513
+ }
2514
+ if (numChannels == 1 ) {
2515
+ return false ;
2516
+ }
2517
+
2518
+ // Ideally, the first numChannels sessions in the pool should contain exactly one session for
2519
+ // each channel.
2520
+ // Check if the first numChannels sessions at the head of the pool already contain more than 2
2521
+ // sessions that use the same channel as this one. If so, we re-balance.
2522
+ // We also re-balance the pool in the specific case that the pool uses 2 channels and the first
2523
+ // two sessions use those two channels.
2524
+ int maxSessionsAtHeadOfPool = Math .min (numChannels , 3 );
2525
+ int count = 0 ;
2526
+ for (int i = 0 ; i < Math .min (numChannels , sessions .size ()); i ++) {
2527
+ PooledSession otherSession = sessions .get (i );
2528
+ if (channelOfSessionBeingAdded == otherSession .getChannel ()) {
2529
+ count ++;
2530
+ if (count >= maxSessionsAtHeadOfPool ) {
2531
+ return true ;
2532
+ }
2533
+ }
2534
+ }
2535
+ // Ideally, the use of a channel in the checked out sessions is exactly
2536
+ // numCheckedOut / numChannels
2537
+ // We check whether we are more than a factor two away from that perfect distribution.
2538
+ // If we are, then we re-balance.
2539
+ count = 0 ;
2540
+ int checkedOutThreshold = Math .max (2 , 2 * checkedOutSessions .size () / numChannels );
2541
+ for (PooledSessionFuture otherSession : checkedOutSessions ) {
2542
+ if (otherSession .isDone () && channelOfSessionBeingAdded == otherSession .get ().getChannel ()) {
2543
+ count ++;
2544
+ if (count > checkedOutThreshold ) {
2545
+ return true ;
2546
+ }
2547
+ }
2548
+ }
2549
+ return false ;
2550
+ }
2551
+
2426
2552
private void handleCreateSessionsFailure (SpannerException e , int count ) {
2427
2553
synchronized (lock ) {
2428
2554
for (int i = 0 ; i < count ; i ++) {
@@ -2622,7 +2748,7 @@ public void onSessionReady(SessionImpl session) {
2622
2748
// Release the session to a random position in the pool to prevent the case that a batch
2623
2749
// of sessions that are affiliated with the same channel are all placed sequentially in
2624
2750
// the pool.
2625
- releaseSession (pooledSession , Position . RANDOM );
2751
+ releaseSession (pooledSession , true );
2626
2752
}
2627
2753
}
2628
2754
}
0 commit comments