Refactor STE: Part 3 #1732

Merged: 4 commits, Mar 30, 2022
jobsAdmin/JobsAdmin.go (3 additions, 3 deletions)
@@ -294,7 +294,7 @@ func (ja *jobsAdmin) JobMgrEnsureExists(jobID common.JobID,
return ja.jobIDToJobMgr.EnsureExists(jobID,
func() ste.IJobMgr {
// Return existing or new IJobMgr to caller
- return ste.NewJobMgr(ja.concurrency, jobID, ja.appCtx, ja.cpuMonitor, level, commandString, ja.logDir, ja.concurrencyTuner, ja.pacer, ja.slicePool, ja.cacheLimiter, ja.fileCountLimiter, ja.jobLogger)
+ return ste.NewJobMgr(ja.concurrency, jobID, ja.appCtx, ja.cpuMonitor, level, commandString, ja.logDir, ja.concurrencyTuner, ja.pacer, ja.slicePool, ja.cacheLimiter, ja.fileCountLimiter, ja.jobLogger, false)
})
}

@@ -343,7 +343,7 @@ func (ja *jobsAdmin) ResurrectJob(jobId common.JobID, sourceSAS string, destinat
}
mmf := planFile.Map()
jm := ja.JobMgrEnsureExists(jobID, mmf.Plan().LogLevel, "")
- jm.AddJobPart(partNum, planFile, mmf, sourceSAS, destinationSAS, false)
+ jm.AddJobPart(partNum, planFile, mmf, sourceSAS, destinationSAS, false, nil)
}

jm, _ := ja.JobMgr(jobId)
@@ -377,7 +377,7 @@ func (ja *jobsAdmin) ResurrectJobParts() {
mmf := planFile.Map()
//todo : call the compute transfer function here for each job.
jm := ja.JobMgrEnsureExists(jobID, mmf.Plan().LogLevel, "")
- jm.AddJobPart(partNum, planFile, mmf, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false)
+ jm.AddJobPart(partNum, planFile, mmf, EMPTY_SAS_STRING, EMPTY_SAS_STRING, false, nil)
}
}

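Note: the new trailing false argument in this file is the daemon-mode flag that NewJobMgr gains in ste/mgr-JobMgr.go below; jobsAdmin always passes false because these managers are built for a one-shot CLI run. A minimal, self-contained sketch of the pattern, with illustrative names only (not the AzCopy types):

package main

import "fmt"

// jobMgr models only the field this PR adds: whether the hosting process
// runs as a long-lived service (daemon) or a one-shot CLI invocation.
type jobMgr struct {
	isDaemon bool
}

// newJobMgr mirrors the idea of NewJobMgr gaining a daemonMode parameter
// that is stored once and queried later via IsDaemon().
func newJobMgr(daemonMode bool) *jobMgr {
	return &jobMgr{isDaemon: daemonMode}
}

func (jm *jobMgr) IsDaemon() bool { return jm.isDaemon }

func main() {
	cli := newJobMgr(false) // what a one-shot CLI caller would pass
	svc := newJobMgr(true)  // what a hypothetical service host would pass
	fmt.Println(cli.IsDaemon(), svc.IsDaemon()) // prints: false true
}
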
jobsAdmin/init.go (1 addition, 1 deletion)
@@ -180,7 +180,7 @@ func ExecuteNewCopyJobPartOrder(order common.CopyJobPartOrderRequest) common.Cop
CredentialInfo: order.CredentialInfo,
})
// Supply no plan MMF because we don't have one, and AddJobPart will create one on its own.
- jm.AddJobPart(order.PartNum, jppfn, nil, order.SourceRoot.SAS, order.DestinationRoot.SAS, true) // Add this part to the Job and schedule its transfers
+ jm.AddJobPart(order.PartNum, jppfn, nil, order.SourceRoot.SAS, order.DestinationRoot.SAS, true, nil) // Add this part to the Job and schedule its transfers

// Update jobPart Status with the status Manager
jm.SendJobPartCreatedMsg(ste.JobPartCreatedMsg{TotalTransfers: uint32(len(order.Transfers.List)),
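The nil passed as AddJobPart's new final argument is the optional completion channel introduced in this PR: a caller that wants to be notified supplies a chan struct{} and the STE closes it when the part finishes; callers that do not care pass nil. A self-contained sketch of the caller-side, close-on-completion usage, with placeholder names (addJobPart here is a stand-in, not the real method):

package main

import (
	"fmt"
	"time"
)

// addJobPart stands in for IJobMgr.AddJobPart: if the caller supplies a
// completion channel, it is closed once the part's transfers finish.
func addJobPart(completionChan chan struct{}) {
	go func() {
		time.Sleep(50 * time.Millisecond) // simulate the transfers running
		if completionChan != nil {
			close(completionChan) // signal "this part is done" to any waiter
		}
	}()
}

func main() {
	done := make(chan struct{})
	addJobPart(done)
	<-done // blocks until the job part closes the channel
	fmt.Println("job part completed")
}
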
ste/jobStatusManager.go (6 additions, 7 deletions)
@@ -43,27 +43,26 @@ type jobStatusManager struct {
xferDone chan xferDoneMsg
}

- var jstm jobStatusManager

/* These functions should not fail */
func (jm *jobMgr) SendJobPartCreatedMsg(msg JobPartCreatedMsg) {
- jstm.partCreated <- msg
+ jm.jstm.partCreated <- msg
}

func (jm *jobMgr) SendXferDoneMsg(msg xferDoneMsg) {
- jstm.xferDone <- msg
+ jm.jstm.xferDone <- msg
}

func (jm *jobMgr) ListJobSummary() common.ListJobSummaryResponse {
- jstm.listReq <- true
- return <-jstm.respChan
+ jm.jstm.listReq <- true
+ return <-jm.jstm.respChan
}

func (jm *jobMgr) ResurrectSummary(js common.ListJobSummaryResponse) {
- jstm.js = js
+ jm.jstm.js = js
}

func (jm *jobMgr) handleStatusUpdateMessage() {
+ jstm := jm.jstm
js := &jstm.js
js.JobID = jm.jobID
js.CompleteJobOrdered = false
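This file's change removes the package-level var jstm and moves the status manager onto each jobMgr (jm.jstm), so concurrent jobs in one process no longer share one set of status channels. A simplified, self-contained sketch of the per-instance shape (types and fields here are trimmed stand-ins):

package main

import "fmt"

// jobStatusManager carries per-job status channels (heavily simplified).
type jobStatusManager struct {
	listReq  chan bool
	respChan chan string
}

// jobMgr now owns its status manager instead of sharing a package global,
// so two concurrently resident jobs cannot cross-talk on these channels.
type jobMgr struct {
	jobID string
	jstm  *jobStatusManager
}

func newJobMgr(jobID string) *jobMgr {
	jstm := &jobStatusManager{
		listReq:  make(chan bool),
		respChan: make(chan string),
	}
	jm := &jobMgr{jobID: jobID, jstm: jstm}
	go jm.handleStatusUpdateMessage() // one status loop per job
	return jm
}

func (jm *jobMgr) handleStatusUpdateMessage() {
	for range jm.jstm.listReq {
		jm.jstm.respChan <- "summary for " + jm.jobID
	}
}

func (jm *jobMgr) ListJobSummary() string {
	jm.jstm.listReq <- true
	return <-jm.jstm.respChan
}

func main() {
	a, b := newJobMgr("job-A"), newJobMgr("job-B")
	fmt.Println(a.ListJobSummary()) // summary for job-A
	fmt.Println(b.ListJobSummary()) // summary for job-B
}
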
ste/mgr-JobMgr.go (29 additions, 14 deletions)
@@ -54,7 +54,7 @@ type IJobMgr interface {
//Throughput() XferThroughput
// If existingPlanMMF is nil, a new MMF is opened.
AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
- destinationSAS string, scheduleTransfers bool) IJobPartMgr
+ destinationSAS string, scheduleTransfers bool, completionChan chan struct{}) IJobPartMgr
SetIncludeExclude(map[string]int, map[string]int)
IncludeExclude() (map[string]int, map[string]int)
ResumeTransfers(appCtx context.Context)
@@ -97,14 +97,15 @@ type IJobMgr interface {
AddSuccessfulBytesInActiveFiles(n int64)
SuccessfulBytesInActiveFiles() uint64
CancelPauseJobOrder(desiredJobStatus common.JobStatus) common.CancelPauseResumeResponse
+ IsDaemon() bool
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx context.Context, cpuMon common.CPUMonitor, level common.LogLevel,
commandString string, logFileFolder string, tuner ConcurrencyTuner,
pacer PacerAdmin, slicePool common.ByteSlicePooler, cacheLimiter common.CacheLimiter, fileCountLimiter common.CacheLimiter,
- jobLogger common.ILoggerResetable) IJobMgr {
+ jobLogger common.ILoggerResetable, daemonMode bool) IJobMgr {
const channelSize = 100000
// PartsChannelSize defines the number of JobParts which can be placed into the
// parts channel. Any JobPart which comes from FE and partChannel is full,
@@ -128,6 +129,7 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte

/* Create book-keeping channels */
jobPartProgressCh := make(chan jobPartProgressInfo)
+ var jstm jobStatusManager
jstm.respChan = make(chan common.ListJobSummaryResponse)
jstm.listReq = make(chan bool)
jstm.partCreated = make(chan JobPartCreatedMsg, 100)
@@ -140,7 +142,6 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte
concurrency: concurrency,
overwritePrompter: newOverwritePrompter(),
pipelineNetworkStats: newPipelineNetworkStats(tuner), // let the stats coordinate with the concurrency tuner
- exclusiveDestinationMapHolder: &atomic.Value{},
initMu: &sync.Mutex{},
jobPartProgress: jobPartProgressCh,
coordinatorChannels: CoordinatorChannels{
@@ -167,6 +168,8 @@ func NewJobMgr(concurrency ConcurrencySettings, jobID common.JobID, appCtx conte
cacheLimiter: cacheLimiter,
fileCountLimiter: fileCountLimiter,
cpuMon: cpuMon,
+ jstm: &jstm,
+ isDaemon: daemonMode,
/*Other fields remain zero-value until this job is scheduled */}
jm.reset(appCtx, commandString)
jm.logJobsAdminMessages()
@@ -249,6 +252,7 @@ type jobMgrInitState struct {
securityInfoPersistenceManager *securityInfoPersistenceManager
folderCreationTracker FolderCreationTracker
folderDeletionManager common.FolderDeletionManager
+ exclusiveDestinationMapHolder *atomic.Value
}

// jobMgr represents the runtime information for a Job
@@ -274,7 +278,6 @@ type jobMgr struct {
cancel context.CancelFunc
pipelineNetworkStats *PipelineNetworkStats

- exclusiveDestinationMapHolder *atomic.Value

// Share the same HTTP Client across all job parts, so that the we maximize re-use of
// its internal connection pool
@@ -311,10 +314,13 @@ type jobMgr struct {
slicePool common.ByteSlicePooler
cacheLimiter common.CacheLimiter
fileCountLimiter common.CacheLimiter
+ jstm *jobStatusManager

/* Pool sizer related stuff */
atomicCurrentMainPoolSize int32 // align 64 bit integers for 32 bit arch
atomicSuccessfulBytesInActiveFiles int64

+ isDaemon bool /* is it running as service */
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@@ -395,12 +401,14 @@ func (jm *jobMgr) logPerfInfo(displayStrings []string, constraint common.PerfCon

// initializeJobPartPlanInfo func initializes the JobPartPlanInfo handler for given JobPartOrder
func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, existingPlanMMF *JobPartPlanMMF, sourceSAS string,
- destinationSAS string, scheduleTransfers bool) IJobPartMgr {
+ destinationSAS string, scheduleTransfers bool, completionChan chan struct{}) IJobPartMgr {
jpm := &jobPartMgr{jobMgr: jm, filename: planFile, sourceSAS: sourceSAS,
destinationSAS: destinationSAS, pacer: jm.pacer,
slicePool: jm.slicePool,
cacheLimiter: jm.cacheLimiter,
- fileCountLimiter: jm.fileCountLimiter}
+ fileCountLimiter: jm.fileCountLimiter,
+ closeOnCompletion: completionChan,
+ }
// If an existing plan MMF was supplied, re use it. Otherwise, init a new one.
if existingPlanMMF == nil {
jpm.planMMF = jpm.filename.Map()
@@ -411,7 +419,6 @@ func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, e
jm.jobPartMgrs.Set(partNum, jpm)
jm.setFinalPartOrdered(partNum, jpm.planMMF.Plan().IsFinalPart)
jm.setDirection(jpm.Plan().FromTo)
- jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(partNum, jpm.Plan().FromTo)

jm.initMu.Lock()
defer jm.initMu.Unlock()
@@ -421,9 +428,12 @@ func (jm *jobMgr) AddJobPart(partNum PartNumber, planFile JobPartPlanFileName, e
securityInfoPersistenceManager: newSecurityInfoPersistenceManager(jm.ctx),
folderCreationTracker: NewFolderCreationTracker(jpm.Plan().Fpo, jpm.Plan()),
folderDeletionManager: common.NewFolderDeletionManager(jm.ctx, jpm.Plan().Fpo, logger),
+ exclusiveDestinationMapHolder: &atomic.Value{},
}
+ jm.initState.exclusiveDestinationMapHolder.Store(common.NewExclusiveStringMap(jpm.Plan().FromTo, runtime.GOOS))
}
jpm.jobMgrInitState = jm.initState // so jpm can use it as much as desired without locking (since the only mutation is the init in jobManager. As far as jobPartManager is concerned, the init state is read-only
+ jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(partNum, jpm.Plan().FromTo)

if scheduleTransfers {
// If the schedule transfer is set to true
@@ -455,7 +465,6 @@ func (jm *jobMgr) AddJobOrder(order common.CopyJobPartOrderRequest) IJobPartMgr
jm.jobPartMgrs.Set(order.PartNum, jpm)
jm.setFinalPartOrdered(order.PartNum, jpm.planMMF.Plan().IsFinalPart)
jm.setDirection(jpm.Plan().FromTo)
- jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(order.PartNum, jpm.Plan().FromTo)

jm.initMu.Lock()
defer jm.initMu.Unlock()
@@ -465,9 +474,12 @@ func (jm *jobMgr) AddJobOrder(order common.CopyJobPartOrderRequest) IJobPartMgr
securityInfoPersistenceManager: newSecurityInfoPersistenceManager(jm.ctx),
folderCreationTracker: NewFolderCreationTracker(jpm.Plan().Fpo, jpm.Plan()),
folderDeletionManager: common.NewFolderDeletionManager(jm.ctx, jpm.Plan().Fpo, logger),
+ exclusiveDestinationMapHolder: &atomic.Value{},
}
+ jm.initState.exclusiveDestinationMapHolder.Store(common.NewExclusiveStringMap(jpm.Plan().FromTo, runtime.GOOS))
}
jpm.jobMgrInitState = jm.initState // so jpm can use it as much as desired without locking (since the only mutation is the init in jobManager. As far as jobPartManager is concerned, the init state is read-only
+ jpm.exclusiveDestinationMap = jm.getExclusiveDestinationMap(order.PartNum, jpm.Plan().FromTo)

jm.QueueJobParts(jpm)
return jpm
@@ -515,11 +527,7 @@ func (jm *jobMgr) setDirection(fromTo common.FromTo) {

// can't do this at time of constructing the jobManager, because it doesn't know fromTo at that time
func (jm *jobMgr) getExclusiveDestinationMap(partNum PartNumber, fromTo common.FromTo) *common.ExclusiveStringMap {
- // assume that first part is ordered before any others
- if partNum == 0 {
- jm.exclusiveDestinationMapHolder.Store(common.NewExclusiveStringMap(fromTo, runtime.GOOS))
- }
- return jm.exclusiveDestinationMapHolder.Load().(*common.ExclusiveStringMap)
+ return jm.initState.exclusiveDestinationMapHolder.Load().(*common.ExclusiveStringMap)
}

func (jm *jobMgr) HttpClient() *http.Client {
@@ -592,6 +600,10 @@ func (jm *jobMgr) reportJobPartDoneHandler() {
jobProgressInfo.transfersSkipped += partProgressInfo.transfersSkipped
jobProgressInfo.transfersFailed += partProgressInfo.transfersFailed

+ if partProgressInfo.completionChan != nil {
+ close(partProgressInfo.completionChan)
+ }

// If the last part is still awaited or other parts all still not complete,
// JobPart 0 status is not changed (unless we are cancelling)
haveFinalPart = atomic.LoadInt32(&jm.atomicFinalPartOrderedIndicator) == 1
@@ -1002,7 +1014,10 @@ func (jm *jobMgr) CancelPauseJobOrder(desiredJobStatus common.JobStatus) common.
}
return jr
}
- /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

+ func (jm *jobMgr) IsDaemon() bool {
+ return jm.isDaemon
+ }

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

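Two related moves in this file: the exclusive destination map now lives on jobMgrInitState, created once under initMu when the first part of the job arrives, and getExclusiveDestinationMap simply loads it, dropping the old assumption that part 0 is always ordered first. A self-contained sketch of that init-once-then-read-only pattern, with simplified stand-in types:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type exclusiveStringMap struct{ m sync.Map } // stand-in for common.ExclusiveStringMap

type jobMgrInitState struct {
	exclusiveDestinationMapHolder *atomic.Value
}

type jobMgr struct {
	initMu    sync.Mutex
	initState *jobMgrInitState
}

// addJobPart initializes the shared, read-only-after-init state exactly once,
// no matter which part number happens to arrive first.
func (jm *jobMgr) addJobPart(partNum int) *exclusiveStringMap {
	jm.initMu.Lock()
	defer jm.initMu.Unlock()
	if jm.initState == nil {
		holder := &atomic.Value{}
		holder.Store(&exclusiveStringMap{})
		jm.initState = &jobMgrInitState{exclusiveDestinationMapHolder: holder}
	}
	// After init the holder is only ever read, so later readers can Load()
	// the same value without contention.
	return jm.initState.exclusiveDestinationMapHolder.Load().(*exclusiveStringMap)
}

func main() {
	jm := &jobMgr{}
	m1 := jm.addJobPart(3) // works even though part 0 was never seen
	m2 := jm.addJobPart(0)
	fmt.Println(m1 == m2) // true: every part shares the same map
}
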
ste/mgr-JobPartMgr.go (4 additions, 0 deletions)
@@ -237,6 +237,7 @@ type jobPartProgressInfo struct {
transfersCompleted int
transfersSkipped int
transfersFailed int
+ completionChan chan struct{}
}

// jobPartMgr represents the runtime information for a Job's Part
@@ -310,6 +311,8 @@ type jobPartMgr struct {
atomicTransfersSkipped uint32

cpkOptions common.CpkOptions

+ closeOnCompletion chan struct{}
}

func (jpm *jobPartMgr) getOverwritePrompter() *overwritePrompter {
@@ -843,6 +846,7 @@ func (jpm *jobPartMgr) ReportTransferDone(status common.TransferStatus) (transfe
transfersCompleted: int(atomic.LoadUint32(&jpm.atomicTransfersCompleted)),
transfersSkipped: int(atomic.LoadUint32(&jpm.atomicTransfersSkipped)),
transfersFailed: int(atomic.LoadUint32(&jpm.atomicTransfersFailed)),
+ completionChan: jpm.closeOnCompletion,
}
jpm.jobMgr.ReportJobPartDone(jppi)
}
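These additions carry the optional completion channel from jobPartMgr into jobPartProgressInfo, so the handler in mgr-JobMgr.go above can close it when the part reports done; the close is nil-guarded because the channel is optional, and it fires at most once since each part reports completion once. A small illustrative sketch (simplified struct, placeholder function name):

package main

import "fmt"

// jobPartProgressInfo carries the part's final counters plus the optional
// channel to close when the part is done (nil if no one is waiting).
type jobPartProgressInfo struct {
	transfersCompleted int
	completionChan     chan struct{}
}

// reportJobPartDone models the handler's behaviour: close only if a
// channel was actually supplied.
func reportJobPartDone(info jobPartProgressInfo) {
	if info.completionChan != nil {
		close(info.completionChan)
	}
	fmt.Println("part done, transfers completed:", info.transfersCompleted)
}

func main() {
	done := make(chan struct{})
	reportJobPartDone(jobPartProgressInfo{transfersCompleted: 3, completionChan: done})
	<-done // returns immediately because the channel was closed

	// Also valid with no waiter: the nil guard skips the close.
	reportJobPartDone(jobPartProgressInfo{transfersCompleted: 1})
}
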
ste/mgr-JobPartTransferMgr.go (2 additions, 1 deletion)
@@ -762,7 +762,8 @@ func (jptm *jobPartTransferMgr) failActiveTransfer(typ transferErrorCode, descri
jptm.SetErrorCode(int32(status)) // TODO: what are the rules about when this needs to be set, and doesn't need to be (e.g. for earlier failures)?
// If the status code was 403, it means there was an authentication error and we exit.
// User can resume the job if completely ordered with a new sas.
- if status == http.StatusForbidden {
+ if status == http.StatusForbidden &&
+ !jptm.jobPartMgr.(*jobPartMgr).jobMgr.IsDaemon() {
// quit right away, since without proper authentication no work can be done
// display a clear message
common.GetLifecycleMgr().Info(fmt.Sprintf("Authentication failed, it is either not correct, or expired, or does not have the correct permission %s", err.Error()))
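The added IsDaemon() guard keeps a service-hosted STE alive on an authentication failure: the immediate-exit path is only appropriate for a one-shot CLI run. A hedged, self-contained sketch of the decision with placeholder types and logging (the real code goes through common.GetLifecycleMgr()):

package main

import (
	"errors"
	"fmt"
	"net/http"
	"os"
)

type jobMgr struct{ isDaemon bool }

func (jm *jobMgr) IsDaemon() bool { return jm.isDaemon }

// failActiveTransfer sketches the new behaviour: a 403 still fails the
// transfer, but only a non-daemon process exits immediately.
func failActiveTransfer(jm *jobMgr, status int, err error) {
	if status == http.StatusForbidden && !jm.IsDaemon() {
		fmt.Println("authentication failed, exiting:", err)
		os.Exit(1) // one-shot CLI: nothing more can be done without valid auth
	}
	// Daemon mode (or any other status): record the failure and keep serving.
	fmt.Println("transfer failed, job continues:", err)
}

func main() {
	failActiveTransfer(&jobMgr{isDaemon: true}, http.StatusForbidden, errors.New("403 from service"))
}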