Commit 41024fe

Added parallel blob enumeration (#1045)
* Added parallel blob enumeration
* Added numOfFolders flag to benchmark command
* Address comment about order
* Fix bug surfaced by rebase
* Added tool script for measuring performance
* Address comments
1 parent e586092 commit 41024fe

5 files changed: +128 -55 lines

cmd/benchmark.go (+19 -11)
@@ -44,6 +44,7 @@ type rawBenchmarkCmdArgs struct {
 	sizePerFile    string
 	fileCount      uint
 	deleteTestData bool
+	numOfFolders   uint

 	// options from flags
 	blockSizeMB float64
@@ -137,7 +138,7 @@ func (raw rawBenchmarkCmdArgs) cook() (cookedCopyCmdArgs, error) {
 		c.src = raw.target
 	} else { // Upload
 		// src must be string, but needs to indicate that it's for benchmark and encode what we want
-		c.src = benchmarkSourceHelper{}.ToUrl(raw.fileCount, bytesPerFile)
+		c.src = benchmarkSourceHelper{}.ToUrl(raw.fileCount, bytesPerFile, raw.numOfFolders)
 		c.dst, err = raw.appendVirtualDir(raw.target, virtualDir)
 		if err != nil {
 			return dummyCooked, err
@@ -253,35 +254,41 @@ type benchmarkSourceHelper struct{}
 // you want a URL that can't possibly be a real one, so we'll use that
 const benchmarkSourceHost = "benchmark.invalid"

-func (h benchmarkSourceHelper) ToUrl(fileCount uint, bytesPerFile int64) string {
-	return fmt.Sprintf("https://%s?fc=%d&bpf=%d", benchmarkSourceHost, fileCount, bytesPerFile)
+func (h benchmarkSourceHelper) ToUrl(fileCount uint, bytesPerFile int64, numOfFolders uint) string {
+	return fmt.Sprintf("https://%s?fc=%d&bpf=%d&nf=%d", benchmarkSourceHost, fileCount, bytesPerFile, numOfFolders)
 }

-func (h benchmarkSourceHelper) FromUrl(s string) (fileCount uint, bytesPerFile int64, err error) {
+func (h benchmarkSourceHelper) FromUrl(s string) (fileCount uint, bytesPerFile int64, numOfFolders uint, err error) {
 	// TODO: consider replace with regex?

 	expectedPrefix := "https://" + benchmarkSourceHost + "?"
 	if !strings.HasPrefix(s, expectedPrefix) {
-		return 0, 0, errors.New("invalid benchmark source string")
+		return 0, 0, 0, errors.New("invalid benchmark source string")
 	}
 	s = strings.TrimPrefix(s, expectedPrefix)
 	pieces := strings.Split(s, "&")
-	if len(pieces) != 2 ||
+	if len(pieces) != 3 ||
 		!strings.HasPrefix(pieces[0], "fc=") ||
-		!strings.HasPrefix(pieces[1], "bpf=") {
-		return 0, 0, errors.New("invalid benchmark source string")
+		!strings.HasPrefix(pieces[1], "bpf=") ||
+		!strings.HasPrefix(pieces[2], "nf=") {
+		return 0, 0, 0, errors.New("invalid benchmark source string")
 	}
 	pieces[0] = strings.Split(pieces[0], "=")[1]
 	pieces[1] = strings.Split(pieces[1], "=")[1]
+	pieces[2] = strings.Split(pieces[2], "=")[1]
 	fc, err := strconv.ParseUint(pieces[0], 10, 64)
 	if err != nil {
-		return 0, 0, err
+		return 0, 0, 0, err
 	}
 	bpf, err := strconv.ParseInt(pieces[1], 10, 64)
 	if err != nil {
-		return 0, 0, err
+		return 0, 0, 0, err
 	}
-	return uint(fc), bpf, nil
+	nf, err := strconv.ParseUint(pieces[2], 10, 64)
+	if err != nil {
+		return 0, 0, 0, err
+	}
+	return uint(fc), bpf, uint(nf), nil
 }

 var benchCmd *cobra.Command
@@ -330,6 +337,7 @@ func init() {

 	benchCmd.PersistentFlags().StringVar(&raw.sizePerFile, common.SizePerFileParam, "250M", "size of each auto-generated data file. Must be "+sizeStringDescription)
 	benchCmd.PersistentFlags().UintVar(&raw.fileCount, common.FileCountParam, common.FileCountDefault, "number of auto-generated data files to use")
+	benchCmd.PersistentFlags().UintVar(&raw.numOfFolders, "number-of-folders", 0, "If larger than 0, create folders to divide up the data.")
 	benchCmd.PersistentFlags().BoolVar(&raw.deleteTestData, "delete-test-data", true, "if true, the benchmark data will be deleted at the end of the benchmark run. Set it to false if you want to keep the data at the destination - e.g. to use it for manual tests outside benchmark mode")

 	benchCmd.PersistentFlags().Float64Var(&raw.blockSizeMB, "block-size-mb", 0, "use this block size (specified in MiB). Default is automatically calculated based on file size. Decimal fractions are allowed - e.g. 0.25. Identical to the same-named parameter in the copy command")
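Worth noting how the benchmark parameters travel: ToUrl packs them into a URL on the reserved host benchmark.invalid (which can never resolve), and FromUrl parses them back out in the traverser, so the benchmark source flows through the same cooking and traverser plumbing as a real remote location. A minimal, self-contained sketch of that round-trip, outside the AzCopy codebase (the function names here are illustrative, not the repo's):

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
	"strings"
)

const benchmarkSourceHost = "benchmark.invalid" // reserved TLD, can never resolve

// encodeBenchmarkSource packs the benchmark parameters into a fake URL,
// mirroring benchmarkSourceHelper.ToUrl in the diff above.
func encodeBenchmarkSource(fileCount uint, bytesPerFile int64, numOfFolders uint) string {
	return fmt.Sprintf("https://%s?fc=%d&bpf=%d&nf=%d", benchmarkSourceHost, fileCount, bytesPerFile, numOfFolders)
}

// decodeBenchmarkSource reverses the encoding, mirroring FromUrl
// (this sketch parses key=value pairs generically instead of by position).
func decodeBenchmarkSource(s string) (fileCount uint, bytesPerFile int64, numOfFolders uint, err error) {
	prefix := "https://" + benchmarkSourceHost + "?"
	if !strings.HasPrefix(s, prefix) {
		return 0, 0, 0, errors.New("invalid benchmark source string")
	}
	for _, piece := range strings.Split(strings.TrimPrefix(s, prefix), "&") {
		kv := strings.SplitN(piece, "=", 2)
		if len(kv) != 2 {
			return 0, 0, 0, errors.New("invalid benchmark source string")
		}
		v, perr := strconv.ParseInt(kv[1], 10, 64)
		if perr != nil {
			return 0, 0, 0, perr
		}
		switch kv[0] {
		case "fc":
			fileCount = uint(v)
		case "bpf":
			bytesPerFile = v
		case "nf":
			numOfFolders = uint(v)
		}
	}
	return fileCount, bytesPerFile, numOfFolders, nil
}

func main() {
	src := encodeBenchmarkSource(100, 250*1024*1024, 10)
	fc, bpf, nf, err := decodeBenchmarkSource(src)
	fmt.Println(src, fc, bpf, nf, err)
	// https://benchmark.invalid?fc=100&bpf=262144000&nf=10 100 262144000 10 <nil>
}
```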

cmd/zc_traverser_benchmark.go (+8 -1)
@@ -28,17 +28,19 @@ import (
 type benchmarkTraverser struct {
 	fileCount                   uint
 	bytesPerFile                int64
+	numOfFolders                uint
 	incrementEnumerationCounter enumerationCounterFunc
 }

 func newBenchmarkTraverser(source string, incrementEnumerationCounter enumerationCounterFunc) (*benchmarkTraverser, error) {
-	fc, bpf, err := benchmarkSourceHelper{}.FromUrl(source)
+	fc, bpf, nf, err := benchmarkSourceHelper{}.FromUrl(source)
 	if err != nil {
 		return nil, err
 	}
 	return &benchmarkTraverser{
 		fileCount:                   fc,
 		bytesPerFile:                bpf,
+		numOfFolders:                nf,
 		incrementEnumerationCounter: incrementEnumerationCounter},
 		nil
 }
@@ -69,6 +71,11 @@ func (t *benchmarkTraverser) traverse(preprocessor objectMorpher, processor obje
 		name := t.toReversedString(i) // this gives an even distribution through the namespace (compare the starting characters, for 0 to 199, when reversed or not). This is useful for performance when High Throughput Block Blob pathway does not apply
 		relativePath := name

+		if t.numOfFolders > 0 {
+			assignedFolder := t.toReversedString(i % t.numOfFolders)
+			relativePath = assignedFolder + common.AZCOPY_PATH_SEPARATOR_STRING + relativePath
+		}
+
 		if t.incrementEnumerationCounter != nil {
 			t.incrementEnumerationCounter(common.EEntityType.File())
 		}
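The comment on toReversedString above explains the trick: sequential indices 0 to 199 would all cluster under the same leading characters in a sorted namespace, but reversing the digits spreads the first character of each blob name evenly, which helps when the High Throughput Block Blob pathway does not apply. A small standalone sketch of the idea (the real toReversedString is not part of this diff, so the digit-reversal implementation below is an assumption about its behavior):

```go
package main

import "fmt"

// toReversedString reverses the decimal digits of i, e.g. 190 -> "091".
// Assumed behavior; the actual helper in cmd/zc_traverser_benchmark.go is not shown in this diff.
func toReversedString(i uint) string {
	s := fmt.Sprintf("%d", i)
	out := make([]byte, len(s))
	for j := range s {
		out[len(s)-1-j] = s[j]
	}
	return string(out)
}

func main() {
	const numOfFolders = 3
	for i := uint(0); i < 6; i++ {
		name := toReversedString(i)
		// as in the diff: i % numOfFolders picks the folder, also reversed
		folder := toReversedString(i % numOfFolders)
		fmt.Printf("file %d -> %s/%s\n", i, folder, name)
	}
	// the leading characters of reversed names for 0..199 cycle through 0-9,
	// instead of 100..199 all clustering under "1" the way they would unreversed
}
```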

cmd/zc_traverser_blob.go (+60 -42)
@@ -23,6 +23,7 @@ package cmd
 import (
 	"context"
 	"fmt"
+	"github.com/Azure/azure-storage-azcopy/common/parallel"
 	"net/url"
 	"strings"

@@ -160,57 +161,74 @@ func (t *blobTraverser) traverse(preprocessor objectMorpher, processor objectPro
 	// as a performance optimization, get an extra prefix to do pre-filtering. It's typically the start portion of a blob name.
 	extraSearchPrefix := filterSet(filters).GetEnumerationPreFilter(t.recursive)

-	for marker := (azblob.Marker{}); marker.NotDone(); {
-
-		// see the TO DO in GetEnumerationPreFilter if/when we make this more directory-aware
+	// Define how to enumerate its contents
+	// This func must be thread safe/goroutine safe
+	enumerateOneDir := func(dir parallel.Directory, enqueueDir func(parallel.Directory), enqueueOutput func(parallel.DirectoryEntry, error)) error {
+		currentDirPath := dir.(string)
+		for marker := (azblob.Marker{}); marker.NotDone(); {
+			lResp, err := containerURL.ListBlobsHierarchySegment(t.ctx, marker, "/", azblob.ListBlobsSegmentOptions{Prefix: currentDirPath,
+				Details: azblob.BlobListingDetails{Metadata: true}})
+			if err != nil {
+				return fmt.Errorf("cannot list files due to reason %s", err)
+			}

-		// look for all blobs that start with the prefix
-		// TODO optimize for the case where recursive is off
-		listBlob, err := containerURL.ListBlobsFlatSegment(t.ctx, marker,
-			azblob.ListBlobsSegmentOptions{Prefix: searchPrefix + extraSearchPrefix, Details: azblob.BlobListingDetails{Metadata: true}})
-		if err != nil {
-			return fmt.Errorf("cannot list blobs. Failed with error %s", err.Error())
-		}
+			// queue up the sub virtual directories if recursive is true
+			if t.recursive {
+				for _, virtualDir := range lResp.Segment.BlobPrefixes {
+					enqueueDir(virtualDir.Name)
+				}
+			}

-		// process the blobs returned in this result segment
-		for _, blobInfo := range listBlob.Segment.BlobItems {
-			// if the blob represents a hdi folder, then skip it
-			if util.doesBlobRepresentAFolder(blobInfo.Metadata) && !(t.includeDirectoryStubs && t.recursive) {
-				continue
+			// process the blobs returned in this result segment
+			for _, blobInfo := range lResp.Segment.BlobItems {
+				// if the blob represents a hdi folder, then skip it
+				if util.doesBlobRepresentAFolder(blobInfo.Metadata) && !(t.includeDirectoryStubs && t.recursive) {
+					continue
+				}
+
+				relativePath := strings.TrimPrefix(blobInfo.Name, searchPrefix)
+				adapter := blobPropertiesAdapter{blobInfo.Properties}
+				storedObject := newStoredObject(
+					preprocessor,
+					getObjectNameOnly(blobInfo.Name),
+					relativePath,
+					common.EEntityType.File(),
+					blobInfo.Properties.LastModified,
+					*blobInfo.Properties.ContentLength,
+					adapter,
+					adapter, // adapter satisfies both interfaces
+					common.FromAzBlobMetadataToCommonMetadata(blobInfo.Metadata),
+					blobUrlParts.ContainerName,
+				)
+				enqueueOutput(storedObject, nil)
 			}

-			relativePath := strings.TrimPrefix(blobInfo.Name, searchPrefix)
+			marker = lResp.NextMarker
+		}
+		return nil
+	}

-			// if recursive
-			if !t.recursive && strings.Contains(relativePath, common.AZCOPY_PATH_SEPARATOR_STRING) {
-				continue
-			}
+	// initiate parallel scanning, starting at the root path
+	workerContext, cancelWorkers := context.WithCancel(t.ctx)
+	cCrawled := parallel.Crawl(workerContext, searchPrefix+extraSearchPrefix, enumerateOneDir, enumerationParallelism)

-			adapter := blobPropertiesAdapter{blobInfo.Properties}
-			storedObject := newStoredObject(
-				preprocessor,
-				getObjectNameOnly(blobInfo.Name),
-				relativePath,
-				common.EEntityType.File(),
-				blobInfo.Properties.LastModified,
-				*blobInfo.Properties.ContentLength,
-				adapter,
-				adapter, // adapter satisfies both interfaces
-				common.FromAzBlobMetadataToCommonMetadata(blobInfo.Metadata),
-				blobUrlParts.ContainerName,
-			)
-
-			if t.incrementEnumerationCounter != nil {
-				t.incrementEnumerationCounter(common.EEntityType.File())
-			}
+	for x := range cCrawled {
+		item, workerError := x.Item()
+		if workerError != nil {
+			cancelWorkers()
+			return workerError
+		}

-			processErr := processIfPassedFilters(filters, storedObject, processor)
-			if processErr != nil {
-				return processErr
-			}
+		if t.incrementEnumerationCounter != nil {
+			t.incrementEnumerationCounter(common.EEntityType.File())
 		}

-		marker = listBlob.NextMarker
+		object := item.(storedObject)
+		processErr := processIfPassedFilters(filters, object, processor)
+		if processErr != nil {
+			cancelWorkers()
+			return processErr
+		}
 	}

 	return
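This is the heart of the commit: the old single-threaded ListBlobsFlatSegment loop becomes a per-directory callback (enumerateOneDir) handed to parallel.Crawl, which lists virtual directories concurrently via ListBlobsHierarchySegment and feeds results back on a channel. The common/parallel package itself is not shown in this diff, so the sketch below is a generic, assumed version of the pattern (a bounded worker pool pulling directories from a shared queue), not the repo's actual implementation:

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

type crawlResult struct {
	item string
	err  error
}

// crawl runs `parallelism` workers; each pulls a directory off the queue,
// enumerates it, emits entries to the output channel, and enqueues any
// subdirectories it discovers for other workers to pick up.
func crawl(root string, enumerate func(dir string, enqueueDir func(string), emit func(string, error)) error, parallelism int) <-chan crawlResult {
	out := make(chan crawlResult)
	dirs := make(chan string, 1024) // pending directories (buffered to keep the sketch simple)
	var pending sync.WaitGroup      // counts directories enqueued but not yet fully enumerated

	pending.Add(1)
	dirs <- root

	var workers sync.WaitGroup
	for i := 0; i < parallelism; i++ {
		workers.Add(1)
		go func() {
			defer workers.Done()
			for dir := range dirs {
				err := enumerate(dir,
					func(sub string) { pending.Add(1); dirs <- sub },
					func(item string, err error) { out <- crawlResult{item, err} })
				if err != nil {
					out <- crawlResult{"", err}
				}
				pending.Done()
			}
		}()
	}
	// close the queue once every directory is done, then close the output
	go func() { pending.Wait(); close(dirs); workers.Wait(); close(out) }()
	return out
}

func main() {
	// fake "listing": keys ending in "/" are virtual directories, others are blobs
	tree := map[string][]string{
		"":     {"a/", "b/", "root.txt"},
		"a/":   {"a/1.txt", "a/2.txt"},
		"b/":   {"b/c/", "b/3.txt"},
		"b/c/": {"b/c/4.txt"},
	}
	results := crawl("", func(dir string, enqueueDir func(string), emit func(string, error)) error {
		for _, entry := range tree[dir] {
			if strings.HasSuffix(entry, "/") {
				enqueueDir(entry) // like enqueuing lResp.Segment.BlobPrefixes above
			} else {
				emit(entry, nil) // like enqueueOutput(storedObject, nil)
			}
		}
		return nil
	}, 4)
	for r := range results {
		fmt.Println(r.item, r.err)
	}
}
```

The WaitGroup counts outstanding directories rather than workers, so the queue can be closed exactly when every enqueued directory has been fully listed. A production crawler would need an unbounded pending list (rather than a fixed buffer) to rule out deadlock on very wide namespaces.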

cmd/zt_remove_blob_test.go (+1 -1)
@@ -412,7 +412,7 @@ func (s *cmdIntegrationSuite) TestRemoveBlobsWithDirectoryStubs(c *chk.C) {
 	runCopyAndVerify(c, raw, func(err error) {
 		c.Assert(err, chk.IsNil)

-		// there should be exactly 20 top files, no directory stubs should included
+		// there should be exactly 20 top files, no directory stubs should be included
 		c.Assert(len(mockedRPC.transfers), chk.Equals, 20)

 		for _, transfer := range mockedRPC.transfers {

tool_test_perf.sh (+40 -0)
@@ -0,0 +1,40 @@
+#!/bin/bash
+
+# this script provides a quick way to validate whether a change causes a performance gain versus regression
+
+RunCurrent () {
+	./azcopy_current cp 'src' 'dst' --recursive --log-level=WARNING --check-length=false
+}
+
+RunNew () {
+	./azcopy_new cp 'src' 'dst' --recursive --log-level=WARNING --check-length=false
+}
+
+export AZCOPY_LOG_LOCATION=/datadrive/logs
+export AZCOPY_JOB_PLAN_LOCATION=/datadrive/plans
+
+for i in {1..5}
+do
+   # start with the new version, which might give it a disadvantage
+   # but since we run the experiment repeatedly, the results average out eventually
+   # in addition, it's better to give the disadvantage to the new version so that we can be extra sure that it's better/equal to current
+   echo Running new version for "$i"th time >> cmd-output.txt
+
+   # keep the result of the run, in case any error occurs
+   start_time=$(date +%s)
+   RunNew >> cmd-output.txt
+   end_time=$(date +%s)
+
+   # insert a record into the result CSV file
+   # this format is used so that we can import it easily into Excel
+   echo $i, new, $(expr "$end_time" - "$start_time") >> result.csv
+
+   # run the current version immediately after
+   echo Running current version for "$i"th time >> cmd-output.txt
+
+   # do the same for current version
+   start_time=$(date +%s)
+   RunCurrent >> cmd-output.txt
+   end_time=$(date +%s)
+   echo $i, current, $(expr "$end_time" - "$start_time") >> result.csv
+done
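The result.csv rows come out as "i, label, seconds" for import into Excel. A hypothetical Go companion could average them directly instead (not part of this commit; the file name summarize.go and the hard-coded path are illustrative):

```go
// summarize.go: average the per-run durations in result.csv by version label.
package main

import (
	"bufio"
	"fmt"
	"os"
	"strconv"
	"strings"
)

func main() {
	f, err := os.Open("result.csv")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	sums := map[string]float64{} // total seconds per version label ("new" / "current")
	counts := map[string]int{}   // number of runs per version label
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		fields := strings.Split(scanner.Text(), ",") // e.g. "3, new, 542"
		if len(fields) != 3 {
			continue
		}
		label := strings.TrimSpace(fields[1])
		secs, err := strconv.ParseFloat(strings.TrimSpace(fields[2]), 64)
		if err != nil {
			continue
		}
		sums[label] += secs
		counts[label]++
	}
	for label, total := range sums {
		fmt.Printf("%s: %.1f s average over %d runs\n", label, total/float64(counts[label]), counts[label])
	}
}
```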
