53
53
import com .google .common .annotations .VisibleForTesting ;
54
54
import com .google .common .base .Preconditions ;
55
55
import com .google .common .util .concurrent .MoreExecutors ;
56
+ import com .google .spanner .v1 .DirectedReadOptions ;
56
57
import com .google .spanner .v1 .ExecuteSqlRequest .QueryOptions ;
57
58
import com .google .spanner .v1 .ResultSetStats ;
58
59
import java .util .ArrayList ;
@@ -236,6 +237,7 @@ static UnitOfWorkType of(TransactionMode transactionMode) {
236
237
*/
237
238
private int maxPartitionedParallelism ;
238
239
240
+ private DirectedReadOptions directedReadOptions = null ;
239
241
private QueryOptions queryOptions = QueryOptions .getDefaultInstance ();
240
242
private RpcPriority rpcPriority = null ;
241
243
private SavepointSupport savepointSupport = SavepointSupport .FAIL_AFTER_ROLLBACK ;
@@ -510,6 +512,21 @@ public TimestampBound getReadOnlyStaleness() {
510
512
return this .readOnlyStaleness ;
511
513
}
512
514
515
+ @ Override
516
+ public void setDirectedRead (DirectedReadOptions directedReadOptions ) {
517
+ ConnectionPreconditions .checkState (!isClosed (), CLOSED_ERROR_MSG );
518
+ ConnectionPreconditions .checkState (
519
+ !isTransactionStarted (),
520
+ "Cannot set directed read options when a transaction has been started" );
521
+ this .directedReadOptions = directedReadOptions ;
522
+ }
523
+
524
+ @ Override
525
+ public DirectedReadOptions getDirectedRead () {
526
+ ConnectionPreconditions .checkState (!isClosed (), CLOSED_ERROR_MSG );
527
+ return this .directedReadOptions ;
528
+ }
529
+
513
530
@ Override
514
531
public void setOptimizerVersion (String optimizerVersion ) {
515
532
Preconditions .checkNotNull (optimizerVersion );
@@ -1131,7 +1148,8 @@ public ResultSet partitionQuery(
1131
1148
CallType .SYNC ,
1132
1149
parsedStatement ,
1133
1150
getEffectivePartitionOptions (partitionOptions ),
1134
- mergeDataBoost (mergeQueryRequestOptions (mergeQueryStatementTag (options )))));
1151
+ mergeDataBoost (
1152
+ mergeQueryRequestOptions (parsedStatement , mergeQueryStatementTag (options )))));
1135
1153
}
1136
1154
1137
1155
private PartitionOptions getEffectivePartitionOptions (
@@ -1427,41 +1445,38 @@ private List<ParsedStatement> parseUpdateStatements(Iterable<Statement> updates)
1427
1445
1428
1446
private QueryOption [] mergeDataBoost (QueryOption ... options ) {
1429
1447
if (this .dataBoostEnabled ) {
1430
-
1431
- // Shortcut for the most common scenario.
1432
- if (options == null || options .length == 0 ) {
1433
- options = new QueryOption [] {Options .dataBoostEnabled (true )};
1434
- } else {
1435
- options = Arrays .copyOf (options , options .length + 1 );
1436
- options [options .length - 1 ] = Options .dataBoostEnabled (true );
1437
- }
1448
+ options = appendQueryOption (options , Options .dataBoostEnabled (true ));
1438
1449
}
1439
1450
return options ;
1440
1451
}
1441
1452
1442
1453
private QueryOption [] mergeQueryStatementTag (QueryOption ... options ) {
1443
1454
if (this .statementTag != null ) {
1444
- // Shortcut for the most common scenario.
1445
- if (options == null || options .length == 0 ) {
1446
- options = new QueryOption [] {Options .tag (statementTag )};
1447
- } else {
1448
- options = Arrays .copyOf (options , options .length + 1 );
1449
- options [options .length - 1 ] = Options .tag (statementTag );
1450
- }
1455
+ options = appendQueryOption (options , Options .tag (statementTag ));
1451
1456
this .statementTag = null ;
1452
1457
}
1453
1458
return options ;
1454
1459
}
1455
1460
1456
- private QueryOption [] mergeQueryRequestOptions (QueryOption ... options ) {
1461
+ private QueryOption [] mergeQueryRequestOptions (
1462
+ ParsedStatement parsedStatement , QueryOption ... options ) {
1457
1463
if (this .rpcPriority != null ) {
1458
- // Shortcut for the most common scenario.
1459
- if (options == null || options .length == 0 ) {
1460
- options = new QueryOption [] {Options .priority (this .rpcPriority )};
1461
- } else {
1462
- options = Arrays .copyOf (options , options .length + 1 );
1463
- options [options .length - 1 ] = Options .priority (this .rpcPriority );
1464
- }
1464
+ options = appendQueryOption (options , Options .priority (this .rpcPriority ));
1465
+ }
1466
+ if (this .directedReadOptions != null
1467
+ && currentUnitOfWork != null
1468
+ && currentUnitOfWork .supportsDirectedReads (parsedStatement )) {
1469
+ options = appendQueryOption (options , Options .directedRead (this .directedReadOptions ));
1470
+ }
1471
+ return options ;
1472
+ }
1473
+
1474
+ private QueryOption [] appendQueryOption (QueryOption [] options , QueryOption append ) {
1475
+ if (options == null || options .length == 0 ) {
1476
+ options = new QueryOption [] {append };
1477
+ } else {
1478
+ options = Arrays .copyOf (options , options .length + 1 );
1479
+ options [options .length - 1 ] = append ;
1465
1480
}
1466
1481
return options ;
1467
1482
}
@@ -1516,7 +1531,7 @@ private ResultSet internalExecuteQuery(
1516
1531
callType ,
1517
1532
statement ,
1518
1533
analyzeMode ,
1519
- mergeQueryRequestOptions (mergeQueryStatementTag (options ))));
1534
+ mergeQueryRequestOptions (statement , mergeQueryStatementTag (options ))));
1520
1535
}
1521
1536
1522
1537
private AsyncResultSet internalExecuteQueryAsync (
@@ -1538,7 +1553,7 @@ private AsyncResultSet internalExecuteQueryAsync(
1538
1553
callType ,
1539
1554
statement ,
1540
1555
analyzeMode ,
1541
- mergeQueryRequestOptions (mergeQueryStatementTag (options ))),
1556
+ mergeQueryRequestOptions (statement , mergeQueryStatementTag (options ))),
1542
1557
spanner .getAsyncExecutorProvider (),
1543
1558
options );
1544
1559
}
0 commit comments