Skip to content

Commit 79d721d

Browse files
authored
feat: Plumb PartNamingStrategy for Parallel Composite Uploads in Transfer Manager (#2547)
* feat: Plumb PartNamingStrategy for Parallel Composite Uploads in Transfer Manager * address pr comments * fix broken builds
1 parent a63eccb commit 79d721d

File tree

3 files changed

+60
-1
lines changed

3 files changed

+60
-1
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerConfig.java

+34
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
package com.google.cloud.storage.transfermanager;
1818

19+
import static com.google.common.base.Preconditions.checkNotNull;
20+
21+
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
1922
import com.google.cloud.storage.StorageOptions;
2023
import com.google.common.base.MoreObjects;
2124
import java.util.Objects;
@@ -31,18 +34,22 @@ public final class TransferManagerConfig {
3134
private final boolean allowDivideAndConquerDownload;
3235
private final boolean allowParallelCompositeUpload;
3336

37+
private final PartNamingStrategy partNamingStrategy;
38+
3439
private final StorageOptions storageOptions;
3540

3641
TransferManagerConfig(
3742
int maxWorkers,
3843
int perWorkerBufferSize,
3944
boolean allowDivideAndConquerDownload,
4045
boolean allowParallelCompositeUpload,
46+
PartNamingStrategy partNamingStrategy,
4147
StorageOptions storageOptions) {
4248
this.maxWorkers = maxWorkers;
4349
this.perWorkerBufferSize = perWorkerBufferSize;
4450
this.allowDivideAndConquerDownload = allowDivideAndConquerDownload;
4551
this.allowParallelCompositeUpload = allowParallelCompositeUpload;
52+
this.partNamingStrategy = partNamingStrategy;
4653
this.storageOptions = storageOptions;
4754
}
4855

@@ -101,6 +108,15 @@ public StorageOptions getStorageOptions() {
101108
return storageOptions;
102109
}
103110

111+
/**
112+
* Part Naming Strategy to be used during Parallel Composite Uploads
113+
*
114+
* @see Builder#setParallelCompositeUploadPartNamingStrategy(PartNamingStrategy)
115+
*/
116+
public PartNamingStrategy getParallelCompositeUploadPartNamingStrategy() {
117+
return partNamingStrategy;
118+
}
119+
104120
/** The service object for {@link TransferManager} */
105121
public TransferManager getService() {
106122
return new TransferManagerImpl(this, DefaultQos.of(this));
@@ -169,13 +185,15 @@ public static class Builder {
169185
private boolean allowParallelCompositeUpload;
170186

171187
private StorageOptions storageOptions;
188+
private PartNamingStrategy partNamingStrategy;
172189

173190
private Builder() {
174191
this.perWorkerBufferSize = 16 * 1024 * 1024;
175192
this.maxWorkers = 2 * Runtime.getRuntime().availableProcessors();
176193
this.allowDivideAndConquerDownload = false;
177194
this.allowParallelCompositeUpload = false;
178195
this.storageOptions = StorageOptions.getDefaultInstance();
196+
this.partNamingStrategy = PartNamingStrategy.noPrefix();
179197
}
180198

181199
/**
@@ -246,6 +264,21 @@ public Builder setStorageOptions(StorageOptions storageOptions) {
246264
return this;
247265
}
248266

267+
/**
268+
* Part Naming Strategy that Transfer Manager will use during Parallel Composite Upload
269+
*
270+
* <p><i>Default Value:</i> {@link PartNamingStrategy#noPrefix()}
271+
*
272+
* @return the instance of Builder with the value for PartNamingStrategy modified.
273+
* @see TransferManagerConfig#getParallelCompositeUploadPartNamingStrategy()
274+
*/
275+
public Builder setParallelCompositeUploadPartNamingStrategy(
276+
PartNamingStrategy partNamingStrategy) {
277+
checkNotNull(partNamingStrategy);
278+
this.partNamingStrategy = partNamingStrategy;
279+
return this;
280+
}
281+
249282
/**
250283
* Creates a TransferManagerConfig object.
251284
*
@@ -257,6 +290,7 @@ public TransferManagerConfig build() {
257290
perWorkerBufferSize,
258291
allowDivideAndConquerDownload,
259292
allowParallelCompositeUpload,
293+
partNamingStrategy,
260294
storageOptions);
261295
}
262296
}

google-cloud-storage/src/main/java/com/google/cloud/storage/transfermanager/TransferManagerImpl.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,9 @@ final class TransferManagerImpl implements TransferManager {
8686
.withBufferAllocationStrategy(
8787
BufferAllocationStrategy.fixedPool(
8888
transferManagerConfig.getMaxWorkers(),
89-
transferManagerConfig.getPerWorkerBufferSize()));
89+
transferManagerConfig.getPerWorkerBufferSize()))
90+
.withPartNamingStrategy(
91+
transferManagerConfig.getParallelCompositeUploadPartNamingStrategy());
9092
storageOptions = storageOptions.toBuilder().setBlobWriteSessionConfig(pcuConfig).build();
9193
}
9294
this.pcuQueue = new ConcurrentLinkedDeque<>();

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITTransferManagerTest.java

+23
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.cloud.storage.BlobInfo;
2424
import com.google.cloud.storage.BucketInfo;
2525
import com.google.cloud.storage.DataGenerator;
26+
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
2627
import com.google.cloud.storage.Storage;
2728
import com.google.cloud.storage.Storage.BlobSourceOption;
2829
import com.google.cloud.storage.Storage.BlobWriteOption;
@@ -141,6 +142,28 @@ public void uploadFiles() throws Exception {
141142
}
142143
}
143144

145+
@Test
146+
public void uploadFilesPartNaming() throws Exception {
147+
TransferManagerConfig config =
148+
TransferManagerConfigTestingInstances.defaults(storage.getOptions())
149+
.toBuilder()
150+
.setAllowParallelCompositeUpload(true)
151+
.setPerWorkerBufferSize(128 * 1024)
152+
.setParallelCompositeUploadPartNamingStrategy(PartNamingStrategy.prefix("not-root"))
153+
.build();
154+
long size = CHUNK_THRESHOLD + 100L;
155+
try (TransferManager transferManager = config.getService();
156+
TmpFile tmpFile = DataGenerator.base64Characters().tempFile(baseDir, size)) {
157+
ParallelUploadConfig parallelUploadConfig =
158+
ParallelUploadConfig.newBuilder().setBucketName(bucket.getName()).build();
159+
UploadJob job =
160+
transferManager.uploadFiles(
161+
Collections.singletonList(tmpFile.getPath()), parallelUploadConfig);
162+
List<UploadResult> uploadResults = job.getUploadResults();
163+
assertThat(uploadResults.get(0).getStatus()).isEqualTo(TransferStatus.SUCCESS);
164+
}
165+
}
166+
144167
@Test
145168
public void uploadFilesWithOpts() throws Exception {
146169
TransferManagerConfig config =

0 commit comments

Comments
 (0)