Skip to content

Commit 3bf6026

Browse files
authored
feat: port ParallelCompositeUploadBlobWriteSessionConfig to work with HttpStorageOptions (#2474)
1 parent d84e255 commit 3bf6026

8 files changed

+286
-13
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@
142142
* Break the stream of bytes into smaller part objects uploading each part in parallel. Then
143143
* composing the parts together to make the ultimate object.
144144
* </td>
145-
* <td>gRPC</td>
145+
* <td>gRPC, HTTP</td>
146146
* <td>
147147
* <ol>
148148
* <li>
@@ -342,7 +342,7 @@ public static JournalingBlobWriteSessionConfig journaling(Collection<Path> paths
342342
* @since 2.28.0 This new api is in preview and is subject to breaking changes.
343343
*/
344344
@BetaApi
345-
@TransportCompatibility({Transport.GRPC})
345+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
346346
public static ParallelCompositeUploadBlobWriteSessionConfig parallelCompositeUpload() {
347347
return ParallelCompositeUploadBlobWriteSessionConfig.withDefaults();
348348
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -118,9 +118,9 @@
118118
*/
119119
@Immutable
120120
@BetaApi
121-
@TransportCompatibility({Transport.GRPC})
121+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
122122
public final class ParallelCompositeUploadBlobWriteSessionConfig extends BlobWriteSessionConfig
123-
implements BlobWriteSessionConfig.GrpcCompatible {
123+
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {
124124

125125
private static final int MAX_PARTS_PER_COMPOSE = 32;
126126
private final int maxPartsPerCompose;

google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java

+9-1
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,10 @@ private BlobInfo definePart(BlobInfo ultimateObject, PartRange partRange, long o
431431
PART_INDEX.appendTo(partRange, builder);
432432
OBJECT_OFFSET.appendTo(offset, builder);
433433
b.setMetadata(builder.build());
434+
// the value of a kms key name will contain the exact version when read from gcs
435+
// however, gcs will not accept that version resource identifier when creating a new object
436+
// strip it out, so it can be included as a query string parameter instead
437+
b.setKmsKeyName(null);
434438
b = partMetadataFieldDecorator.apply(b);
435439
return b.build();
436440
}
@@ -507,7 +511,11 @@ private ApiFuture<Boolean> deleteAsync(BlobId id) {
507511
@VisibleForTesting
508512
@NonNull
509513
static Opts<ObjectTargetOpt> getPartOpts(Opts<ObjectTargetOpt> opts) {
510-
return opts.filter(TO_EXCLUDE_FROM_PARTS).prepend(DOES_NOT_EXIST);
514+
return opts.filter(TO_EXCLUDE_FROM_PARTS)
515+
.prepend(DOES_NOT_EXIST)
516+
// disable gzip transfer encoding for HTTP, it causes a significant bottleneck uploading
517+
// the parts
518+
.prepend(Opts.from(UnifiedOpts.disableGzipContent()));
511519
}
512520

513521
@VisibleForTesting
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
20+
import java.io.IOException;
21+
import java.io.InputStream;
22+
import java.nio.ByteBuffer;
23+
import java.nio.channels.GatheringByteChannel;
24+
25+
/**
26+
* Facade which makes an instance of {@link RewindableContent} appear as an input stream.
27+
*
28+
* <p>It does this by calling {@link RewindableContent#writeTo(GatheringByteChannel)} on an
29+
* anonymous channel which closes over the read destination.
30+
*/
31+
final class RewindableContentInputStream extends InputStream {
32+
33+
private final RewindableContent content;
34+
35+
RewindableContentInputStream(RewindableContent content) {
36+
this.content = content;
37+
}
38+
39+
@Override
40+
public int read() throws IOException {
41+
byte[] tmp = new byte[1];
42+
int read = read(tmp);
43+
if (read == -1) {
44+
return -1;
45+
} else {
46+
return tmp[0] & 0xFF;
47+
}
48+
}
49+
50+
@Override
51+
public int read(byte[] b, int off, int len) throws IOException {
52+
// define a byte buffer as the destination for our write
53+
ByteBuffer dst = ByteBuffer.wrap(b, off, len);
54+
int remaining = dst.remaining();
55+
if (remaining == 0) {
56+
return 0;
57+
}
58+
long written =
59+
content.writeTo(
60+
new AnonWritableByteChannel() {
61+
@Override
62+
public long write(ByteBuffer[] srcs, int offset, int length) {
63+
// srcs here is the bytes of content
64+
long total = 0;
65+
for (int i = offset; i < length; i++) {
66+
ByteBuffer src = srcs[i];
67+
// copy what we can from our src to the dst buffer
68+
long written = Buffers.copy(src, dst);
69+
total += written;
70+
}
71+
return total;
72+
}
73+
});
74+
// if the dst has space, but we didn't write anything means we didn't have anything to write
75+
if (written == 0) {
76+
return -1;
77+
}
78+
return Math.toIntExact(written);
79+
}
80+
81+
private abstract static class AnonWritableByteChannel implements UnbufferedWritableByteChannel {
82+
83+
@Override
84+
public boolean isOpen() {
85+
return true;
86+
}
87+
88+
@Override
89+
public void close() {}
90+
}
91+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java

+68
Original file line numberDiff line numberDiff line change
@@ -1694,4 +1694,72 @@ public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOp
16941694
}
16951695
return codecs.blobInfo().decode(object);
16961696
}
1697+
1698+
@Override
1699+
public BlobInfo internalDirectUpload(BlobInfo info, Opts<ObjectTargetOpt> opts, ByteBuffer buf) {
1700+
1701+
BlobInfo.Builder builder =
1702+
info.toBuilder()
1703+
.setMd5(
1704+
BaseEncoding.base64().encode(Hashing.md5().hashBytes(buf.duplicate()).asBytes()))
1705+
.setCrc32c(
1706+
BaseEncoding.base64()
1707+
.encode(Ints.toByteArray(Hashing.crc32c().hashBytes(buf.duplicate()).asInt())));
1708+
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
1709+
1710+
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
1711+
final StorageObject encoded = codecs.blobInfo().encode(updated);
1712+
ResultRetryAlgorithm<?> algorithm =
1713+
retryAlgorithmManager.getForObjectsCreate(encoded, optionsMap);
1714+
RewindableContent content = RewindableContent.of(buf);
1715+
return run(
1716+
algorithm,
1717+
() -> {
1718+
content.rewindTo(0);
1719+
return storageRpc.create(encoded, new RewindableContentInputStream(content), optionsMap);
1720+
},
1721+
Conversions.json().blobInfo()::decode);
1722+
}
1723+
1724+
/**
1725+
* Behavioral difference compared to {@link #delete(BlobId, BlobSourceOption...)} instead of
1726+
* returning false when an object does not exist, we throw an exception.
1727+
*/
1728+
@Override
1729+
public Void internalObjectDelete(BlobId id, Opts<ObjectSourceOpt> opts) {
1730+
final StorageObject storageObject = codecs.blobId().encode(id);
1731+
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
1732+
ResultRetryAlgorithm<?> algorithm =
1733+
retryAlgorithmManager.getForObjectsDelete(storageObject, optionsMap);
1734+
return run(
1735+
algorithm,
1736+
() -> {
1737+
boolean deleted = storageRpc.delete(storageObject, optionsMap);
1738+
// HttpStorageRpc turns a 404 into false, our code needs to know 404
1739+
if (!deleted) {
1740+
throw new StorageException(404, "NOT_FOUND", null, null);
1741+
}
1742+
return null;
1743+
},
1744+
Function.identity());
1745+
}
1746+
1747+
@Override
1748+
public BlobInfo internalObjectGet(BlobId blobId, Opts<ObjectSourceOpt> opts) {
1749+
StorageObject storedObject = codecs.blobId().encode(blobId);
1750+
ImmutableMap<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
1751+
ResultRetryAlgorithm<?> algorithm =
1752+
retryAlgorithmManager.getForObjectsGet(storedObject, optionsMap);
1753+
return run(
1754+
algorithm,
1755+
() -> {
1756+
StorageObject storageObject = storageRpc.get(storedObject, optionsMap);
1757+
// HttpStorageRpc turns a 404 into null, our code needs to know 404
1758+
if (storageObject == null) {
1759+
throw new StorageException(404, "NOT_FOUND", null, null);
1760+
}
1761+
return storageObject;
1762+
},
1763+
codecs.blobInfo()::decode);
1764+
}
16971765
}

google-cloud-storage/src/main/java/com/google/cloud/storage/spi/v1/HttpStorageRpc.java

+10-7
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.google.api.client.json.jackson2.JacksonFactory;
4444
import com.google.api.client.util.Data;
4545
import com.google.api.services.storage.Storage;
46+
import com.google.api.services.storage.Storage.Objects.Compose;
4647
import com.google.api.services.storage.Storage.Objects.Get;
4748
import com.google.api.services.storage.Storage.Objects.Insert;
4849
import com.google.api.services.storage.model.Bucket;
@@ -755,13 +756,15 @@ public StorageObject compose(
755756
Span span = startSpan(HttpStorageRpcSpans.SPAN_NAME_COMPOSE);
756757
Scope scope = tracer.withSpan(span);
757758
try {
758-
return storage
759-
.objects()
760-
.compose(target.getBucket(), target.getName(), request)
761-
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions))
762-
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions))
763-
.setUserProject(Option.USER_PROJECT.getString(targetOptions))
764-
.execute();
759+
Compose compose =
760+
storage
761+
.objects()
762+
.compose(target.getBucket(), target.getName(), request)
763+
.setIfMetagenerationMatch(Option.IF_METAGENERATION_MATCH.getLong(targetOptions))
764+
.setIfGenerationMatch(Option.IF_GENERATION_MATCH.getLong(targetOptions))
765+
.setUserProject(Option.USER_PROJECT.getString(targetOptions));
766+
setEncryptionHeaders(compose.getRequestHeaders(), ENCRYPTION_KEY_PREFIX, targetOptions);
767+
return compose.execute();
765768
} catch (IOException ex) {
766769
span.setStatus(Status.UNKNOWN.withDescription(ex.getMessage()));
767770
throw translate(ex);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import static com.google.cloud.storage.ByteSizeConstants._256KiB;
20+
import static com.google.cloud.storage.TestUtils.xxd;
21+
import static com.google.common.truth.Truth.assertThat;
22+
23+
import com.google.protobuf.ByteString;
24+
import java.io.IOException;
25+
import java.nio.ByteBuffer;
26+
import org.junit.Test;
27+
28+
public final class RewindableContentInputStreamTest {
29+
30+
@Test
31+
public void read_empty() throws IOException {
32+
RewindableContent content = RewindableContent.empty();
33+
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
34+
int read = in.read();
35+
assertThat(read).isEqualTo(-1);
36+
}
37+
}
38+
39+
@Test
40+
public void readB_emptySrc() throws IOException {
41+
RewindableContent content = RewindableContent.empty();
42+
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
43+
int read = in.read(new byte[1]);
44+
assertThat(read).isEqualTo(-1);
45+
}
46+
}
47+
48+
@Test
49+
public void readB_emptyDst() throws IOException {
50+
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
51+
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
52+
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
53+
byte[] tmp = new byte[0];
54+
int read = in.read(tmp);
55+
assertThat(read).isEqualTo(0);
56+
}
57+
}
58+
59+
@Test
60+
public void readB_singleByte() throws IOException {
61+
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
62+
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
63+
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
64+
byte[] tmp = new byte[_256KiB];
65+
int read = in.read(tmp);
66+
assertThat(read).isEqualTo(1);
67+
assertThat(tmp[0]).isEqualTo(bytes[0]);
68+
}
69+
}
70+
71+
@Test
72+
public void read_singleByte() throws IOException {
73+
byte[] bytes = DataGenerator.base64Characters().genBytes(1);
74+
RewindableContent content = RewindableContent.of(ByteBuffer.wrap(bytes));
75+
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
76+
int read = in.read();
77+
assertThat(read).isEqualTo(bytes[0]);
78+
}
79+
}
80+
81+
@Test
82+
public void readB_multiContent() throws IOException {
83+
byte[] bytes = DataGenerator.base64Characters().genBytes(30);
84+
RewindableContent content =
85+
RewindableContent.of(
86+
ByteBuffer.wrap(bytes, 0, 10),
87+
ByteBuffer.wrap(bytes, 10, 10),
88+
ByteBuffer.wrap(bytes, 20, 10));
89+
try (RewindableContentInputStream in = new RewindableContentInputStream(content)) {
90+
byte[] tmp = new byte[_256KiB];
91+
int read = in.read(tmp);
92+
assertThat(read).isEqualTo(30);
93+
assertThat(xxd(ByteString.copyFrom(tmp, 0, read))).isEqualTo(xxd(bytes));
94+
}
95+
}
96+
}

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITParallelCompositeUploadBlobWriteSessionConfigTest.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.cloud.storage.BucketInfo;
3232
import com.google.cloud.storage.DataGenerator;
3333
import com.google.cloud.storage.GrpcStorageOptions;
34+
import com.google.cloud.storage.HttpStorageOptions;
3435
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig;
3536
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferAllocationStrategy;
3637
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier;
@@ -73,7 +74,7 @@
7374

7475
@RunWith(StorageITRunner.class)
7576
@CrossRun(
76-
transports = {Transport.GRPC},
77+
transports = {Transport.HTTP, Transport.GRPC},
7778
backends = {Backend.PROD})
7879
public final class ITParallelCompositeUploadBlobWriteSessionConfigTest {
7980

@@ -125,6 +126,12 @@ public void setUp() throws Exception {
125126
.toBuilder()
126127
.setBlobWriteSessionConfig(pcu)
127128
.build();
129+
} else if (transport == Transport.HTTP) {
130+
storageOptions =
131+
((HttpStorageOptions) injectedStorage.getOptions())
132+
.toBuilder()
133+
.setBlobWriteSessionConfig(pcu)
134+
.build();
128135
}
129136
assertWithMessage("unable to resolve options").that(storageOptions).isNotNull();
130137
//noinspection DataFlowIssue

0 commit comments

Comments
 (0)