Skip to content

Commit 8c7404d

Browse files
feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling (#2489)
* feat: Adds a ZeroCopy response marshaller for grpc ReadObject handling
* Update google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
* add copyright header
* Apply suggestions from code review Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
* Extract classes to own files, more StorageClient initialization
* copyright headers on new files
* formatter
* one more lint issue
* fix: improve GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller#close() handling of multiple IOExceptions
* clean up ZeroCopyMarshallerTest
* add grpc core to pom
---------
Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
1 parent e2030b2 commit 8c7404d

14 files changed

+767 -78 lines changed

google-cloud-storage/pom.xml

+8
Original file line number | Diff line number | Diff line change
@@ -96,6 +96,14 @@
9696
<groupId>com.google.protobuf</groupId>
9797
<artifactId>protobuf-java-util</artifactId>
9898
</dependency>
99+
<dependency>
100+
<groupId>io.grpc</groupId>
101+
<artifactId>grpc-core</artifactId>
102+
</dependency>
103+
<dependency>
104+
<groupId>io.grpc</groupId>
105+
<artifactId>grpc-protobuf</artifactId>
106+
</dependency>
99107
<dependency>
100108
<groupId>com.google.api.grpc</groupId>
101109
<artifactId>proto-google-common-protos</artifactId>

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicDownloadSessionBuilder.java

+11 -5
Original file line number | Diff line number | Diff line change
@@ -48,19 +48,23 @@ public static GapicDownloadSessionBuilder create() {
4848
* ultimately produced channel will not do any retries of its own.
4949
*/
5050
public ReadableByteChannelSessionBuilder byteChannel(
51-
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read) {
52-
return new ReadableByteChannelSessionBuilder(read);
51+
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
52+
ResponseContentLifecycleManager responseContentLifecycleManager) {
53+
return new ReadableByteChannelSessionBuilder(read, responseContentLifecycleManager);
5354
}
5455

5556
public static final class ReadableByteChannelSessionBuilder {
5657

5758
private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
59+
private final ResponseContentLifecycleManager responseContentLifecycleManager;
5860
private boolean autoGzipDecompression;
5961
private Hasher hasher;
6062

6163
private ReadableByteChannelSessionBuilder(
62-
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read) {
64+
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
65+
ResponseContentLifecycleManager responseContentLifecycleManager) {
6366
this.read = read;
67+
this.responseContentLifecycleManager = responseContentLifecycleManager;
6468
this.hasher = Hasher.noop();
6569
this.autoGzipDecompression = false;
6670
}
@@ -100,11 +104,13 @@ public UnbufferedReadableByteChannelSessionBuilder unbuffered() {
100104
return (object, resultFuture) -> {
101105
if (autoGzipDecompression) {
102106
return new GzipReadableByteChannel(
103-
new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher),
107+
new GapicUnbufferedReadableByteChannel(
108+
resultFuture, read, object, hasher, responseContentLifecycleManager),
104109
ApiFutures.transform(
105110
resultFuture, Object::getContentEncoding, MoreExecutors.directExecutor()));
106111
} else {
107-
return new GapicUnbufferedReadableByteChannel(resultFuture, read, object, hasher);
112+
return new GapicUnbufferedReadableByteChannel(
113+
resultFuture, read, object, hasher, responseContentLifecycleManager);
108114
}
109115
};
110116
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedReadableByteChannel.java

+26 -53
Original file line number | Diff line number | Diff line change
@@ -16,15 +16,14 @@
1616

1717
package com.google.cloud.storage;
1818

19-
import static com.google.common.base.Preconditions.checkArgument;
20-
2119
import com.google.api.client.http.HttpStatusCodes;
2220
import com.google.api.core.ApiFuture;
2321
import com.google.api.core.SettableApiFuture;
2422
import com.google.api.gax.rpc.ServerStream;
2523
import com.google.api.gax.rpc.ServerStreamingCallable;
2624
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
2725
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
26+
import com.google.protobuf.ByteString;
2827
import com.google.storage.v2.ChecksummedData;
2928
import com.google.storage.v2.Object;
3029
import com.google.storage.v2.ReadObjectRequest;
@@ -46,25 +45,28 @@ final class GapicUnbufferedReadableByteChannel
4645
private final ReadObjectRequest req;
4746
private final Hasher hasher;
4847
private final LazyServerStreamIterator iter;
48+
private final ResponseContentLifecycleManager rclm;
4949

5050
private boolean open = true;
5151
private boolean complete = false;
5252
private long blobOffset;
5353

5454
private Object metadata;
5555

56-
private ByteBuffer leftovers;
56+
private ResponseContentLifecycleHandle leftovers;
5757

5858
GapicUnbufferedReadableByteChannel(
5959
SettableApiFuture<Object> result,
6060
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
6161
ReadObjectRequest req,
62-
Hasher hasher) {
62+
Hasher hasher,
63+
ResponseContentLifecycleManager rclm) {
6364
this.result = result;
6465
this.read = read;
6566
this.req = req;
6667
this.hasher = hasher;
6768
this.blobOffset = req.getReadOffset();
69+
this.rclm = rclm;
6870
this.iter = new LazyServerStreamIterator();
6971
}
7072

@@ -82,15 +84,17 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
8284
ReadCursor c = new ReadCursor(blobOffset, blobOffset + totalBufferCapacity);
8385
while (c.hasRemaining()) {
8486
if (leftovers != null) {
85-
copy(c, leftovers, dsts, offset, length);
87+
leftovers.copy(c, dsts, offset, length);
8688
if (!leftovers.hasRemaining()) {
89+
leftovers.close();
8790
leftovers = null;
8891
}
8992
continue;
9093
}
9194

9295
if (iter.hasNext()) {
9396
ReadObjectResponse resp = iter.next();
97+
ResponseContentLifecycleHandle handle = rclm.get(resp);
9498
if (resp.hasMetadata()) {
9599
Object respMetadata = resp.getMetadata();
96100
if (metadata == null) {
@@ -107,22 +111,24 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
107111
}
108112
}
109113
ChecksummedData checksummedData = resp.getChecksummedData();
110-
ByteBuffer content = checksummedData.getContent().asReadOnlyByteBuffer();
111-
// very important to know whether a crc32c value is set. Without checking, protobuf will
114+
ByteString content = checksummedData.getContent();
115+
int contentSize = content.size();
116+
// Very important to know whether a crc32c value is set. Without checking, protobuf will
112117
// happily return 0, which is a valid crc32c value.
113118
if (checksummedData.hasCrc32C()) {
114-
Crc32cLengthKnown expected =
115-
Crc32cValue.of(checksummedData.getCrc32C(), checksummedData.getContent().size());
119+
Crc32cLengthKnown expected = Crc32cValue.of(checksummedData.getCrc32C(), contentSize);
116120
try {
117-
hasher.validate(expected, content::duplicate);
121+
hasher.validate(expected, content.asReadOnlyByteBufferList());
118122
} catch (IOException e) {
119123
close();
120124
throw e;
121125
}
122126
}
123-
copy(c, content, dsts, offset, length);
124-
if (content.hasRemaining()) {
125-
leftovers = content;
127+
handle.copy(c, dsts, offset, length);
128+
if (handle.hasRemaining()) {
129+
leftovers = handle;
130+
} else {
131+
handle.close();
126132
}
127133
} else {
128134
complete = true;
@@ -144,59 +150,26 @@ public boolean isOpen() {
144150
@Override
145151
public void close() throws IOException {
146152
open = false;
147-
iter.close();
153+
try {
154+
if (leftovers != null) {
155+
leftovers.close();
156+
}
157+
} finally {
158+
iter.close();
159+
}
148160
}
149161

150162
ApiFuture<Object> getResult() {
151163
return result;
152164
}
153165

154-
private void copy(ReadCursor c, ByteBuffer content, ByteBuffer[] dsts, int offset, int length) {
155-
long copiedBytes = Buffers.copy(content, dsts, offset, length);
156-
c.advance(copiedBytes);
157-
}
158-
159166
private IOException closeWithError(String message) throws IOException {
160167
close();
161168
StorageException cause =
162169
new StorageException(HttpStatusCodes.STATUS_CODE_PRECONDITION_FAILED, message);
163170
throw new IOException(message, cause);
164171
}
165172

166-
/**
167-
* Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of
168-
* {@link #read}
169-
*/
170-
private static final class ReadCursor {
171-
private final long beginning;
172-
private long offset;
173-
private final long limit;
174-
175-
private ReadCursor(long beginning, long limit) {
176-
this.limit = limit;
177-
this.beginning = beginning;
178-
this.offset = beginning;
179-
}
180-
181-
public boolean hasRemaining() {
182-
return limit - offset > 0;
183-
}
184-
185-
public void advance(long incr) {
186-
checkArgument(incr >= 0);
187-
offset += incr;
188-
}
189-
190-
public long read() {
191-
return offset - beginning;
192-
}
193-
194-
@Override
195-
public String toString() {
196-
return String.format("ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit);
197-
}
198-
}
199-
200173
private final class LazyServerStreamIterator implements Iterator<ReadObjectResponse>, Closeable {
201174
private ServerStream<ReadObjectResponse> serverStream;
202175
private Iterator<ReadObjectResponse> responseIterator;

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java

+4 -1
Original file line number | Diff line number | Diff line change
@@ -28,15 +28,18 @@
2828
final class GrpcBlobReadChannel extends BaseStorageReadChannel<Object> {
2929

3030
private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read;
31+
private final ResponseContentLifecycleManager responseContentLifecycleManager;
3132
private final ReadObjectRequest request;
3233
private final boolean autoGzipDecompression;
3334

3435
GrpcBlobReadChannel(
3536
ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> read,
37+
ResponseContentLifecycleManager responseContentLifecycleManager,
3638
ReadObjectRequest request,
3739
boolean autoGzipDecompression) {
3840
super(Conversions.grpc().blobInfo());
3941
this.read = read;
42+
this.responseContentLifecycleManager = responseContentLifecycleManager;
4043
this.request = request;
4144
this.autoGzipDecompression = autoGzipDecompression;
4245
}
@@ -53,7 +56,7 @@ protected LazyReadChannel<?, Object> newLazyReadChannel() {
5356
ReadableByteChannelSessionBuilder b =
5457
ResumableMedia.gapic()
5558
.read()
56-
.byteChannel(read)
59+
.byteChannel(read, responseContentLifecycleManager)
5760
.setHasher(Hasher.noop())
5861
.setAutoGzipDecompression(autoGzipDecompression);
5962
BufferHandle bufferHandle = getBufferHandle();

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

+8 -1
Original file line number | Diff line number | Diff line change
@@ -180,6 +180,7 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
180180
.collect(ImmutableSet.toImmutableSet())));
181181

182182
final StorageClient storageClient;
183+
final ResponseContentLifecycleManager responseContentLifecycleManager;
183184
final WriterFactory writerFactory;
184185
final GrpcConversions codecs;
185186
final GrpcRetryAlgorithmManager retryAlgorithmManager;
@@ -192,10 +193,12 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
192193
GrpcStorageImpl(
193194
GrpcStorageOptions options,
194195
StorageClient storageClient,
196+
ResponseContentLifecycleManager responseContentLifecycleManager,
195197
WriterFactory writerFactory,
196198
Opts<UserProject> defaultOpts) {
197199
super(options);
198200
this.storageClient = storageClient;
201+
this.responseContentLifecycleManager = responseContentLifecycleManager;
199202
this.writerFactory = writerFactory;
200203
this.defaultOpts = defaultOpts;
201204
this.codecs = Conversions.grpc();
@@ -716,8 +719,10 @@ public GrpcBlobReadChannel reader(BlobId blob, BlobSourceOption... options) {
716719
ReadObjectRequest request = getReadObjectRequest(blob, opts);
717720
Set<StatusCode.Code> codes = resultRetryAlgorithmToCodes(retryAlgorithmManager.getFor(request));
718721
GrpcCallContext grpcCallContext = Retrying.newCallContext().withRetryableCodes(codes);
722+
719723
return new GrpcBlobReadChannel(
720724
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
725+
responseContentLifecycleManager,
721726
request,
722727
!opts.autoGzipDecompression());
723728
}
@@ -1868,7 +1873,9 @@ private UnbufferedReadableByteChannelSession<Object> unbufferedReadSession(
18681873
opts.grpcMetadataMapper().apply(Retrying.newCallContext().withRetryableCodes(codes));
18691874
return ResumableMedia.gapic()
18701875
.read()
1871-
.byteChannel(storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext))
1876+
.byteChannel(
1877+
storageClient.readObjectCallable().withDefaultCallContext(grpcCallContext),
1878+
responseContentLifecycleManager)
18721879
.setAutoGzipDecompression(!opts.autoGzipDecompression())
18731880
.unbuffered()
18741881
.setReadObjectRequest(readObjectRequest)

0 commit comments

Comments
 (0)