Commit d76b7bf

Refactor STE: Part 3 (#1732)
* Move global status manager to jobMgr
* Initialize exclusive string map before first order
* Add daemon mode to jobMgr
* Notify on completion
1 parent ddec1b9 commit d76b7bf

6 files changed (+45 -26 lines)
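
Taken together, the changes below thread an optional completion channel through AddJobPart and a daemon-mode flag through NewJobMgr, so a caller can block until a job part finishes instead of polling. A minimal sketch of the completion-channel convention, using simplified stand-in types rather than the real ste signatures:

package main

import "fmt"

// jobPart is a simplified stand-in for ste's job part manager.
type jobPart struct {
    done chan struct{} // closed exactly once when the part completes
}

// addJobPart mirrors the new convention: completionChan may be nil when
// the caller does not need to be notified.
func addJobPart(completionChan chan struct{}) *jobPart {
    jp := &jobPart{done: completionChan}
    go func() {
        // ... transfer work would happen here ...
        if jp.done != nil {
            close(jp.done) // notify any waiter; closing acts as a broadcast
        }
    }()
    return jp
}

func main() {
    // A caller that must block until the part is done supplies a channel.
    waitCh := make(chan struct{})
    addJobPart(waitCh)
    <-waitCh // unblocks when the part closes the channel
    fmt.Println("job part completed")

    // Existing call sites (resume, resurrect) simply pass nil.
    addJobPart(nil)
}

Closing the channel, rather than sending on it, means every waiter unblocks at once, and a nil channel simply means nobody asked to be notified.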

jobsAdmin/JobsAdmin.go

+3 -3

@@ -294,7 +294,7 @@ func (ja *jobsAdmin) JobMgrEnsureExists(jobID common.JobID,
 	return ja.jobIDToJobMgr.EnsureExists(jobID,
 		func() ste.IJobMgr {
 			// Return existing or new IJobMgr to caller
-			return ste.NewJobMgr(ja.concurrency, jobID, ja.appCtx, ja.cpuMonitor, level, commandString, ja.logDir, ja.concurrencyTuner, ja.pacer, ja.slicePool, ja.cacheLimiter, ja.fileCountLimiter, ja.jobLogger)
+			return ste.NewJobMgr(ja.concurrency, jobID, ja.appCtx, ja.cpuMonitor, level, commandString, ja.logDir, ja.concurrencyTuner, ja.pacer, ja.slicePool, ja.cacheLimiter, ja.fileCountLimiter, ja.jobLogger, false)
 		})
 }

@@ -343,7 +343,7 @@ func (ja *jobsAdmin) ResurrectJob(jobId common.JobID, sourceSAS string, destinat
 	}
 	mmf := planFile.Map()
 	jm := ja.JobMgrEnsureExists(jobID, mmf.Plan().LogLevel, "")
-	jm.AddJobPart(partNum, planFile, mmf, sourceSAS, destinationSAS, false)
+	jm.AddJobPart(partNum, planFile, mmf, sourceSAS, destinationSAS, false, nil)
 }

 jm, _ := ja.JobMgr(jobId)

@@ -377,7 +377,7 @@ func (ja *jobsAdmin) ResurrectJobParts() {
 	mmf := planFile.Map()
 	//todo : call the compute transfer function here for each job.
 	jm := ja.JobMgrEnsureExists(jobID, mmf.Plan().LogLevel, "")
-	jm.AddJobPart(partNum, planFile, mmf, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false)
+	jm.AddJobPart(partNum, planFile, mmf, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false, nil)
 	}
 }

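Note that jobsAdmin keeps the old behavior by passing false for the new daemonMode argument and nil for completionChan. A long-running host would presumably pass true and later consult IsDaemon(); the sketch below is hypothetical wiring with simplified stand-in types, not the real jobsAdmin code:

package main

import "fmt"

// jobMgr is a simplified stand-in for ste.jobMgr.
type jobMgr struct {
    isDaemon bool
}

// newJobMgr mirrors the extended constructor: the final argument records
// whether the process runs as a long-lived service.
func newJobMgr(daemonMode bool) *jobMgr {
    return &jobMgr{isDaemon: daemonMode}
}

func (jm *jobMgr) IsDaemon() bool { return jm.isDaemon }

func main() {
    cli := newJobMgr(false) // one-shot CLI invocation, as in jobsAdmin above
    svc := newJobMgr(true)  // hypothetical daemon/service host
    fmt.Println(cli.IsDaemon(), svc.IsDaemon())
}
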
jobsAdmin/init.go

+1 -1

@@ -180,7 +180,7 @@ func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.Cop
 		CredentialInfo: order.CredentialInfo,
 	})
 	// Supply no plan MMF because we don't have one, and AddJobPart will create one on its own.
-	jm.AddJobPart(order.PartNum, jppfn, nil, order.SourceRoot.SAS, order.DestinationRoot.SAS, true) // Add this part to the Job and schedule its transfers
+	jm.AddJobPart(order.PartNum, jppfn, nil, order.SourceRoot.SAS, order.DestinationRoot.SAS, true, nil) // Add this part to the Job and schedule its transfers

 	// Update jobPart Status with the status Manager
 	jm.SendJobPartCreatedMsg(ste.JobPartCreatedMsg{TotalTransfers: uint32(len(order.Transfers.List)),

ste/jobStatusManager.go

+6 -7

@@ -43,27 +43,26 @@ type jobStatusManager struct {
 	xferDone chan xferDoneMsg
 }

-var jstm jobStatusManager
-
 /* These functions should not fail */
 func (jm *jobMgr) SendJobPartCreatedMsg(msg JobPartCreatedMsg) {
-	jstm.partCreated <- msg
+	jm.jstm.partCreated <- msg
 }

 func (jm *jobMgr) SendXferDoneMsg(msg xferDoneMsg) {
-	jstm.xferDone <- msg
+	jm.jstm.xferDone <- msg
 }

 func (jm *jobMgr) ListJobSummary() common.ListJobSummaryResponse {
-	jstm.listReq <- true
-	return <-jstm.respChan
+	jm.jstm.listReq <- true
+	return <-jm.jstm.respChan
 }

 func (jm *jobMgr) ResurrectSummary(js common.ListJobSummaryResponse) {
-	jstm.js = js
+	jm.jstm.js = js
 }

 func (jm *jobMgr) handleStatusUpdateMessage() {
+	jstm := jm.jstm
 	js := &jstm.js
 	js.JobID = jm.jobID
 	js.CompleteJobOrdered = false

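With this change each jobMgr owns its own status manager instead of sharing the package-level var jstm, so two jobs in the same process no longer funnel messages through one set of channels; the local alias jstm := jm.jstm keeps the body of handleStatusUpdateMessage textually unchanged. A condensed sketch of the ownership pattern (simplified types, not the full ste implementation):

package main

import "fmt"

type listJobSummaryResponse struct{ JobID string } // simplified summary type

// jobStatusManager owns the book-keeping channels for a single job.
type jobStatusManager struct {
    listReq  chan bool
    respChan chan listJobSummaryResponse
    js       listJobSummaryResponse
}

// jobMgr now holds its own status manager instead of using a global.
type jobMgr struct {
    jobID string
    jstm  *jobStatusManager
}

func (jm *jobMgr) ListJobSummary() listJobSummaryResponse {
    jm.jstm.listReq <- true
    return <-jm.jstm.respChan
}

// handleStatusUpdateMessage aliases the field once so the rest of the
// function reads exactly as it did when jstm was a package-level variable.
func (jm *jobMgr) handleStatusUpdateMessage() {
    jstm := jm.jstm
    js := &jstm.js
    js.JobID = jm.jobID
    for range jstm.listReq {
        jstm.respChan <- *js
    }
}

func main() {
    jm := &jobMgr{jobID: "job-1", jstm: &jobStatusManager{
        listReq:  make(chan bool),
        respChan: make(chan listJobSummaryResponse),
    }}
    go jm.handleStatusUpdateMessage()
    fmt.Println(jm.ListJobSummary().JobID)
}
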
ste/mgr-JobMgr.go

+29 -14

@@ -54,7 +54,7 @@ type IJobMgr interface {
 	//Throughput() XferThroughput
 	// If existingPlanMMF is nil, a new MMF is opened.
 	AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
-		destinationSAS string, scheduleTransfers bool) IJobPartMgr
+		destinationSAS string, scheduleTransfers bool, completionChan chan struct{}) IJobPartMgr
 	SetIncludeExclude(map[string]int, map[string]int)
 	IncludeExclude() (map[string]int, map[string]int)
 	ResumeTransfers(appCtx context.Context)

@@ -97,14 +97,15 @@ type IJobMgr interface {
 	AddSuccessfulBytesInActiveFiles(n int64)
 	SuccessfulBytesInActiveFiles() uint64
 	CancelPauseJobOrder(desiredJobStatus common.JobStatus) common.CancelPauseResumeResponse
+	IsDaemon() bool
 }

 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

 func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx context.Context, cpuMon common.CPUMonitor, level common.LogLevel,
 	commandString string, logFileFolder string, tuner ConcurrencyTuner,
 	pacer PacerAdmin, slicePool common.ByteSlicePooler, cacheLimiter common.CacheLimiter, fileCountLimiter common.CacheLimiter,
-	jobLogger common.ILoggerResetable) IJobMgr {
+	jobLogger common.ILoggerResetable, daemonMode bool) IJobMgr {
 	const channelSize = 100000
 	// PartsChannelSize defines the number of JobParts which can be placed into the
 	// parts channel. Any JobPart which comes from FE and partChannel is full,

@@ -128,6 +129,7 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte

 	/* Create book-keeping channels */
 	jobPartProgressCh := make(chan jobPartProgressInfo)
+	var jstm jobStatusManager
 	jstm.respChan = make(chan common.ListJobSummaryResponse)
 	jstm.listReq = make(chan bool)
 	jstm.partCreated = make(chan JobPartCreatedMsg, 100)

@@ -140,7 +142,6 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte
 		concurrency: concurrency,
 		overwritePrompter: newOverwritePrompter(),
 		pipelineNetworkStats: newPipelineNetworkStats(tuner), // let the stats coordinate with the concurrency tuner
-		exclusiveDestinationMapHolder: &atomic.Value{},
 		initMu: &sync.Mutex{},
 		jobPartProgress: jobPartProgressCh,
 		coordinatorChannels: CoordinatorChannels{

@@ -167,6 +168,8 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte
 		cacheLimiter: cacheLimiter,
 		fileCountLimiter: fileCountLimiter,
 		cpuMon: cpuMon,
+		jstm: &jstm,
+		isDaemon: daemonMode,
 		/*Other fields remain zero-value until this job is scheduled */}
 	jm.reset(appCtx, commandString)
 	jm.logJobsAdminMessages()

@@ -249,6 +252,7 @@ type jobMgrInitState struct {
 	securityInfoPersistenceManager *securityInfoPersistenceManager
 	folderCreationTracker FolderCreationTracker
 	folderDeletionManager common.FolderDeletionManager
+	exclusiveDestinationMapHolder *atomic.Value
 }

 // jobMgr represents the runtime information for a Job

@@ -274,7 +278,6 @@ type jobMgr struct {
 	cancel context.CancelFunc
 	pipelineNetworkStats *PipelineNetworkStats

-	exclusiveDestinationMapHolder *atomic.Value

 	// Share the same HTTP Client across all job parts, so that the we maximize re-use of
 	// its internal connection pool

@@ -311,10 +314,13 @@ type jobMgr struct {
 	slicePool common.ByteSlicePooler
 	cacheLimiter common.CacheLimiter
 	fileCountLimiter common.CacheLimiter
+	jstm *jobStatusManager

 	/* Pool sizer related stuff */
 	atomicCurrentMainPoolSize int32 // align 64 bit integers for 32 bit arch
 	atomicSuccessfulBytesInActiveFiles int64
+
+	isDaemon bool /* is it running as service */
 }

 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

@@ -395,12 +401,14 @@ func (jm *jobMgr) logPerfInfo(displayStrings []string, constraint common.PerfCon

 // initializeJobPartPlanInfo func initializes the JobPartPlanInfo handler for given JobPartOrder
 func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
-	destinationSAS string, scheduleTransfers bool) IJobPartMgr {
+	destinationSAS string, scheduleTransfers bool, completionChan chan struct{}) IJobPartMgr {
 	jpm := &jobPartMgr{jobMgr: jm, filename: planFile, sourceSAS: sourceSAS,
 		destinationSAS: destinationSAS, pacer: jm.pacer,
 		slicePool: jm.slicePool,
 		cacheLimiter: jm.cacheLimiter,
-		fileCountLimiter: jm.fileCountLimiter}
+		fileCountLimiter: jm.fileCountLimiter,
+		closeOnCompletion: completionChan,
+	}
 	// If an existing plan MMF was supplied, re use it. Otherwise, init a new one.
 	if existingPlanMMF == nil {
 		jpm.planMMF = jpm.filename.Map()

@@ -411,7 +419,6 @@ func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, e
 	jm.jobPartMgrs.Set(partNum, jpm)
 	jm.setFinalPartOrdered(partNum, jpm.planMMF.Plan().IsFinalPart)
 	jm.setDirection(jpm.Plan().FromTo)
-	jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(partNum, jpm.Plan().FromTo)

 	jm.initMu.Lock()
 	defer jm.initMu.Unlock()

@@ -421,9 +428,12 @@ func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, e
 			securityInfoPersistenceManager: newSecurityInfoPersistenceManager(jm.ctx),
 			folderCreationTracker: NewFolderCreationTracker(jpm.Plan().Fpo, jpm.Plan()),
 			folderDeletionManager: common.NewFolderDeletionManager(jm.ctx, jpm.Plan().Fpo, logger),
+			exclusiveDestinationMapHolder: &atomic.Value{},
 		}
+		jm.initState.exclusiveDestinationMapHolder.Store(common.NewExclusiveStringMap(jpm.Plan().FromTo, runtime.GOOS))
 	}
 	jpm.jobMgrInitState = jm.initState // so jpm can use it as much as desired without locking (since the only mutation is the init in jobManager. As far as jobPartManager is concerned, the init state is read-only
+	jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(partNum, jpm.Plan().FromTo)

 	if scheduleTransfers {
 		// If the schedule transfer is set to true

@@ -455,7 +465,6 @@ func (jm *jobMgr) AddJobOrder(order common.CopyJobPartOrderRequest) IJobPartMgr
 	jm.jobPartMgrs.Set(order.PartNum, jpm)
 	jm.setFinalPartOrdered(order.PartNum, jpm.planMMF.Plan().IsFinalPart)
 	jm.setDirection(jpm.Plan().FromTo)
-	jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(order.PartNum, jpm.Plan().FromTo)

 	jm.initMu.Lock()
 	defer jm.initMu.Unlock()

@@ -465,9 +474,12 @@ func (jm *jobMgr) AddJobOrder(order common.CopyJobPartOrderRequest) IJobPartMgr
 			securityInfoPersistenceManager: newSecurityInfoPersistenceManager(jm.ctx),
 			folderCreationTracker: NewFolderCreationTracker(jpm.Plan().Fpo, jpm.Plan()),
 			folderDeletionManager: common.NewFolderDeletionManager(jm.ctx, jpm.Plan().Fpo, logger),
+			exclusiveDestinationMapHolder: &atomic.Value{},
 		}
+		jm.initState.exclusiveDestinationMapHolder.Store(common.NewExclusiveStringMap(jpm.Plan().FromTo, runtime.GOOS))
 	}
 	jpm.jobMgrInitState = jm.initState // so jpm can use it as much as desired without locking (since the only mutation is the init in jobManager. As far as jobPartManager is concerned, the init state is read-only
+	jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(order.PartNum, jpm.Plan().FromTo)

 	jm.QueueJobParts(jpm)
 	return jpm

@@ -515,11 +527,7 @@ func (jm *jobMgr) setDirection(fromTo common.FromTo) {

 // can't do this at time of constructing the jobManager, because it doesn't know fromTo at that time
 func (jm *jobMgr) getExclusiveDestinationMap(partNum PartNumber, fromTo common.FromTo) *common.ExclusiveStringMap {
-	// assume that first part is ordered before any others
-	if partNum == 0 {
-		jm.exclusiveDestinationMapHolder.Store(common.NewExclusiveStringMap(fromTo, runtime.GOOS))
-	}
-	return jm.exclusiveDestinationMapHolder.Load().(*common.ExclusiveStringMap)
+	return jm.initState.exclusiveDestinationMapHolder.Load().(*common.ExclusiveStringMap)
 }

 func (jm *jobMgr) HttpClient() *http.Client {

@@ -592,6 +600,10 @@ func (jm *jobMgr) reportJobPartDoneHandler() {
 	jobProgressInfo.transfersSkipped += partProgressInfo.transfersSkipped
 	jobProgressInfo.transfersFailed += partProgressInfo.transfersFailed

+	if partProgressInfo.completionChan != nil {
+		close(partProgressInfo.completionChan)
+	}
+
 	// If the last part is still awaited or other parts all still not complete,
 	// JobPart 0 status is not changed (unless we are cancelling)
 	haveFinalPart = atomic.LoadInt32(&jm.atomicFinalPartOrderedIndicator) == 1

@@ -1002,7 +1014,10 @@ func (jm *jobMgr) CancelPauseJobOrder(desiredJobStatus common.JobStatus) common.
 	}
 	return jr
 }
-/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+
+func (jm *jobMgr) IsDaemon() bool {
+	return jm.isDaemon
+}

 ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

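Two details in this file are worth spelling out. The exclusive destination map now lives in jobMgrInitState and is created under initMu the first time any part is added, so it no longer relies on part 0 being ordered before the others; readers still load it through an atomic.Value. And reportJobPartDoneHandler now closes a part's completionChan, when one was supplied, as soon as that part's progress report arrives. A small sketch of the order-independent, mutex-guarded initialization (simplified stand-ins for the ste types):

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

type exclusiveStringMap struct{ m sync.Map } // stand-in for common.ExclusiveStringMap

type initState struct {
    exclusiveDestinationMapHolder *atomic.Value
}

type jobMgr struct {
    initMu    sync.Mutex
    initState *initState
}

// addJobPart initializes the shared init state exactly once, under the
// mutex, regardless of which part number happens to arrive first.
func (jm *jobMgr) addJobPart(partNum int) *exclusiveStringMap {
    jm.initMu.Lock()
    defer jm.initMu.Unlock()
    if jm.initState == nil {
        jm.initState = &initState{exclusiveDestinationMapHolder: &atomic.Value{}}
        jm.initState.exclusiveDestinationMapHolder.Store(&exclusiveStringMap{})
    }
    // Readers go through the atomic.Value, mirroring getExclusiveDestinationMap.
    return jm.initState.exclusiveDestinationMapHolder.Load().(*exclusiveStringMap)
}

func main() {
    jm := &jobMgr{}
    // Part 3 arriving before part 0 is now fine: the map is created on first use.
    fmt.Println(jm.addJobPart(3) == jm.addJobPart(0)) // true: same map instance
}
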
ste/mgr-JobPartMgr.go

+4 -0

@@ -237,6 +237,7 @@ type jobPartProgressInfo struct {
 	transfersCompleted int
 	transfersSkipped int
 	transfersFailed int
+	completionChan chan struct{}
 }

 // jobPartMgr represents the runtime information for a Job's Part

@@ -310,6 +311,8 @@ type jobPartMgr struct {
 	atomicTransfersSkipped uint32

 	cpkOptions common.CpkOptions
+
+	closeOnCompletion chan struct{}
 }

 func (jpm *jobPartMgr) getOverwritePrompter() *overwritePrompter {

@@ -843,6 +846,7 @@ func (jpm *jobPartMgr) ReportTransferDone(status common.TransferStatus) (transfe
 		transfersCompleted: int(atomic.LoadUint32(&jpm.atomicTransfersCompleted)),
 		transfersSkipped: int(atomic.LoadUint32(&jpm.atomicTransfersSkipped)),
 		transfersFailed: int(atomic.LoadUint32(&jpm.atomicTransfersFailed)),
+		completionChan: jpm.closeOnCompletion,
 	}
 	jpm.jobMgr.ReportJobPartDone(jppi)
 }

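The part manager only carries the channel it was handed at AddJobPart time and returns it inside jobPartProgressInfo; the job manager (reportJobPartDoneHandler above) is the side that closes it, guarded by a nil check so parts created without a channel are unaffected. A sketch of that hand-off with simplified types:

package main

import "fmt"

// jobPartProgressInfo mirrors the new field: the channel rides along with
// the progress report so the receiver knows what to close.
type jobPartProgressInfo struct {
    transfersCompleted int
    completionChan     chan struct{}
}

type jobPartMgr struct {
    closeOnCompletion chan struct{}
    report            chan jobPartProgressInfo
}

// reportTransferDone sends the final progress message, including the
// channel handed to AddJobPart (possibly nil).
func (jpm *jobPartMgr) reportTransferDone(completed int) {
    jpm.report <- jobPartProgressInfo{
        transfersCompleted: completed,
        completionChan:     jpm.closeOnCompletion,
    }
}

func main() {
    done := make(chan struct{})
    jpm := &jobPartMgr{closeOnCompletion: done, report: make(chan jobPartProgressInfo, 1)}

    jpm.reportTransferDone(42)

    // The job-manager side closes the channel only if one was supplied.
    info := <-jpm.report
    if info.completionChan != nil {
        close(info.completionChan)
    }
    <-done
    fmt.Println("part finished with", info.transfersCompleted, "transfers")
}
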
ste/mgr-JobPartTransferMgr.go

+2 -1

@@ -762,7 +762,8 @@ func (jptm *jobPartTransferMgr) failActiveTransfer(typ transferErrorCode, descri
 	jptm.SetErrorCode(int32(status)) // TODO: what are the rules about when this needs to be set, and doesn't need to be (e.g. for earlier failures)?
 	// If the status code was 403, it means there was an authentication error and we exit.
 	// User can resume the job if completely ordered with a new sas.
-	if status == http.StatusForbidden {
+	if status == http.StatusForbidden &&
+		!jptm.jobPartMgr.(*jobPartMgr).jobMgr.IsDaemon() {
 		// quit right away, since without proper authentication no work can be done
 		// display a clear message
 		common.GetLifecycleMgr().Info(fmt.Sprintf("Authentication failed, it is either not correct, or expired, or does not have the correct permission %s", err.Error()))

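The effect of the extra condition: a 403 still aborts an interactive run, while a daemon only fails the transfer and stays up so the job can be resumed later with a fresh SAS. A rough sketch of that branch (stand-in types; the real exit path is not shown in this hunk, so os.Exit here is purely illustrative):

package main

import (
    "fmt"
    "net/http"
    "os"
)

type transferMgr struct {
    daemon bool // comes from jobMgr.IsDaemon() in the real code
}

// failActiveTransfer sketches the new behavior: only a non-daemon process
// exits on an authentication failure; a service marks the transfer failed
// and keeps running.
func (t *transferMgr) failActiveTransfer(status int, err error) {
    if status == http.StatusForbidden && !t.daemon {
        fmt.Printf("Authentication failed, it is either not correct, or expired, "+
            "or does not have the correct permission %s\n", err.Error())
        os.Exit(1) // interactive run: no work can proceed without valid auth
    }
    // daemon (or non-403 failure): record the failure and carry on
    fmt.Println("transfer failed:", err)
}

func main() {
    svc := &transferMgr{daemon: true}
    svc.failActiveTransfer(http.StatusForbidden, fmt.Errorf("403 from service"))
    fmt.Println("daemon still running")
}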