// Copyright © 2017 Microsoft <wastore@microsoft.com>
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
package cmd
import (
"context"
"encoding/json"
"fmt"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/Azure/azure-storage-azcopy/v10/common"
"github.com/Azure/azure-storage-azcopy/v10/jobsAdmin"
"github.com/Azure/azure-storage-azcopy/v10/ste"

"github.com/spf13/cobra"
)
type rawSyncCmdArgs struct {
src string
dst string
recursive bool
fromTo string
// options from flags
blockSizeMB float64
include string
exclude string
excludePath string
includeFileAttributes string
excludeFileAttributes string
legacyInclude string // for warning messages only
legacyExclude string // for warning messages only
includeRegex string
excludeRegex string
compareHash string
localHashStorageMode string
preservePermissions bool
preserveSMBPermissions bool // deprecated and synonymous with preservePermissions
preserveOwner bool
preserveSMBInfo bool
preservePOSIXProperties bool
followSymlinks bool
preserveSymlinks bool
backupMode bool
putMd5 bool
md5ValidationOption string
// this flag indicates the user's decision about deleting the extra files at the destination
// that do not exist at the source. With this flag explicitly set (to true or false), users will
// not be asked for permission; otherwise the user is prompted to make a decision
deleteDestination string
// this flag disables the comparator and overwrites files at the destination unconditionally
mirrorMode bool
s2sPreserveAccessTier bool
// Opt-in flag to preserve the blob index tags during service to service transfer.
s2sPreserveBlobTags bool
forceIfReadOnly bool
// Optional flag to encrypt user data with a user-provided key.
// The key is provided in the REST request itself.
// The provided key (EncryptionKey) and its hash (EncryptionKeySHA256) are fetched from environment variables.
// EncryptionAlgorithm is set to "AES256" by default.
cpkInfo bool
// The key is present in Azure Key Vault, and the Key Vault is linked to the storage account.
// The provided key name will be fetched from Azure Key Vault and used to encrypt the data
cpkScopeInfo string
// dry-run mode
dryrun bool
trailingDot string
}
func (raw *rawSyncCmdArgs) parsePatterns(pattern string) (cookedPatterns []string) {
cookedPatterns = make([]string, 0)
rawPatterns := strings.Split(pattern, ";")
for _, pattern := range rawPatterns {
// skip the empty patterns
if len(pattern) != 0 {
cookedPatterns = append(cookedPatterns, pattern)
}
}
return
}
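// Illustrative usage (example values, not part of the original source): parsePatterns
// splits a semicolon-separated flag value and drops empty entries, e.g.:
//
//	patterns := raw.parsePatterns("*.jpg;;*.pdf;exactName")
//	// patterns == []string{"*.jpg", "*.pdf", "exactName"}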
// it is assumed that the given URL has the SAS stripped and is safe to print
func (raw *rawSyncCmdArgs) validateURLIsNotServiceLevel(url string, location common.Location) error {
srcLevel, err := DetermineLocationLevel(url, location, true)
if err != nil {
return err
}
if srcLevel == ELocationLevel.Service() {
return fmt.Errorf("service level URLs (%s) are not supported in sync: ", url)
}
return nil
}
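// Illustrative note (general Azure URL structure, not stated in this file): a service-level
// URL names only the account, whereas sync needs at least a container or share, e.g.:
//
//	// rejected by this check: https://myaccount.blob.core.windows.net
//	// accepted:               https://myaccount.blob.core.windows.net/mycontainer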
// validates and transforms raw input into cooked input
func (raw *rawSyncCmdArgs) cook() (cookedSyncCmdArgs, error) {
cooked := cookedSyncCmdArgs{}
// set up the front end scanning logger
azcopyScanningLogger = common.NewJobLogger(azcopyCurrentJobID, azcopyLogVerbosity, azcopyLogPathFolder, "-scanning")
azcopyScanningLogger.OpenLog()
glcm.RegisterCloseFunc(func() {
azcopyScanningLogger.CloseLog()
})
// this if-statement ladder remains (instead of being separated out) to help determine valid combinations for sync
// consider making a map of valid source/dest combos and consolidating this into generic source/dest setups, akin to the lower if statement
// TODO: if the set of source/dest combos supported by sync is expanded, update this method and the declarative test framework:
var err error
err = cooked.trailingDot.Parse(raw.trailingDot)
if err != nil {
return cooked, err
}
cooked.fromTo, err = ValidateFromTo(raw.src, raw.dst, raw.fromTo)
if err != nil {
return cooked, err
}
switch cooked.fromTo {
case common.EFromTo.Unknown():
return cooked, fmt.Errorf("Unable to infer the source '%s' / destination '%s'. ", raw.src, raw.dst)
case common.EFromTo.LocalBlob(), common.EFromTo.LocalFile(), common.EFromTo.LocalBlobFS():
cooked.destination, err = SplitResourceString(raw.dst, cooked.fromTo.To())
common.PanicIfErr(err)
case common.EFromTo.BlobLocal(), common.EFromTo.FileLocal(), common.EFromTo.BlobFSLocal():
cooked.source, err = SplitResourceString(raw.src, cooked.fromTo.From())
common.PanicIfErr(err)
case common.EFromTo.BlobBlob(), common.EFromTo.FileFile(), common.EFromTo.BlobFile(), common.EFromTo.FileBlob(), common.EFromTo.BlobFSBlobFS(), common.EFromTo.BlobFSBlob(), common.EFromTo.BlobFSFile(), common.EFromTo.BlobBlobFS(), common.EFromTo.FileBlobFS():
cooked.destination, err = SplitResourceString(raw.dst, cooked.fromTo.To())
common.PanicIfErr(err)
cooked.source, err = SplitResourceString(raw.src, cooked.fromTo.From())
common.PanicIfErr(err)
default:
return cooked, fmt.Errorf("source '%s' / destination '%s' combination '%s' not supported for sync command ", raw.src, raw.dst, cooked.fromTo)
}
// Do this check separately so we don't end up with a bunch of code duplication when new source/destination combinations are added
if cooked.fromTo.From() == common.ELocation.Local() {
cooked.source = common.ResourceString{Value: common.ToExtendedPath(cleanLocalPath(raw.src))}
} else if cooked.fromTo.To() == common.ELocation.Local() {
cooked.destination = common.ResourceString{Value: common.ToExtendedPath(cleanLocalPath(raw.dst))}
}
// we do not support service level sync yet
if cooked.fromTo.From().IsRemote() {
err = raw.validateURLIsNotServiceLevel(cooked.source.Value, cooked.fromTo.From())
if err != nil {
return cooked, err
}
}
// we do not support service level sync yet
if cooked.fromTo.To().IsRemote() {
err = raw.validateURLIsNotServiceLevel(cooked.destination.Value, cooked.fromTo.To())
if err != nil {
return cooked, err
}
}
// use the globally generated JobID
cooked.jobID = azcopyCurrentJobID
cooked.blockSize, err = blockSizeInBytes(raw.blockSizeMB)
if err != nil {
return cooked, err
}
if err = cooked.symlinkHandling.Determine(raw.followSymlinks, raw.preserveSymlinks); err != nil {
return cooked, err
}
cooked.recursive = raw.recursive
cooked.forceIfReadOnly = raw.forceIfReadOnly
if err = validateForceIfReadOnly(cooked.forceIfReadOnly, cooked.fromTo); err != nil {
return cooked, err
}
cooked.backupMode = raw.backupMode
if err = validateBackupMode(cooked.backupMode, cooked.fromTo); err != nil {
return cooked, err
}
// determine whether we should prompt the user to delete extra files
err = cooked.deleteDestination.Parse(raw.deleteDestination)
if err != nil {
return cooked, err
}
// warn on legacy filters
if raw.legacyInclude != "" || raw.legacyExclude != "" {
return cooked, fmt.Errorf("the include and exclude parameters have been replaced by include-pattern and exclude-pattern. They work on filenames only (not paths)")
}
// parse the filter patterns
cooked.includePatterns = raw.parsePatterns(raw.include)
cooked.excludePatterns = raw.parsePatterns(raw.exclude)
cooked.excludePaths = raw.parsePatterns(raw.excludePath)
// parse the attribute filter patterns
cooked.includeFileAttributes = raw.parsePatterns(raw.includeFileAttributes)
cooked.excludeFileAttributes = raw.parsePatterns(raw.excludeFileAttributes)
cooked.preserveSMBInfo = raw.preserveSMBInfo && areBothLocationsSMBAware(cooked.fromTo)
if err = validatePreserveSMBPropertyOption(cooked.preserveSMBInfo, cooked.fromTo, nil, "preserve-smb-info"); err != nil {
return cooked, err
}
isUserPersistingPermissions := raw.preserveSMBPermissions || raw.preservePermissions
if cooked.preserveSMBInfo && !isUserPersistingPermissions {
glcm.Info("Please note: the preserve-permissions flag is set to false, thus AzCopy will not copy SMB ACLs between the source and destination. To learn more: https://aka.ms/AzCopyandAzureFiles.")
}
if err = validatePreserveSMBPropertyOption(isUserPersistingPermissions, cooked.fromTo, nil, PreservePermissionsFlag); err != nil {
return cooked, err
}
// TODO: the check on raw.preservePermissions on the next line can be removed once we have full support for these properties in sync
// if err = validatePreserveOwner(raw.preserveOwner, cooked.fromTo); raw.preservePermissions && err != nil {
// return cooked, err
// }
cooked.preservePermissions = common.NewPreservePermissionsOption(isUserPersistingPermissions, raw.preserveOwner, cooked.fromTo)
if cooked.fromTo == common.EFromTo.BlobBlob() && cooked.preservePermissions.IsTruthy() {
cooked.isHNSToHNS = true // override HNS settings: if a user is transferring blob->blob and copying permissions, it must be HNS, since permissions don't exist without HNS.
}
cooked.preservePOSIXProperties = raw.preservePOSIXProperties
if cooked.preservePOSIXProperties && !areBothLocationsPOSIXAware(cooked.fromTo) {
return cooked, fmt.Errorf("in order to use --preserve-posix-properties, both the source and destination must be POSIX-aware (valid pairings are Linux->Blob, Blob->Linux, Blob->Blob)")
}
if err = cooked.compareHash.Parse(raw.compareHash); err != nil {
return cooked, err
} else {
switch cooked.compareHash {
case common.ESyncHashType.MD5():
// Save any new MD5s on files we download.
raw.putMd5 = true
default: // no need to put a hash of any kind.
}
}
if err = common.LocalHashStorageMode.Parse(raw.localHashStorageMode); err != nil {
return cooked, err
}
cooked.putMd5 = raw.putMd5
if err = validatePutMd5(cooked.putMd5, cooked.fromTo); err != nil {
return cooked, err
}
err = cooked.md5ValidationOption.Parse(raw.md5ValidationOption)
if err != nil {
return cooked, err
}
if err = validateMd5Option(cooked.md5ValidationOption, cooked.fromTo); err != nil {
return cooked, err
}
if cooked.fromTo.IsS2S() {
cooked.preserveAccessTier = raw.s2sPreserveAccessTier
}
// Check if the user has provided the `s2s-preserve-blob-tags` flag.
// If yes, we have to ensure that both source and destination are Blob storage.
if raw.s2sPreserveBlobTags {
if cooked.fromTo.From() != common.ELocation.Blob() || cooked.fromTo.To() != common.ELocation.Blob() {
return cooked, fmt.Errorf("either source or destination is not a blob storage. " +
"blob index tags is a property of blobs only therefore both source and destination must be blob storage")
} else {
cooked.s2sPreserveBlobTags = raw.s2sPreserveBlobTags
}
}
cpkOptions := common.CpkOptions{}
// Setting CPK-N
if raw.cpkScopeInfo != "" {
if raw.cpkInfo {
return cooked, fmt.Errorf("cannot use both cpk-by-name and cpk-by-value at the same time")
}
cpkOptions.CpkScopeInfo = raw.cpkScopeInfo
}
// Setting CPK-V
// Get the key (EncryptionKey and EncryptionKeySHA256) value from environment variables when required.
cpkOptions.CpkInfo = raw.cpkInfo
// We only support transfers from a source encrypted with a user-provided key when the user wishes to download.
// Due to a service limitation, S2S transfer is not supported for a source encrypted with a user-provided key.
if cooked.fromTo.IsDownload() && (cpkOptions.CpkScopeInfo != "" || cpkOptions.CpkInfo) {
glcm.Info("Client Provided Key for encryption/decryption is provided for download scenario. " +
"Assuming source is encrypted.")
cpkOptions.IsSourceEncrypted = true
}
cooked.cpkOptions = cpkOptions
cooked.mirrorMode = raw.mirrorMode
cooked.includeRegex = raw.parsePatterns(raw.includeRegex)
cooked.excludeRegex = raw.parsePatterns(raw.excludeRegex)
cooked.dryrunMode = raw.dryrun
if azcopyOutputVerbosity == common.EOutputVerbosity.Quiet() || azcopyOutputVerbosity == common.EOutputVerbosity.Essential() {
if cooked.deleteDestination == common.EDeleteDestination.Prompt() {
err = fmt.Errorf("cannot set output level '%s' with delete-destination option '%s'", azcopyOutputVerbosity.String(), cooked.deleteDestination.String())
} else if cooked.dryrunMode {
err = fmt.Errorf("cannot set output level '%s' with dry-run mode", azcopyOutputVerbosity.String())
}
}
if err != nil {
return cooked, err
}
return cooked, nil
}
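// Minimal sketch of the raw -> cooked flow (hypothetical values, not from the original source;
// string-typed options are given explicit defaults here because cook parses them rather than
// relying on zero values):
//
//	raw := rawSyncCmdArgs{
//		src:                  "/data",
//		dst:                  "https://myaccount.blob.core.windows.net/mycontainer",
//		recursive:            true,
//		deleteDestination:    "false",
//		trailingDot:          "Enable",
//		md5ValidationOption:  "FailIfDifferent",
//		compareHash:          "None",
//		localHashStorageMode: "HiddenFiles",
//	}
//	cooked, err := raw.cook() // validates flag combinations and infers the FromTo pair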
type cookedSyncCmdArgs struct {
// NOTE: for the 64-bit atomic functions to work on a 32-bit system, we have to guarantee 64-bit alignment,
// so the 64-bit integers are placed first in the struct to avoid future breaks
// refer to: https://golang.org/pkg/sync/atomic/#pkg-note-BUG
// defines the number of files listed at the source and compared.
atomicSourceFilesScanned uint64
// defines the number of files listed at the destination and compared.
atomicDestinationFilesScanned uint64
// defines the scanning status of the sync operation.
// 0 means scanning is in progress and 1 means scanning is complete.
atomicScanningStatus uint32
// defines whether first part has been ordered or not.
// 0 means first part is not ordered and 1 means first part is ordered.
atomicFirstPartOrdered uint32
// deletion count keeps track of how many extra files from the destination were removed
atomicDeletionCount uint32
source common.ResourceString
destination common.ResourceString
fromTo common.FromTo
credentialInfo common.CredentialInfo
s2sSourceCredentialType common.CredentialType
isHNSToHNS bool // Because DFS sources and destinations are obscured, this is necessary for folder property transfers on ADLS Gen 2.
// filters
recursive bool
symlinkHandling common.SymlinkHandlingType
includePatterns []string
excludePatterns []string
excludePaths []string
includeFileAttributes []string
excludeFileAttributes []string
includeRegex []string
excludeRegex []string
// options
compareHash common.SyncHashType
preservePermissions common.PreservePermissionsOption
preserveSMBInfo bool
preservePOSIXProperties bool
putMd5 bool
md5ValidationOption common.HashValidationOption
blockSize int64
forceIfReadOnly bool
backupMode bool
// commandString holds the user-given command, which is logged to the Job log file
commandString string
// generated
jobID common.JobID
// variables used to calculate progress
// intervalStartTime holds the last time value when the progress summary was fetched
// the value of this variable is used to calculate the throughput
// it gets updated every time the progress summary is fetched
intervalStartTime time.Time
intervalBytesTransferred uint64
// used to calculate job summary
jobStartTime time.Time
// this flag is set to true by the enumerator once the final part has been dispatched
// it is useful for indicating whether we are simply waiting in order to cancel
isEnumerationComplete bool
// this flag indicates the user's decision about deleting the extra files at the destination
// that do not exist at the source. With this flag explicitly set (to true or false), users will
// not be asked for permission; otherwise the user is prompted to make a decision
deleteDestination common.DeleteDestination
preserveAccessTier bool
// To specify whether user wants to preserve the blob index tags during service to service transfer.
s2sPreserveBlobTags bool
cpkOptions common.CpkOptions
mirrorMode bool
dryrunMode bool
trailingDot common.TrailingDotOption
}
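// Note on the alignment requirement documented at the top of this struct (general Go
// knowledge, not from this file): on 32-bit platforms the 64-bit functions in sync/atomic
// panic unless their operand is 8-byte aligned, and only the first word of an allocated
// struct is guaranteed to be 64-bit aligned; hence the uint64 counters are declared
// before any smaller fields.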
func (cca *cookedSyncCmdArgs) incrementDeletionCount() {
atomic.AddUint32(&cca.atomicDeletionCount, 1)
}
func (cca *cookedSyncCmdArgs) getDeletionCount() uint32 {
return atomic.LoadUint32(&cca.atomicDeletionCount)
}
// setFirstPartOrdered sets the value of atomicFirstPartOrdered to 1
func (cca *cookedSyncCmdArgs) setFirstPartOrdered() {
atomic.StoreUint32(&cca.atomicFirstPartOrdered, 1)
}
// firstPartOrdered returns the value of atomicFirstPartOrdered.
func (cca *cookedSyncCmdArgs) firstPartOrdered() bool {
return atomic.LoadUint32(&cca.atomicFirstPartOrdered) > 0
}
// setScanningComplete sets the value of atomicScanningStatus to 1.
func (cca *cookedSyncCmdArgs) setScanningComplete() {
atomic.StoreUint32(&cca.atomicScanningStatus, 1)
}
// scanningComplete returns the value of atomicScanningStatus.
func (cca *cookedSyncCmdArgs) scanningComplete() bool {
return atomic.LoadUint32(&cca.atomicScanningStatus) > 0
}
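// Illustrative usage (not in the original source): the accessors above form a one-way latch,
// letting the enumerator and the progress reporter coordinate without locks:
//
//	cca.setScanningComplete()   // enumerator goroutine: flip the latch exactly once
//	if cca.scanningComplete() { // reporter goroutine: observe the latch
//		// switch from scanning output to transfer-progress output
//	}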
// wraps the call to the lifecycle manager to wait for the job to complete
// if blocking is true, then this method will never return
// if blocking is false, then another goroutine spawns and waits out the job
func (cca *cookedSyncCmdArgs) waitUntilJobCompletion(blocking bool) {
// print initial message to indicate that the job is starting
glcm.Init(common.GetStandardInitOutputBuilder(cca.jobID.String(), fmt.Sprintf("%s%s%s.log", azcopyLogPathFolder, common.OS_PATH_SEPARATOR, cca.jobID), false, ""))
// initialize the times necessary to track progress
cca.jobStartTime = time.Now()
cca.intervalStartTime = time.Now()
cca.intervalBytesTransferred = 0
// hand over control to the lifecycle manager if blocking
if blocking {
glcm.InitiateProgressReporting(cca)
glcm.SurrenderControl()
} else {
// non-blocking, return after spawning a go routine to watch the job
glcm.InitiateProgressReporting(cca)
}
}
func (cca *cookedSyncCmdArgs) Cancel(lcm common.LifecycleMgr) {
// prompt for confirmation, except when enumeration is complete
if !cca.isEnumerationComplete {
answer := lcm.Prompt("The enumeration (source/destination comparison) is not complete, "+
"cancelling the job at this point means it cannot be resumed.",
common.PromptDetails{
PromptType: common.EPromptType.Cancel(),
ResponseOptions: []common.ResponseOption{
common.EResponseOption.Yes(),
common.EResponseOption.No(),
},
})
if answer != common.EResponseOption.Yes() {
// user aborted cancel
return
}
}
err := cookedCancelCmdArgs{jobID: cca.jobID}.process()
if err != nil {
lcm.Error("error occurred while cancelling the job " + cca.jobID.String() + ". Failed with error " + err.Error())
}
}
type scanningProgressJsonTemplate struct {
FilesScannedAtSource uint64
FilesScannedAtDestination uint64
}
func (cca *cookedSyncCmdArgs) reportScanningProgress(lcm common.LifecycleMgr, throughput float64) {
lcm.Progress(func(format common.OutputFormat) string {
srcScanned := atomic.LoadUint64(&cca.atomicSourceFilesScanned)
dstScanned := atomic.LoadUint64(&cca.atomicDestinationFilesScanned)
if format == common.EOutputFormat.Json() {
jsonOutputTemplate := scanningProgressJsonTemplate{
FilesScannedAtSource: srcScanned,
FilesScannedAtDestination: dstScanned,
}
outputString, err := json.Marshal(jsonOutputTemplate)
common.PanicIfErr(err)
return string(outputString)
}
// text output
throughputString := ""
if cca.firstPartOrdered() {
throughputString = fmt.Sprintf(", 2-sec Throughput (Mb/s): %v", jobsAdmin.ToFixed(throughput, 4))
}
return fmt.Sprintf("%v Files Scanned at Source, %v Files Scanned at Destination%s",
srcScanned, dstScanned, throughputString)
})
}
func (cca *cookedSyncCmdArgs) getJsonOfSyncJobSummary(summary common.ListJobSummaryResponse) string {
wrapped := common.ListSyncJobSummaryResponse{ListJobSummaryResponse: summary}
wrapped.DeleteTotalTransfers = cca.getDeletionCount()
wrapped.DeleteTransfersCompleted = cca.getDeletionCount()
jsonOutput, err := json.Marshal(wrapped)
common.PanicIfErr(err)
return string(jsonOutput)
}
func (cca *cookedSyncCmdArgs) ReportProgressOrExit(lcm common.LifecycleMgr) (totalKnownCount uint32) {
duration := time.Since(cca.jobStartTime) // report the total run time of the job
var summary common.ListJobSummaryResponse
var throughput float64
var jobDone bool
// fetch a job status and compute throughput if the first part was dispatched
if cca.firstPartOrdered() {
Rpc(common.ERpcCmd.ListJobSummary(), &cca.jobID, &summary)
Rpc(common.ERpcCmd.GetJobLCMWrapper(), &cca.jobID, &lcm)
jobDone = summary.JobStatus.IsJobDone()
totalKnownCount = summary.TotalTransfers
// compute the average throughput for the last time interval
bytesInMb := float64(summary.BytesOverWire-cca.intervalBytesTransferred) * 8 / float64(base10Mega)
timeElapsed := time.Since(cca.intervalStartTime).Seconds()
throughput = common.Iff(timeElapsed != 0, bytesInMb/timeElapsed, 0)
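// Worked example (assuming base10Mega == 1,000,000): if 25,000,000 bytes crossed the wire
// since the last interval and 2 seconds elapsed, throughput = 25e6 * 8 / 1e6 / 2 = 100 Mb/s,
// which is the "2-sec Throughput (Mb/s)" figure printed below.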
// reset the interval timer and byte count
cca.intervalStartTime = time.Now()
cca.intervalBytesTransferred = summary.BytesOverWire
}
// the first part has not been dispatched and we are still scanning,
// so a special message is output to let the user know that we are not stalled
if !cca.scanningComplete() {
cca.reportScanningProgress(lcm, throughput)
return
}
lcm.Progress(func(format common.OutputFormat) string {
if format == common.EOutputFormat.Json() {
return cca.getJsonOfSyncJobSummary(summary)
}
// indicate whether constrained by disk or not
perfString, diskString := getPerfDisplayText(summary.PerfStrings, summary.PerfConstraint, duration, false)
return fmt.Sprintf("%.1f %%, %v Done, %v Failed, %v Pending, %v Total%s, 2-sec Throughput (Mb/s): %v%s",
summary.PercentComplete,
summary.TransfersCompleted,
summary.TransfersFailed,
summary.TotalTransfers-summary.TransfersCompleted-summary.TransfersFailed,
summary.TotalTransfers, perfString, jobsAdmin.ToFixed(throughput, 4), diskString)
})
if jobDone {
exitCode := common.EExitCode.Success()
if summary.TransfersFailed > 0 {
exitCode = common.EExitCode.Error()
}
lcm.Exit(func(format common.OutputFormat) string {
if format == common.EOutputFormat.Json() {
return cca.getJsonOfSyncJobSummary(summary)
}
screenStats, logStats := formatExtraStats(cca.fromTo, summary.AverageIOPS, summary.AverageE2EMilliseconds, summary.NetworkErrorPercentage, summary.ServerBusyPercentage)
output := fmt.Sprintf(
`
Job %s Summary
Files Scanned at Source: %v
Files Scanned at Destination: %v
Elapsed Time (Minutes): %v
Number of Copy Transfers for Files: %v
Number of Copy Transfers for Folder Properties: %v
Total Number Of Copy Transfers: %v
Number of Copy Transfers Completed: %v
Number of Copy Transfers Failed: %v
Number of Deletions at Destination: %v
Total Number of Bytes Transferred: %v
Total Number of Bytes Enumerated: %v
Final Job Status: %v%s%s
`,
summary.JobID.String(),
atomic.LoadUint64(&cca.atomicSourceFilesScanned),
atomic.LoadUint64(&cca.atomicDestinationFilesScanned),
jobsAdmin.ToFixed(duration.Minutes(), 4),
summary.FileTransfers,
summary.FolderPropertyTransfers,
summary.TotalTransfers,
summary.TransfersCompleted,
summary.TransfersFailed,
cca.atomicDeletionCount,
summary.TotalBytesTransferred,
summary.TotalBytesEnumerated,
summary.JobStatus,
screenStats,
formatPerfAdvice(summary.PerformanceAdvice))
jobMan, exists := jobsAdmin.JobsAdmin.JobMgr(summary.JobID)
if exists {
jobMan.Log(common.LogInfo, logStats+"\n"+output)
}
return output
}, exitCode)
}
return
}
func (cca *cookedSyncCmdArgs) process() (err error) {
ctx := context.WithValue(context.TODO(), ste.ServiceAPIVersionOverride, ste.DefaultServiceApiVersion)
err = common.SetBackupMode(cca.backupMode, cca.fromTo)
if err != nil {
return err
}
if err := common.VerifyIsURLResolvable(cca.source.Value); cca.fromTo.From().IsRemote() && err != nil {
return fmt.Errorf("failed to resolve source: %w", err)
}
if err := common.VerifyIsURLResolvable(cca.destination.Value); cca.fromTo.To().IsRemote() && err != nil {
return fmt.Errorf("failed to resolve destination: %w", err)
}
// Verifies credential type and initializes credential info.
// Note that this is for the destination.
cca.credentialInfo, _, err = GetCredentialInfoForLocation(ctx, cca.fromTo.To(), cca.destination.Value, cca.destination.SAS, false, cca.cpkOptions)
if err != nil {
return err
}
srcCredInfo, _, err := GetCredentialInfoForLocation(ctx, cca.fromTo.From(), cca.source.Value, cca.source.SAS, true, cca.cpkOptions)
if err != nil {
return err
}
cca.s2sSourceCredentialType = srcCredInfo.CredentialType
// Download is the only time our primary credential type will be based on source
if cca.fromTo.IsDownload() {
cca.credentialInfo = srcCredInfo
} else if cca.fromTo.IsS2S() {
cca.s2sSourceCredentialType = srcCredInfo.CredentialType // Assign the source credential type in S2S
}
// For OAuthToken credential, assign OAuthTokenInfo to CopyJobPartOrderRequest properly,
// the info will be transferred to STE.
if cca.credentialInfo.CredentialType.IsAzureOAuth() || srcCredInfo.CredentialType.IsAzureOAuth() {
uotm := GetUserOAuthTokenManagerInstance()
// Get token from env var or cache.
if tokenInfo, err := uotm.GetTokenInfo(ctx); err != nil {
return err
} else {
cca.credentialInfo.OAuthTokenInfo = *tokenInfo
if srcCredInfo.CredentialType.IsAzureOAuth() {
cca.credentialInfo.S2SSourceTokenCredential = common.ScopedCredential(tokenInfo, []string{common.StorageScope})
}
}
}
enumerator, err := cca.initEnumerator(ctx)
if err != nil {
return err
}
// trigger the progress reporting
if !cca.dryrunMode {
cca.waitUntilJobCompletion(false)
}
// trigger the enumeration
err = enumerator.enumerate()
if err != nil {
return err
}
return nil
}
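// Note (restating the flow above, not from the original source): in dry-run mode the
// progress reporter is never started, so enumerate() only emits the would-be copy/delete
// actions; the Run handler in init() then exits with a Success code once processing returns.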
func init() {
raw := rawSyncCmdArgs{}
// syncCmd represents the sync command
var syncCmd = &cobra.Command{
Use: "sync",
Aliases: []string{"sc", "s"},
Short: syncCmdShortDescription,
Long: syncCmdLongDescription,
Example: syncCmdExample,
Args: func(cmd *cobra.Command, args []string) error {
if len(args) != 2 {
return fmt.Errorf("2 arguments source and destination are required for this command. Number of commands passed %d", len(args))
}
raw.src = args[0]
raw.dst = args[1]
return nil
},
Run: func(cmd *cobra.Command, args []string) {
glcm.EnableInputWatcher()
if cancelFromStdin {
glcm.EnableCancelFromStdIn()
}
cooked, err := raw.cook()
if err != nil {
glcm.Error("error parsing the input given by the user. Failed with error " + err.Error())
}
cooked.commandString = copyHandlerUtil{}.ConstructCommandStringFromArgs()
err = cooked.process()
if err != nil {
glcm.Error("Cannot perform sync due to error: " + err.Error())
}
if cooked.dryrunMode {
glcm.Exit(nil, common.EExitCode.Success())
}
glcm.SurrenderControl()
},
}
rootCmd.AddCommand(syncCmd)
syncCmd.PersistentFlags().BoolVar(&raw.recursive, "recursive", true, "True by default, look into sub-directories recursively when syncing between directories. (default true).")
syncCmd.PersistentFlags().StringVar(&raw.fromTo, "from-to", "", "Optionally specifies the source destination combination. For Example: LocalBlob, BlobLocal, LocalFile, FileLocal, BlobFile, FileBlob, etc.")
// TODO: enable for copy with IfSourceNewer
// smb info/permissions can be persisted in the scenario of File -> File
syncCmd.PersistentFlags().BoolVar(&raw.preserveSMBPermissions, "preserve-smb-permissions", false, "False by default. Preserves SMB ACLs between aware resources (Azure Files). This flag applies to both files and folders, unless a file-only filter is specified (e.g. include-pattern).")
syncCmd.PersistentFlags().BoolVar(&raw.preserveSMBInfo, "preserve-smb-info", (runtime.GOOS == "windows"), "Preserves SMB property info (last write time, creation time, attribute bits)"+
" between SMB-aware resources (Windows and Azure Files). On windows, this flag will be set to true by default. If the source or destination is a "+
"volume mounted on Linux using SMB protocol, this flag will have to be explicitly set to true. Only the attribute bits supported by Azure Files "+
"will be transferred; any others will be ignored. This flag applies to both files and folders, unless a file-only filter is specified "+
"(e.g. include-pattern). The info transferred for folders is the same as that for files, except for Last Write Time which is never preserved for folders.")
syncCmd.PersistentFlags().BoolVar(&raw.preservePOSIXProperties, "preserve-posix-properties", false, "'Preserves' property info gleaned from stat or statx into object metadata.")
// TODO: enable when we support local <-> File
syncCmd.PersistentFlags().BoolVar(&raw.forceIfReadOnly, "force-if-read-only", false, "When overwriting an existing file on Windows or Azure Files, force the overwrite to work even if the existing file has its read-only attribute set")
// syncCmd.PersistentFlags().BoolVar(&raw.preserveOwner, common.PreserveOwnerFlagName, common.PreserveOwnerDefault, "Only has an effect in downloads, and only when --preserve-smb-permissions is used. If true (the default), the file Owner and Group are preserved in downloads. If set to false, --preserve-smb-permissions will still preserve ACLs but Owner and Group will be based on the user running AzCopy")
// syncCmd.PersistentFlags().BoolVar(&raw.backupMode, common.BackupModeFlagName, false, "Activates Windows' SeBackupPrivilege for uploads, or SeRestorePrivilege for downloads, to allow AzCopy to see read all files, regardless of their file system permissions, and to restore all permissions. Requires that the account running AzCopy already has these permissions (e.g. has Administrator rights or is a member of the 'Backup Operators' group). All this flag does is activate privileges that the account already has")
syncCmd.PersistentFlags().Float64Var(&raw.blockSizeMB, "block-size-mb", 0, "Use this block size (specified in MiB) when uploading to Azure Storage or downloading from Azure Storage. Default is automatically calculated based on file size. Decimal fractions are allowed (For example: 0.25).")
syncCmd.PersistentFlags().StringVar(&raw.include, "include-pattern", "", "Include only files where the name matches the pattern list. For example: *.jpg;*.pdf;exactName")
syncCmd.PersistentFlags().StringVar(&raw.exclude, "exclude-pattern", "", "Exclude files where the name matches the pattern list. For example: *.jpg;*.pdf;exactName")
syncCmd.PersistentFlags().StringVar(&raw.excludePath, "exclude-path", "", "Exclude these paths when comparing the source against the destination. "+
"This option does not support wildcard characters (*). Checks relative path prefix(For example: myFolder;myFolder/subDirName/file.pdf).")
syncCmd.PersistentFlags().StringVar(&raw.includeFileAttributes, "include-attributes", "", "(Windows only) Include only files whose attributes match the attribute list. For example: A;S;R")
syncCmd.PersistentFlags().StringVar(&raw.excludeFileAttributes, "exclude-attributes", "", "(Windows only) Exclude files whose attributes match the attribute list. For example: A;S;R")
syncCmd.PersistentFlags().StringVar(&raw.includeRegex, "include-regex", "", "Include the relative path of the files that match with the regular expressions. Separate regular expressions with ';'.")
syncCmd.PersistentFlags().StringVar(&raw.excludeRegex, "exclude-regex", "", "Exclude the relative path of the files that match with the regular expressions. Separate regular expressions with ';'.")
syncCmd.PersistentFlags().StringVar(&raw.deleteDestination, "delete-destination", "false", "Defines whether to delete extra files from the destination that are not present at the source. Could be set to true, false, or prompt. "+
"If set to prompt, the user will be asked a question before scheduling files and blobs for deletion. (default 'false').")
syncCmd.PersistentFlags().BoolVar(&raw.putMd5, "put-md5", false, "Create an MD5 hash of each file, and save the hash as the Content-MD5 property of the destination blob or file. (By default the hash is NOT created.) Only available when uploading.")
syncCmd.PersistentFlags().StringVar(&raw.md5ValidationOption, "check-md5", common.DefaultHashValidationOption.String(), "Specifies how strictly MD5 hashes should be validated when downloading. This option is only available when downloading. Available values include: NoCheck, LogOnly, FailIfDifferent, FailIfDifferentOrMissing. (default 'FailIfDifferent').")
syncCmd.PersistentFlags().BoolVar(&raw.s2sPreserveAccessTier, "s2s-preserve-access-tier", true, "Preserve access tier during service to service copy. "+
"Please refer to [Azure Blob storage: hot, cool, and archive access tiers](https://docs.microsoft.com/azure/storage/blobs/storage-blob-storage-tiers) to ensure destination storage account supports setting access tier. "+
"In the cases that setting access tier is not supported, please use s2sPreserveAccessTier=false to bypass copying access tier. (default true). ")
syncCmd.PersistentFlags().BoolVar(&raw.s2sPreserveBlobTags, "s2s-preserve-blob-tags", false, "Preserve index tags during service to service sync from one blob storage to another")
// Public Documentation: https://docs.microsoft.com/en-us/azure/storage/blobs/encryption-customer-provided-keys
// Clients making requests against Azure Blob storage have the option to provide an encryption key on a per-request basis.
// Including the encryption key on the request provides granular control over encryption settings for Blob storage operations.
// Customer-provided keys can be stored in Azure Key Vault or in another key store linked to storage account.
syncCmd.PersistentFlags().StringVar(&raw.cpkScopeInfo, "cpk-by-name", "", "Client provided key by name let clients making requests against Azure Blob storage an option to provide an encryption key on a per-request basis. Provided key name will be fetched from Azure Key Vault and will be used to encrypt the data")
syncCmd.PersistentFlags().BoolVar(&raw.cpkInfo, "cpk-by-value", false, "Client provided key by name let clients making requests against Azure Blob storage an option to provide an encryption key on a per-request basis. Provided key and its hash will be fetched from environment variables")
syncCmd.PersistentFlags().BoolVar(&raw.mirrorMode, "mirror-mode", false, "Disable last-modified-time based comparison and overwrites the conflicting files and blobs at the destination if this flag is set to true. Default is false")
syncCmd.PersistentFlags().BoolVar(&raw.dryrun, "dry-run", false, "Prints the path of files that would be copied or removed by the sync command. This flag does not copy or remove the actual files.")
syncCmd.PersistentFlags().StringVar(&raw.trailingDot, "trailing-dot", "", "'Enable' by default to treat file share related operations in a safe manner. Available options: Enable, Disable. "+
"Choose 'Disable' to go back to legacy (potentially unsafe) treatment of trailing dot files where the file service will trim any trailing dots in paths. This can result in potential data corruption if the transfer contains two paths that differ only by a trailing dot (ex: mypath and mypath.). If this flag is set to 'Disable' and AzCopy encounters a trailing dot file, it will warn customers in the scanning log but will not attempt to abort the operation."+
"If the destination does not support trailing dot files (Windows or Blob Storage), AzCopy will fail if the trailing dot file is the root of the transfer and skip any trailing dot paths encountered during enumeration.")
syncCmd.PersistentFlags().StringVar(&raw.compareHash, "compare-hash", "None", "Inform sync to rely on hashes as an alternative to LMT. Missing hashes at a remote source will throw an error. (None, MD5) Default: None")
syncCmd.PersistentFlags().StringVar(&common.LocalHashDir, "hash-meta-dir", "", "When using `--local-hash-storage-mode=HiddenFiles` you can specify an alternate directory to store hash metadata files in (as opposed to next to the related files in the source)")
syncCmd.PersistentFlags().StringVar(&raw.localHashStorageMode, "local-hash-storage-mode", common.EHashStorageMode.Default().String(), "Specify an alternative way to cache file hashes; valid options are: HiddenFiles (OS Agnostic), XAttr (Linux/MacOS only; requires user_xattr on all filesystems traversed @ source), AlternateDataStreams (Windows only; requires named streams on target volume)")
// temp, to assist users with change in param names, by providing a clearer message when these obsolete ones are accidentally used
syncCmd.PersistentFlags().StringVar(&raw.legacyInclude, "include", "", "Legacy include param. DO NOT USE")
syncCmd.PersistentFlags().StringVar(&raw.legacyExclude, "exclude", "", "Legacy exclude param. DO NOT USE")
_ = syncCmd.PersistentFlags().MarkHidden("include")
_ = syncCmd.PersistentFlags().MarkHidden("exclude")
// TODO follow sym link is not implemented, clarify behavior first
// syncCmd.PersistentFlags().BoolVar(&raw.followSymlinks, "follow-symlinks", false, "follow symbolic links when performing sync from local file system.")
// TODO sync does not support all BlobAttributes on the command line, this functionality should be added
// Deprecate the old persist-smb-permissions flag
_ = syncCmd.PersistentFlags().MarkHidden("preserve-smb-permissions")
syncCmd.PersistentFlags().BoolVar(&raw.preservePermissions, PreservePermissionsFlag, false, "False by default. Preserves ACLs between aware resources (Windows and Azure Files, or ADLS Gen 2 to ADLS Gen 2). For Hierarchical Namespace accounts, you will need a container SAS or OAuth token with Modify Ownership and Modify Permissions permissions. For downloads, you will also need the --backup flag to restore permissions where the new Owner will not be the user running AzCopy. This flag applies to both files and folders, unless a file-only filter is specified (e.g. include-pattern).")
}
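// Illustrative invocations (hypothetical paths/URLs), composed from the flags registered above:
//
//	azcopy sync "/data" "https://myaccount.blob.core.windows.net/mycontainer" --recursive=true
//	azcopy sync "/data" "https://myaccount.blob.core.windows.net/mycontainer" --delete-destination=prompt --dry-run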