Skip to content

Commit 8d79b8d

Browse files
authored
fix: update batch handling to ensure each operation has its own unique idempotency-token (#2905)
1 parent 2a5242e commit 8d79b8d

File tree

5 files changed

+138
-26
lines changed

5 files changed

+138
-26
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSession.java

+21-17
Original file line numberDiff line numberDiff line change
@@ -63,23 +63,27 @@ final class JsonResumableSession {
6363
new JsonResumableSessionPutTask(
6464
context, resumableWrite.getUploadId(), content, contentRange);
6565
HttpRpcContext httpRpcContext = HttpRpcContext.getInstance();
66-
httpRpcContext.newInvocationId();
67-
AtomicBoolean dirty = new AtomicBoolean(false);
68-
return Retrying.run(
69-
deps,
70-
alg,
71-
() -> {
72-
if (dirty.getAndSet(true)) {
73-
ResumableOperationResult<@Nullable StorageObject> query = query();
74-
long persistedSize = query.getPersistedSize();
75-
if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
76-
return query;
77-
} else {
78-
task.rewindTo(persistedSize);
66+
try {
67+
httpRpcContext.newInvocationId();
68+
AtomicBoolean dirty = new AtomicBoolean(false);
69+
return Retrying.run(
70+
deps,
71+
alg,
72+
() -> {
73+
if (dirty.getAndSet(true)) {
74+
ResumableOperationResult<@Nullable StorageObject> query = query();
75+
long persistedSize = query.getPersistedSize();
76+
if (contentRange.endOffsetEquals(persistedSize) || query.getObject() != null) {
77+
return query;
78+
} else {
79+
task.rewindTo(persistedSize);
80+
}
7981
}
80-
}
81-
return task.call();
82-
},
83-
Decoder.identity());
82+
return task.call();
83+
},
84+
Decoder.identity());
85+
} finally {
86+
httpRpcContext.clearInvocationId();
87+
}
8488
}
8589
}

google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java

+20-4
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,12 @@
4444
import com.google.api.client.util.Data;
4545
import com.google.api.services.storage.Storage;
4646
import com.google.api.services.storage.Storage.Objects.Compose;
47+
import com.google.api.services.storage.Storage.Objects.Delete;
4748
import com.google.api.services.storage.Storage.Objects.Get;
4849
import com.google.api.services.storage.Storage.Objects.Insert;
4950
import com.google.api.services.storage.Storage.Objects.Move;
51+
import com.google.api.services.storage.Storage.Objects.Patch;
52+
import com.google.api.services.storage.StorageRequest;
5053
import com.google.api.services.storage.model.Bucket;
5154
import com.google.api.services.storage.model.Bucket.RetentionPolicy;
5255
import com.google.api.services.storage.model.BucketAccessControl;
@@ -109,6 +112,7 @@ public class HttpStorageRpc implements StorageRpc {
109112
// declare this HttpStatus code here as it's not included in java.net.HttpURLConnection
110113
private static final int SC_REQUESTED_RANGE_NOT_SATISFIABLE = 416;
111114
private static final boolean IS_RECORD_EVENTS = true;
115+
private static final String X_GOOG_GCS_IDEMPOTENCY_TOKEN = "x-goog-gcs-idempotency-token";
112116

113117
private final StorageOptions options;
114118
private final Storage storage;
@@ -208,7 +212,7 @@ public void intercept(HttpRequest request) throws IOException {
208212
.filter(java.util.Objects::nonNull)
209213
.collect(JOINER);
210214
headers.set("x-goog-api-client", newValue);
211-
headers.set("x-goog-gcs-idempotency-token", invocationId);
215+
headers.set(X_GOOG_GCS_IDEMPOTENCY_TOKEN, invocationId);
212216

213217
String userAgent = headers.getUserAgent();
214218
if ((userAgent == null
@@ -247,7 +251,9 @@ public void addDelete(
247251
batches.add(storage.batch());
248252
currentBatchSize = 0;
249253
}
250-
deleteCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
254+
Delete call = deleteCall(storageObject, options);
255+
addIdempotencyTokenToCall(call);
256+
call.queue(batches.getLast(), toJsonCallback(callback));
251257
currentBatchSize++;
252258
} catch (IOException ex) {
253259
throw translate(ex);
@@ -264,7 +270,9 @@ public void addPatch(
264270
batches.add(storage.batch());
265271
currentBatchSize = 0;
266272
}
267-
patchCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
273+
Patch call = patchCall(storageObject, options);
274+
addIdempotencyTokenToCall(call);
275+
call.queue(batches.getLast(), toJsonCallback(callback));
268276
currentBatchSize++;
269277
} catch (IOException ex) {
270278
throw translate(ex);
@@ -281,7 +289,9 @@ public void addGet(
281289
batches.add(storage.batch());
282290
currentBatchSize = 0;
283291
}
284-
getCall(storageObject, options).queue(batches.getLast(), toJsonCallback(callback));
292+
Get call = getCall(storageObject, options);
293+
addIdempotencyTokenToCall(call);
294+
call.queue(batches.getLast(), toJsonCallback(callback));
285295
currentBatchSize++;
286296
} catch (IOException ex) {
287297
throw translate(ex);
@@ -310,6 +320,12 @@ public void submit() {
310320
span.end(HttpStorageRpcSpans.END_SPAN_OPTIONS);
311321
}
312322
}
323+
324+
private void addIdempotencyTokenToCall(StorageRequest<?> call) {
325+
HttpRpcContext instance = HttpRpcContext.getInstance();
326+
call.getRequestHeaders().set(X_GOOG_GCS_IDEMPOTENCY_TOKEN, instance.newInvocationId());
327+
instance.clearInvocationId();
328+
}
313329
}
314330

315331
private static <T> JsonBatchCallback<T> toJsonCallback(final RpcBatch.Callback<T> callback) {

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBatchTest.java

+53
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,15 @@
2323
import static org.junit.Assert.assertTrue;
2424
import static org.junit.Assert.fail;
2525

26+
import com.google.cloud.WriteChannel;
2627
import com.google.cloud.storage.Blob;
2728
import com.google.cloud.storage.BlobId;
2829
import com.google.cloud.storage.BlobInfo;
2930
import com.google.cloud.storage.BucketInfo;
31+
import com.google.cloud.storage.DataGenerator;
3032
import com.google.cloud.storage.Storage;
3133
import com.google.cloud.storage.Storage.BlobTargetOption;
34+
import com.google.cloud.storage.Storage.BlobWriteOption;
3235
import com.google.cloud.storage.StorageBatch;
3336
import com.google.cloud.storage.StorageBatchResult;
3437
import com.google.cloud.storage.StorageException;
@@ -40,6 +43,14 @@
4043
import com.google.cloud.storage.it.runner.annotations.StorageFixture;
4144
import com.google.cloud.storage.it.runner.registry.Generator;
4245
import com.google.common.collect.ImmutableMap;
46+
import java.io.IOException;
47+
import java.nio.ByteBuffer;
48+
import java.time.Clock;
49+
import java.time.OffsetDateTime;
50+
import java.time.ZoneOffset;
51+
import java.util.List;
52+
import java.util.stream.Collectors;
53+
import java.util.stream.IntStream;
4354
import org.junit.Before;
4455
import org.junit.Test;
4556
import org.junit.runner.RunWith;
@@ -208,4 +219,46 @@ public void testBatchRequestFail() {
208219
assertThat(e.getMessage()).contains("Invalid argument");
209220
}
210221
}
222+
223+
@Test
224+
public void batchSuccessiveUpdatesWork() {
225+
byte[] bytes = DataGenerator.base64Characters().genBytes(137);
226+
227+
List<BlobId> blobs =
228+
IntStream.range(0, 2)
229+
.mapToObj(
230+
i -> {
231+
BlobInfo info = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
232+
try (WriteChannel writer = storage.writer(info, BlobWriteOption.doesNotExist())) {
233+
writer.write(ByteBuffer.wrap(bytes));
234+
} catch (IOException e) {
235+
throw new RuntimeException(e);
236+
}
237+
return info.getBlobId();
238+
})
239+
.collect(Collectors.toList());
240+
241+
OffsetDateTime now1 = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC);
242+
243+
List<Blob> update1 =
244+
storage.update(
245+
blobs.stream()
246+
.map(id -> BlobInfo.newBuilder(id).setCustomTimeOffsetDateTime(now1).build())
247+
.collect(Collectors.toList()));
248+
249+
OffsetDateTime now2 = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC);
250+
List<Blob> update2 =
251+
storage.update(
252+
blobs.stream()
253+
.map(id -> BlobInfo.newBuilder(id).setCustomTimeOffsetDateTime(now2).build())
254+
.collect(Collectors.toList()));
255+
256+
assertThat(
257+
update2.stream()
258+
.filter(b -> !now2.equals(b.getCustomTimeOffsetDateTime()))
259+
.map(BlobInfo::getBlobId)
260+
.map(BlobId::toGsUtilUriWithGeneration)
261+
.collect(Collectors.toList()))
262+
.isEmpty();
263+
}
211264
}

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITHttpIdempotencyTokenTest.java

+35
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import com.google.cloud.storage.Storage.BlobTargetOption;
3434
import com.google.cloud.storage.Storage.BlobWriteOption;
3535
import com.google.cloud.storage.Storage.BucketListOption;
36+
import com.google.cloud.storage.StorageBatch;
37+
import com.google.cloud.storage.StorageBatchResult;
3638
import com.google.cloud.storage.StorageOptions;
3739
import com.google.cloud.storage.it.runner.StorageITRunner;
3840
import com.google.cloud.storage.it.runner.annotations.Backend;
@@ -42,6 +44,9 @@
4244
import com.google.common.collect.ImmutableList;
4345
import com.google.common.truth.IterableSubject;
4446
import java.nio.ByteBuffer;
47+
import java.time.Clock;
48+
import java.time.OffsetDateTime;
49+
import java.time.ZoneOffset;
4550
import java.util.Arrays;
4651
import java.util.List;
4752
import java.util.stream.Collectors;
@@ -179,4 +184,34 @@ public void resumableUpload() throws Exception {
179184
// 4. Finalize session and put final 45B
180185
assertAll(() -> subject.hasSize(4), () -> assertThat(actualXxd).isEqualTo(expectedXxd));
181186
}
187+
188+
@Test
189+
public void batch() throws Exception {
190+
BlobInfo info1 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
191+
BlobInfo info2 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
192+
BlobInfo info3 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
193+
storage.create(info1, BlobTargetOption.doesNotExist());
194+
storage.create(info2, BlobTargetOption.doesNotExist());
195+
storage.create(info3, BlobTargetOption.doesNotExist());
196+
197+
requestAuditing.clear();
198+
OffsetDateTime now = Clock.systemUTC().instant().atOffset(ZoneOffset.UTC);
199+
200+
StorageBatch batch = storage.batch();
201+
StorageBatchResult<Blob> r1 = batch.get(info1.getBlobId());
202+
StorageBatchResult<Blob> r2 =
203+
batch.update(info2.toBuilder().setCustomTimeOffsetDateTime(now).build());
204+
StorageBatchResult<Boolean> r3 = batch.delete(info3.getBlobId());
205+
206+
batch.submit();
207+
assertAll(
208+
() -> assertThat(r1).isNotNull(),
209+
() -> assertThat(r2.get().getCustomTimeOffsetDateTime()).isEqualTo(now),
210+
() -> assertThat(r3.get()).isTrue(),
211+
() -> {
212+
IterableSubject subject =
213+
requestAuditing.assertRequestHeader(X_GOOG_GCS_IDEMPOTENCY_TOKEN);
214+
subject.hasSize(3);
215+
});
216+
}
182217
}

google-cloud-storage/src/test/java/com/google/cloud/storage/spi/v1/HttpRpcContextTest.java

+9-5
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,15 @@ public void testNewInvocationId() {
4343
UUID uuid = UUID.fromString("28220dff-1e8b-4770-9e10-022c2a99d8f3");
4444
HttpRpcContext testContext = new HttpRpcContext(() -> uuid);
4545

46-
assertThat(testContext.newInvocationId()).isEqualTo(uuid);
47-
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
48-
// call again to ensure the id is consistent with our supplier
49-
assertThat(testContext.newInvocationId()).isEqualTo(uuid);
50-
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
46+
try {
47+
assertThat(testContext.newInvocationId()).isEqualTo(uuid);
48+
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
49+
// call again to ensure the id is consistent with our supplier
50+
assertThat(testContext.newInvocationId()).isEqualTo(uuid);
51+
assertThat(testContext.getInvocationId()).isEqualTo(uuid);
52+
} finally {
53+
testContext.clearInvocationId();
54+
}
5155
}
5256

5357
@Test

0 commit comments

Comments
 (0)