Skip to content

Commit d5fb01a

Browse files
authored
fix: Transactions with readTime will omit begin and commit transaction requests, and instead pass readTime on individual read requests. (#1565)
* Optimize ReadOnly transactions. * Pretty * Refactor * Handle null readTime * Consistent error messages * Pretty * Refactor * Make more backward compatible * Clirr * Feedback
1 parent 71e053e commit d5fb01a

File tree

10 files changed

+553
-174
lines changed

10 files changed

+553
-174
lines changed

google-cloud-firestore/clirr-ignored-differences.xml

+6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717

1818
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
1919
<differences>
20+
<!-- ReadTimeTransaction - added abstract modifier to Transaction class-->
21+
<difference>
22+
<differenceType>3005</differenceType>
23+
<className>com/google/cloud/firestore/Transaction</className>
24+
</difference>
25+
2026
<!-- Shutdown/Shutdown Now -->
2127
<difference>
2228
<differenceType>7012</differenceType>

google-cloud-firestore/src/main/java/com/google/cloud/firestore/AggregateQuery.java

+21-18
Original file line numberDiff line numberDiff line change
@@ -72,20 +72,22 @@ public Query getQuery() {
7272
*/
7373
@Nonnull
7474
public ApiFuture<AggregateQuerySnapshot> get() {
75-
return get(null);
75+
return get(null, null);
7676
}
7777

7878
@Nonnull
79-
ApiFuture<AggregateQuerySnapshot> get(@Nullable final ByteString transactionId) {
79+
ApiFuture<AggregateQuerySnapshot> get(
80+
@Nullable final ByteString transactionId, @Nullable com.google.protobuf.Timestamp readTime) {
8081
AggregateQueryResponseDeliverer responseDeliverer =
8182
new AggregateQueryResponseDeliverer(
82-
transactionId, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
83+
transactionId, readTime, /* startTimeNanos= */ query.rpcContext.getClock().nanoTime());
8384
runQuery(responseDeliverer);
8485
return responseDeliverer.getFuture();
8586
}
8687

8788
private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
88-
RunAggregationQueryRequest request = toProto(responseDeliverer.getTransactionId());
89+
RunAggregationQueryRequest request =
90+
toProto(responseDeliverer.transactionId, responseDeliverer.readTime);
8991
AggregateQueryResponseObserver responseObserver =
9092
new AggregateQueryResponseObserver(responseDeliverer);
9193
ServerStreamingCallable<RunAggregationQueryRequest, RunAggregationQueryResponse> callable =
@@ -96,28 +98,24 @@ private void runQuery(AggregateQueryResponseDeliverer responseDeliverer) {
9698
private final class AggregateQueryResponseDeliverer {
9799

98100
@Nullable private final ByteString transactionId;
101+
@Nullable private final com.google.protobuf.Timestamp readTime;
99102
private final long startTimeNanos;
100103
private final SettableApiFuture<AggregateQuerySnapshot> future = SettableApiFuture.create();
101104
private final AtomicBoolean isFutureCompleted = new AtomicBoolean(false);
102105

103-
AggregateQueryResponseDeliverer(@Nullable ByteString transactionId, long startTimeNanos) {
106+
AggregateQueryResponseDeliverer(
107+
@Nullable ByteString transactionId,
108+
@Nullable com.google.protobuf.Timestamp readTime,
109+
long startTimeNanos) {
104110
this.transactionId = transactionId;
111+
this.readTime = readTime;
105112
this.startTimeNanos = startTimeNanos;
106113
}
107114

108115
ApiFuture<AggregateQuerySnapshot> getFuture() {
109116
return future;
110117
}
111118

112-
@Nullable
113-
ByteString getTransactionId() {
114-
return transactionId;
115-
}
116-
117-
long getStartTimeNanos() {
118-
return startTimeNanos;
119-
}
120-
121119
void deliverResult(@Nonnull Map<String, Value> data, Timestamp readTime) {
122120
if (isFutureCompleted.compareAndSet(false, true)) {
123121
Map<String, Value> mappedData = new HashMap<>();
@@ -176,8 +174,8 @@ private boolean shouldRetry(Throwable throwable) {
176174
FirestoreSettings.newBuilder().runAggregationQuerySettings().getRetryableCodes();
177175
return query.shouldRetryQuery(
178176
throwable,
179-
responseDeliverer.getTransactionId(),
180-
responseDeliverer.getStartTimeNanos(),
177+
responseDeliverer.transactionId,
178+
responseDeliverer.startTimeNanos,
181179
retryableCodes);
182180
}
183181

@@ -193,18 +191,23 @@ public void onComplete() {}
193191
*/
194192
@Nonnull
195193
public RunAggregationQueryRequest toProto() {
196-
return toProto(null);
194+
return toProto(null, null);
197195
}
198196

199197
@Nonnull
200-
RunAggregationQueryRequest toProto(@Nullable final ByteString transactionId) {
198+
RunAggregationQueryRequest toProto(
199+
@Nullable final ByteString transactionId,
200+
@Nullable final com.google.protobuf.Timestamp readTime) {
201201
RunQueryRequest runQueryRequest = query.toProto();
202202

203203
RunAggregationQueryRequest.Builder request = RunAggregationQueryRequest.newBuilder();
204204
request.setParent(runQueryRequest.getParent());
205205
if (transactionId != null) {
206206
request.setTransaction(transactionId);
207207
}
208+
if (readTime != null) {
209+
request.setReadTime(readTime);
210+
}
208211

209212
StructuredAggregationQuery.Builder structuredAggregationQuery =
210213
request.getStructuredAggregationQueryBuilder();

google-cloud-firestore/src/main/java/com/google/cloud/firestore/FirestoreImpl.java

+33-5
Original file line numberDiff line numberDiff line change
@@ -209,13 +209,14 @@ public void getAll(
209209
final @Nonnull DocumentReference[] documentReferences,
210210
@Nullable FieldMask fieldMask,
211211
@Nonnull final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {
212-
this.getAll(documentReferences, fieldMask, null, apiStreamObserver);
212+
this.getAll(documentReferences, fieldMask, null, null, apiStreamObserver);
213213
}
214214

215215
void getAll(
216216
final @Nonnull DocumentReference[] documentReferences,
217217
@Nullable FieldMask fieldMask,
218218
@Nullable ByteString transactionId,
219+
@Nullable com.google.protobuf.Timestamp readTime,
219220
final ApiStreamObserver<DocumentSnapshot> apiStreamObserver) {
220221

221222
ResponseObserver<BatchGetDocumentsResponse> responseObserver =
@@ -304,6 +305,10 @@ public void onComplete() {
304305
request.setTransaction(transactionId);
305306
}
306307

308+
if (readTime != null) {
309+
request.setReadTime(readTime);
310+
}
311+
307312
for (DocumentReference docRef : documentReferences) {
308313
request.addDocuments(docRef.getName());
309314
}
@@ -318,17 +323,33 @@ public void onComplete() {
318323
streamRequest(request.build(), responseObserver, firestoreClient.batchGetDocumentsCallable());
319324
}
320325

326+
final ApiFuture<List<DocumentSnapshot>> getAll(
327+
final @Nonnull DocumentReference[] documentReferences,
328+
@Nullable FieldMask fieldMask,
329+
@Nullable com.google.protobuf.Timestamp readTime) {
330+
return getAll(documentReferences, fieldMask, null, readTime);
331+
}
332+
333+
private ApiFuture<List<DocumentSnapshot>> getAll(
334+
final @Nonnull DocumentReference[] documentReferences,
335+
@Nullable FieldMask fieldMask,
336+
@Nullable ByteString transactionId) {
337+
return getAll(documentReferences, fieldMask, transactionId, null);
338+
}
339+
321340
/** Internal getAll() method that accepts an optional transaction id. */
322341
ApiFuture<List<DocumentSnapshot>> getAll(
323342
final @Nonnull DocumentReference[] documentReferences,
324343
@Nullable FieldMask fieldMask,
325-
@Nullable ByteString transactionId) {
344+
@Nullable ByteString transactionId,
345+
@Nullable com.google.protobuf.Timestamp readTime) {
326346
final SettableApiFuture<List<DocumentSnapshot>> futureList = SettableApiFuture.create();
327347
final Map<DocumentReference, DocumentSnapshot> documentSnapshotMap = new HashMap<>();
328348
getAll(
329349
documentReferences,
330350
fieldMask,
331351
transactionId,
352+
readTime,
332353
new ApiStreamObserver<DocumentSnapshot>() {
333354
@Override
334355
public void onNext(DocumentSnapshot documentSnapshot) {
@@ -390,9 +411,16 @@ public <T> ApiFuture<T> runAsyncTransaction(
390411
@Nonnull final Transaction.AsyncFunction<T> updateFunction,
391412
@Nonnull TransactionOptions transactionOptions) {
392413

393-
TransactionRunner<T> transactionRunner =
394-
new TransactionRunner<>(this, updateFunction, transactionOptions);
395-
return transactionRunner.run();
414+
if (transactionOptions.getReadTime() != null) {
415+
// READ_ONLY transactions with readTime have no retry, nor transaction state, so we don't need
416+
// a runner.
417+
return updateFunction.updateCallback(
418+
new ReadTimeTransaction(this, transactionOptions.getReadTime()));
419+
} else {
420+
// For READ_ONLY transactions without readTime, there is still strong consistency applied,
421+
// that cannot be tracked client side.
422+
return new ServerSideTransactionRunner<>(this, updateFunction, transactionOptions).run();
423+
}
396424
}
397425

398426
@Nonnull

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Query.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -1784,7 +1784,7 @@ boolean shouldRetry(DocumentSnapshot lastDocument, Throwable t) {
17841784
*/
17851785
@Nonnull
17861786
public ApiFuture<QuerySnapshot> get() {
1787-
return get(null);
1787+
return get(null, null);
17881788
}
17891789

17901790
/**
@@ -1811,7 +1811,7 @@ public ListenerRegistration addSnapshotListener(
18111811
return Watch.forQuery(this).runWatch(executor, listener);
18121812
}
18131813

1814-
ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId) {
1814+
ApiFuture<QuerySnapshot> get(@Nullable ByteString transactionId, @Nullable Timestamp readTime) {
18151815
final SettableApiFuture<QuerySnapshot> result = SettableApiFuture.create();
18161816

18171817
internalStream(
@@ -1843,7 +1843,7 @@ public void onCompleted() {
18431843
},
18441844
/* startTimeNanos= */ rpcContext.getClock().nanoTime(),
18451845
transactionId,
1846-
/* readTime= */ null);
1846+
readTime);
18471847

18481848
return result;
18491849
}

0 commit comments

Comments
 (0)