-
Notifications
You must be signed in to change notification settings - Fork 235
/
Copy pathcopyEnumeratorHelper.go
99 lines (82 loc) · 3.75 KB
/
copyEnumeratorHelper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package cmd
import (
"fmt"
"github.com/Azure/azure-pipeline-go/pipeline"
"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"math/rand"
)
// Package-level enumeration settings. Neither value is read in this file.
var (
	// EnumerationParallelism starts at 1.
	// NOTE(review): presumably the number of concurrent enumeration workers — confirm against its users.
	EnumerationParallelism = 1

	// EnumerationParallelStatFiles starts as false.
	// NOTE(review): presumably toggles stat-ing files in parallel during enumeration — confirm against its users.
	EnumerationParallelStatFiles = false
)
// addTransfer queues one transfer on the pending job part order e.
//
// If e already holds exactly NumOfFilesPerDispatchJobPart transfers, that batch
// is shuffled and sent to the engine before the new transfer is queued. This
// lets the transfer engine get started on large jobs while the frontend is
// still gathering more transfers. After a successful dispatch the batch is
// reset to empty and the part number is advanced.
//
// Source and destination paths on the transfer are, and should be, relative paths.
//
// It returns an error when the engine response reports that the job part was
// not started, and nil otherwise.
func addTransfer(e *common.CopyJobPartOrderRequest, transfer common.CopyTransfer, cca *CookedCopyCmdArgs) error {
	if batchIsFull := len(e.Transfers.List) == NumOfFilesPerDispatchJobPart; batchIsFull {
		shuffleTransfers(e.Transfers.List)

		var orderResp common.CopyJobPartOrderResponse
		Rpc(common.ERpcCmd.CopyJobPartOrder(), e, &orderResp)
		if !orderResp.JobStarted {
			return fmt.Errorf("copy job part order with JobId %s and part number %d failed because %s", e.JobID, e.PartNum, orderResp.ErrorMsg)
		}

		// When the part just sent to the engine is part 0, start fetching the job progress summary.
		if e.PartNum == 0 {
			cca.waitUntilJobCompletion(false)
		}

		// Move on to the next part with a fresh, empty batch.
		e.PartNum++
		e.Transfers = common.Transfers{}
	}

	// The transfer is appended only after the check (and possible dispatch) above,
	// so that there is at least one transfer left for the final part.
	pending := &e.Transfers
	pending.List = append(pending.List, transfer)
	pending.TotalSizeInBytes += uint64(transfer.SourceSize)

	// Keep the per-entity-type counters in step with the list; other entity types are not counted.
	if kind := transfer.EntityType; kind == common.EEntityType.File() {
		pending.FileTransferCount++
	} else if kind == common.EEntityType.Folder() {
		pending.FolderTransferCount++
	} else if kind == common.EEntityType.Symlink() {
		pending.SymlinkTransferCount++
	}

	return nil
}
// shuffleTransfers randomly reorders transfers in place before they are dispatched.
// This is done to avoid hitting the same partition continuously in an append-only pattern.
// TODO: this should probably be removed after the high throughput block blob feature is implemented on the service side.
func shuffleTransfers(transfers []common.CopyTransfer) {
	swap := func(a, b int) {
		transfers[a], transfers[b] = transfers[b], transfers[a]
	}
	rand.Shuffle(len(transfers), swap)
}
// dispatchFinalPart sends the last job part order, with IsFinalPart set to true,
// along with whatever transfers still haven't been sent.
//
// If the engine response reports that the job was not started, the standard
// init output (including the log location) is still emitted, and then:
//   - nil is returned in dry-run mode;
//   - NothingScheduledError is returned when the engine's error message is the
//     "no transfers scheduled" one;
//   - otherwise an error carrying the job ID, part number and engine message is returned.
//
// On success it logs FinalPartCreatedMessage to the job log (when a JobsAdmin
// is present), marks enumeration as complete on cca, and, if this is part 0,
// starts fetching the job progress summary.
func dispatchFinalPart(e *common.CopyJobPartOrderRequest, cca *CookedCopyCmdArgs) error {
	shuffleTransfers(e.Transfers.List)
	e.IsFinalPart = true

	finalResp := common.CopyJobPartOrderResponse{}
	Rpc(common.ERpcCmd.CopyJobPartOrder(), e, &finalResp)

	if !finalResp.JobStarted {
		// Output the log location and such before deciding what to return.
		jobIDText := cca.jobID.String()
		logPath := fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID)
		glcm.Init(common.GetStandardInitOutputBuilder(jobIDText, logPath, cca.isCleanupJob, cca.cleanupJobMessage))

		switch {
		case cca.dryrunMode:
			return nil
		case finalResp.ErrorMsg == common.ECopyJobPartOrderErrorType.NoTransfersScheduledErr():
			return NothingScheduledError
		default:
			return fmt.Errorf("copy job part order with JobId %s and part number %d failed because %s", e.JobID, e.PartNum, finalResp.ErrorMsg)
		}
	}

	if jobsAdmin.JobsAdmin != nil {
		jobsAdmin.JobsAdmin.LogToJobLog(FinalPartCreatedMessage, pipeline.LogInfo)
	}

	// Set the flag on cca to indicate that enumeration is done.
	cca.isEnumerationComplete = true

	// When the part just sent to the engine is part 0, start fetching the job progress summary.
	if e.PartNum == 0 {
		cca.waitUntilJobCompletion(false)
	}

	return nil
}