Skip to content

Commit d84e255

Browse files
authored
feat: port BufferToDiskThenUpload to work with HttpStorageOptions (#2473)
1 parent e5772a4 commit d84e255

File tree

4 files changed

+65
-13
lines changed

4 files changed

+65
-13
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@
8686
* Buffer bytes to a temporary file on disk. On {@link WritableByteChannel#close() close()}
8787
* upload the entire files contents to Cloud Storage. Delete the temporary file.
8888
* </td>
89-
* <td>gRPC</td>
89+
* <td>gRPC, HTTP</td>
9090
* <td>
9191
* <ol>
9292
* <li>A Resumable Upload Session will be used to upload the file on disk.</li>
@@ -272,7 +272,7 @@ public static BidiBlobWriteSessionConfig bidiWrite() {
272272
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
273273
*/
274274
@BetaApi
275-
@TransportCompatibility({Transport.GRPC})
275+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
276276
public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOException {
277277
return bufferToDiskThenUpload(
278278
Paths.get(System.getProperty("java.io.tmpdir"), "google-cloud-storage"));
@@ -289,7 +289,7 @@ public static BlobWriteSessionConfig bufferToTempDirThenUpload() throws IOExcept
289289
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
290290
*/
291291
@BetaApi
292-
@TransportCompatibility({Transport.GRPC})
292+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
293293
public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IOException {
294294
return bufferToDiskThenUpload(ImmutableList.of(path));
295295
}
@@ -308,7 +308,7 @@ public static BufferToDiskThenUpload bufferToDiskThenUpload(Path path) throws IO
308308
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
309309
*/
310310
@BetaApi
311-
@TransportCompatibility({Transport.GRPC})
311+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
312312
public static BufferToDiskThenUpload bufferToDiskThenUpload(Collection<Path> paths)
313313
throws IOException {
314314
return new BufferToDiskThenUpload(ImmutableList.copyOf(paths), false);

google-cloud-storage/src/main/java/com/google/cloud/storage/BufferToDiskThenUpload.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@
5959
*/
6060
@Immutable
6161
@BetaApi
62-
@TransportCompatibility({Transport.GRPC})
62+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
6363
public final class BufferToDiskThenUpload extends BlobWriteSessionConfig
64-
implements BlobWriteSessionConfig.GrpcCompatible {
64+
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {
6565
private static final long serialVersionUID = 9059242302276891867L;
6666

6767
/**

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java

+53-6
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
import java.util.concurrent.TimeoutException;
9292
import java.util.function.Function;
9393
import java.util.function.Supplier;
94+
import org.checkerframework.checker.nullness.qual.NonNull;
9495
import org.checkerframework.checker.nullness.qual.Nullable;
9596

9697
final class StorageImpl extends BaseService<StorageOptions> implements Storage, StorageInternal {
@@ -147,7 +148,8 @@ public Blob create(BlobInfo blobInfo, BlobTargetOption... options) {
147148
.setMd5(EMPTY_BYTE_ARRAY_MD5)
148149
.setCrc32c(EMPTY_BYTE_ARRAY_CRC32C)
149150
.build();
150-
return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, options);
151+
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
152+
return internalCreate(updatedInfo, EMPTY_BYTE_ARRAY, 0, 0, objectTargetOptOpts);
151153
}
152154

153155
@Override
@@ -161,7 +163,8 @@ public Blob create(BlobInfo blobInfo, byte[] content, BlobTargetOption... option
161163
BaseEncoding.base64()
162164
.encode(Ints.toByteArray(Hashing.crc32c().hashBytes(content).asInt())))
163165
.build();
164-
return internalCreate(updatedInfo, content, 0, content.length, options);
166+
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
167+
return internalCreate(updatedInfo, content, 0, content.length, objectTargetOptOpts);
165168
}
166169

167170
@Override
@@ -180,7 +183,8 @@ public Blob create(
180183
Ints.toByteArray(
181184
Hashing.crc32c().hashBytes(content, offset, length).asInt())))
182185
.build();
183-
return internalCreate(updatedInfo, content, offset, length, options);
186+
final Opts<ObjectTargetOpt> objectTargetOptOpts = Opts.unwrap(options).resolveFrom(updatedInfo);
187+
return internalCreate(updatedInfo, content, offset, length, objectTargetOptOpts);
184188
}
185189

186190
@Override
@@ -203,12 +207,11 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op
203207

204208
private Blob internalCreate(
205209
BlobInfo info,
206-
final byte[] content,
210+
final byte @NonNull [] content,
207211
final int offset,
208212
final int length,
209-
BlobTargetOption... options) {
213+
Opts<ObjectTargetOpt> opts) {
210214
Preconditions.checkNotNull(content);
211-
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(info);
212215
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
213216

214217
BlobInfo updated = opts.blobInfoMapper().apply(info.toBuilder()).build();
@@ -1647,4 +1650,48 @@ public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... o
16471650
writerFactory.writeSession(this, blobInfo, opts);
16481651
return BlobWriteSessions.of(writableByteChannelSession);
16491652
}
1653+
1654+
@Override
1655+
public BlobInfo internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> opts)
1656+
throws IOException {
1657+
if (Files.isDirectory(path)) {
1658+
throw new StorageException(0, path + " is a directory");
1659+
}
1660+
long size = Files.size(path);
1661+
if (size == 0L) {
1662+
return internalCreate(info, EMPTY_BYTE_ARRAY, 0, 0, opts);
1663+
}
1664+
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
1665+
BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null);
1666+
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
1667+
StorageObject encode = codecs.blobInfo().encode(updated);
1668+
1669+
Supplier<String> uploadIdSupplier =
1670+
ResumableMedia.startUploadForBlobInfo(
1671+
getOptions(),
1672+
updated,
1673+
optionsMap,
1674+
retryAlgorithmManager.getForResumableUploadSessionCreate(optionsMap));
1675+
JsonResumableWrite jsonResumableWrite =
1676+
JsonResumableWrite.of(encode, optionsMap, uploadIdSupplier.get(), 0);
1677+
1678+
JsonResumableSession session =
1679+
ResumableSession.json(
1680+
HttpClientContext.from(storageRpc),
1681+
getOptions().asRetryDependencies(),
1682+
retryAlgorithmManager.idempotent(),
1683+
jsonResumableWrite);
1684+
HttpContentRange contentRange =
1685+
HttpContentRange.of(ByteRangeSpec.relativeLength(0L, size), size);
1686+
ResumableOperationResult<StorageObject> put =
1687+
session.put(RewindableContent.of(path), contentRange);
1688+
// all exception translation is taken care of down in the JsonResumableSession
1689+
StorageObject object = put.getObject();
1690+
if (object == null) {
1691+
// if by some odd chance the put didn't get the StorageObject, query for it
1692+
ResumableOperationResult<StorageObject> query = session.query();
1693+
object = query.getObject();
1694+
}
1695+
return codecs.blobInfo().decode(object);
1696+
}
16501697
}

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ public void allDefaults() throws Exception {
6969
}
7070

7171
@Test
72-
@CrossRun.Exclude(transports = Transport.HTTP)
7372
public void bufferToTempDirThenUpload() throws Exception {
7473
StorageOptions options = null;
7574
if (transport == Transport.GRPC) {
@@ -78,6 +77,12 @@ public void bufferToTempDirThenUpload() throws Exception {
7877
.toBuilder()
7978
.setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
8079
.build();
80+
} else if (transport == Transport.HTTP) {
81+
options =
82+
((HttpStorageOptions) storage.getOptions())
83+
.toBuilder()
84+
.setBlobWriteSessionConfig(BlobWriteSessionConfigs.bufferToTempDirThenUpload())
85+
.build();
8186
}
8287
assertWithMessage("unable to resolve options").that(options).isNotNull();
8388
//noinspection DataFlowIssue

0 commit comments

Comments
 (0)