use --jobId argument and input_args.json file for grid jobs (#8883)
* add support for --jobId arg to CMSRunAnalysis.py

* transfer input_args.json to WN

* make it work also for automatic splitting

* pylint: avoid undef. vars in CMSRunAnalysis.py
belforte authored Jan 20, 2025
1 parent bd7348c commit a6115c5
Showing 2 changed files with 136 additions and 61 deletions.
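
A hedged illustration of the mechanism this commit introduces: DagmanCreator now writes one dictionary per job into input_args.json, and on the worker node CMSRunAnalysis.py is started with only --jobId, picking everything else up from that file. The key names below come from the diff; all values are made up for illustration.

exampleEntry = {                       # one element of the JSON list in input_args.json
    "CRAB_Id": "3",
    "CRAB_Archive": "sandbox.tar.gz",
    "CRAB_ISB": "https://cmsweb.cern.ch/crabcache",
    "CRAB_JobSW": "CMSSW_9_2_5",
    "CRAB_JobArch": "slc6_amd64_gcc530",
    "inputFiles": "job_input_file_list_3.txt",
    "runAndLumiMask": "job_lumis_3.json",
    "lheInputFiles": False,
    "firstEvent": None, "lastEvent": None, "firstLumi": None, "firstRun": None,
    "seeding": "AutomaticSeeding",
    "scriptExe": None, "scriptArgs": None,
    "eventsPerLumi": None, "maxRuntime": -1,
    "CRAB_AdditionalOutputFiles": "{}",
}
# the job's command line then reduces to something like:
#   CMSRunAnalysis.py --jobId=3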
32 changes: 30 additions & 2 deletions scripts/CMSRunAnalysis.py
@@ -286,6 +286,7 @@ def handleException(exitAcronym, exitCode, exitMsg):

def parseArgs():
parser = PassThroughOptionParser()
parser.add_option('--jobId', dest='jobId', type='string')
parser.add_option('--json', dest='jsonArgFile', type='string')
parser.add_option('-a', dest='archiveJob', type='string')
parser.add_option('-o', dest='outFiles', type='string')
@@ -319,9 +320,36 @@ def parseArgs():
if value == 'None':
setattr(opts, name, None)

# allow for most input arguments to be passed via a JSON file
# in this case only -r and --JobNumber need to be present as arguments
# allow for the arguments to simply be the jobId (a string, because automatic splitting uses job ids with a format like N-M)
if getattr(opts, 'jobId', None):
arguments = {}
with open('input_args.json', 'r', encoding='UTF-8') as fh:
allArgs = json.load(fh) # read file prepared by DagmanCreator
for args in allArgs:
if args['CRAB_Id'] == opts.jobId:
arguments = args # pick the arguments for this job
break
if not arguments:
raise Exception("input jobId not found in input_args.json")
for key, value in arguments.items():
setattr(opts, key, value)

# remap keys in input_args.json to the argument names required by CMSRunAnalysis.py
# use as: value_of_argument_name = inputArgs[argMap[argument_name]]
# to ease the transition to cleaner code, the new options are only set if not already present
argMap = {
'archiveJob': 'CRAB_Archive', 'outFiles': 'CRAB_AdditionalOutputFiles',
'sourceURL': 'CRAB_ISB', 'cmsswVersion': 'CRAB_JobSW',
'scramArch': 'CRAB_JobArch', 'runAndLumis': 'runAndLumiMask',
'inputFile' : 'inputFiles', 'lheInputFiles': 'lheInputFiles'
}
for key, value in argMap.items():
if not getattr(opts, key, None):
setattr(opts, key, arguments[value]) # assign to our variables

# allow for most input arguments to be passed via a (job specific) JSON file
if getattr(opts, 'jsonArgFile', None):
arguments = {}
with open(opts.jsonArgFile, 'r', encoding='UTF-8') as fh:
arguments = json.load(fh)
for key, value in arguments.items():
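Read together, the additions above amount to the following flow. This is a condensed, standalone sketch (not the production code): the file contents and job ids are illustrative, and the str() comparison is an assumption made for the sketch.

import json

ARG_MAP = {  # CMSRunAnalysis.py option name -> key used in input_args.json
    'archiveJob': 'CRAB_Archive', 'outFiles': 'CRAB_AdditionalOutputFiles',
    'sourceURL': 'CRAB_ISB', 'cmsswVersion': 'CRAB_JobSW',
    'scramArch': 'CRAB_JobArch', 'runAndLumis': 'runAndLumiMask',
    'inputFile': 'inputFiles', 'lheInputFiles': 'lheInputFiles',
}

def resolveArgs(jobId, argsFile='input_args.json'):
    """Return an option_name -> value mapping for the job with this id."""
    with open(argsFile, 'r', encoding='UTF-8') as fh:
        allArgs = json.load(fh)  # list of dicts, one per job, written by DagmanCreator
    # compare as strings so that both plain ids ("3") and automatic-splitting
    # ids ("0-2") are handled the same way (illustrative choice)
    matches = [a for a in allArgs if str(a['CRAB_Id']) == str(jobId)]
    if not matches:
        raise RuntimeError(f"jobId {jobId} not found in {argsFile}")
    arguments = matches[0]
    return {opt: arguments[key] for opt, key in ARG_MAP.items() if key in arguments}

# e.g. resolveArgs('3'), or resolveArgs('0-2') for an automatic-splitting job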
165 changes: 106 additions & 59 deletions src/python/TaskWorker/Actions/DagmanCreator.py
@@ -56,7 +56,12 @@
SCRIPT DEFER 4 1800 POST Job{count} dag_bootstrap.sh POSTJOB $JOBID $RETURN $RETRY $MAX_RETRIES {taskname} {count} {tempDest} {outputDest} cmsRun_{count}.log.tar.gz {stage} {remoteOutputFiles}
#PRE_SKIP Job{count} 3
RETRY Job{count} {maxretries} UNLESS-EXIT 2
VARS Job{count} count="{count}" runAndLumiMask="job_lumis_{count}.json" lheInputFiles="{lheInputFiles}" firstEvent="{firstEvent}" firstLumi="{firstLumi}" lastEvent="{lastEvent}" firstRun="{firstRun}" maxRuntime="{maxRuntime}" eventsPerLumi="{eventsPerLumi}" seeding="{seeding}" inputFiles="job_input_file_list_{count}.txt" scriptExe="{scriptExe}" scriptArgs="{scriptArgs}" +CRAB_localOutputFiles="\\"{localOutputFiles}\\"" +CRAB_DataBlock="\\"{block}\\"" +CRAB_Destination="\\"{destination}\\""
VARS Job{count} count="{count}"
# the following 3 ClassAds could possibly be moved to Job.submit, but since they are job-dependent
# that would need to be done in the PreJob... doing it here is a bit ugly, but simpler
VARS Job{count} My.CRAB_localOutputFiles="\\"{localOutputFiles}\\""
VARS Job{count} My.CRAB_DataBlock="\\"{block}\\""
VARS Job{count} My.CRAB_Destination="\\"{destination}\\""
ABORT-DAG-ON Job{count} 3
"""

@@ -146,7 +151,7 @@
Log = job_log
# args changed...
Arguments = "-a $(CRAB_Archive) --sourceURL=$(CRAB_ISB) --jobNumber=$(CRAB_Id) --cmsswVersion=$(CRAB_JobSW) --scramArch=$(CRAB_JobArch) '--inputFile=$(inputFiles)' '--runAndLumis=$(runAndLumiMask)' --lheInputFiles=$(lheInputFiles) --firstEvent=$(firstEvent) --firstLumi=$(firstLumi) --lastEvent=$(lastEvent) --firstRun=$(firstRun) --seeding=$(seeding) --scriptExe=$(scriptExe) --eventsPerLumi=$(eventsPerLumi) --maxRuntime=$(maxRuntime) '--scriptArgs=$(scriptArgs)' -o $(CRAB_AdditionalOutputFiles)"
Arguments = "--jobId=$(CRAB_Id)"
transfer_input_files = CMSRunAnalysis.sh, cmscp.py%(additional_input_file)s
transfer_output_files = jobReport.json.$(count), WMArchiveReport.json.$(count)
@@ -371,6 +376,7 @@ def __init__(self, config, crabserver, procnum=-1, rucioClient=None):
""" need a comment line here """
TaskAction.__init__(self, config, crabserver, procnum)
self.rucioClient = rucioClient
self.runningInTW = crabserver is not None

def populateGlideinMatching(self, info):
""" actually simply set the required arch and microarch """
@@ -403,7 +409,6 @@ def populateGlideinMatching(self, info):
self.logger.error(f"Not supported microarch: {min_micro_arch}. Ignore it")
info['required_minimum_microarch'] = 'any'


def getDashboardTaskType(self, task):
""" Get the dashboard activity name for the task.
"""
@@ -448,7 +453,6 @@ def isGlobalBlacklistIgnored(self, kwargs):

return kwargs['task']['tm_ignore_global_blacklist'] == 'T'


def makeJobSubmit(self, task):
"""
Create the submit file. This is reused by all jobs in the task; differences
@@ -565,6 +569,7 @@ def makeJobSubmit(self, task):
else:
raise TaskWorkerException(f"Cannot find TaskManagerRun.tar.gz inside the cwd: {os.getcwd()}")
info['additional_input_file'] += ", sandbox.tar.gz" # it will be present on SPOOL_DIR after dab_bootstrap
info['additional_input_file'] += ", input_args.json"
info['additional_input_file'] += ", run_and_lumis.tar.gz"
info['additional_input_file'] += ", input_files.tar.gz"
info['additional_input_file'] += ", submit_env.sh"
@@ -701,14 +706,15 @@ def makeDagSpecs(self, task, siteinfo, jobgroup, block, availablesites, datasite

return dagSpecs, i

def prepareLocal(self, dagSpecs, info, kw, inputFiles, subdags):
""" Prepare a file named "input_args.json" with all the input parameters of each jobs. It is a list
with a dictionary for each job. The dictionary key/value pairs are the arguments of gWMS-CMSRunAnalysis.sh
N.B.: in the JDL: "Executable = gWMS-CMSRunAnalysis.sh" and "Arguments = $(CRAB_Archive) --sourceURL=$(CRAB_ISB) ..."
where each argument of each job is set in "input_args.json".
Also, this prepareLocal method prepare a single "InputFiles.tar.gz" file with all the inputs files moved
from the TW to the schedd.
This is used by the client preparelocal command.
def prepareJobArguments(self, dagSpecs, task):
""" Prepare an object with all the input parameters of each jobs. It is a list
with a dictionary for each job. The dictionary key/value pairs are the variables needed in CMSRunAnalysis.py
This will be save in "input_args*.json", a differnt json file for the main DAG and each subdags
Inputs:
dagSpecs : list of dictionaries with information for each DAG job
task: dictionary, the "standard" task dictionary with info from the DataBase TASK table
Returns:
argdicts : list of dictionaries, one per job, with the args needeed by CMSRunAnalysis.py
"""

argdicts = []
@@ -717,35 +723,66 @@ def prepareLocal(dagSpecs, info, kw, inputFiles, subdags):
argDict['inputFiles'] = f"job_input_file_list_{dagspec['count']}.txt" #'job_input_file_list_1.txt'
argDict['runAndLumiMask'] = f"job_lumis_{dagspec['count']}.json"
argDict['CRAB_Id'] = dagspec['count'] #'1'
argDict['lheInputFiles'] = dagspec['lheInputFiles'] #False
argDict['firstEvent'] = dagspec['firstEvent'] #'None'
argDict['lastEvent'] = dagspec['lastEvent'] #'None'
argDict['firstLumi'] = dagspec['firstLumi'] #'None'
argDict['firstRun'] = dagspec['firstRun'] #'None'
argDict['CRAB_Archive'] = info['cachefilename_flatten'] #'sandbox.tar.gz'
argDict['CRAB_ISB'] = info['cacheurl_flatten'] #u'https://cmsweb.cern.ch/crabcache'
argDict['CRAB_JobSW'] = info['jobsw_flatten'] #u'CMSSW_9_2_5'
argDict['CRAB_JobArch'] = info['jobarch_flatten'] #u'slc6_amd64_gcc530'
argDict['lheInputFiles'] = dagspec['lheInputFiles'] # False
argDict['firstEvent'] = dagspec['firstEvent'] # 'None'
argDict['lastEvent'] = dagspec['lastEvent'] # 'None'
argDict['firstLumi'] = dagspec['firstLumi'] # 'None'
argDict['firstRun'] = dagspec['firstRun'] # 'None'
argDict['CRAB_Archive'] = task['tm_user_sandbox'] #'sandbox.tar.gz'
argDict['CRAB_ISB'] = task['tm_cache_url'] # 'https://cmsweb.cern.ch/crabcache'
argDict['CRAB_JobSW'] = task['tm_job_sw'] # 'CMSSW_9_2_5'
argDict['CRAB_JobArch'] = task['tm_job_arch'] # 'slc6_amd64_gcc530'
argDict['seeding'] = 'AutomaticSeeding'
argDict['scriptExe'] = kw['task']['tm_scriptexe'] #
argDict['eventsPerLumi'] = kw['task']['tm_events_per_lumi'] #
argDict['maxRuntime'] = kw['task']['max_runtime'] #-1
argDict['scriptArgs'] = kw['task']['tm_scriptargs']
argDict['CRAB_AdditionalOutputFiles'] = info['addoutputfiles_flatten']
#The following two are for fixing up job.submit files
argDict['CRAB_localOutputFiles'] = dagspec['localOutputFiles']
argDict['CRAB_Destination'] = dagspec['destination']
argDict['scriptExe'] = task['tm_scriptexe'] #
argDict['eventsPerLumi'] = task['tm_events_per_lumi'] #
argDict['maxRuntime'] = dagspec['maxRuntime'] # -1
argDict['scriptArgs'] = task['tm_scriptargs']
argDict['CRAB_AdditionalOutputFiles'] = "{}"
# The following two are for fixing up job.submit files
# SB argDict['CRAB_localOutputFiles'] = dagspec['localOutputFiles']
# SB argDict['CRAB_Destination'] = dagspec['destination']
argdicts.append(argDict)
return argdicts

with open('input_args.json', 'w', encoding='utf-8') as fd:
json.dump(argdicts, fd)
def prepareTarballForSched(self, filesForSched, subdags):
""" prepare a single "InputFiles.tar.gz" file with all the files to be moved
from the TW to the schedd.
This file will also be used by the client preparelocal command.
"""

with tarfile.open('InputFiles.tar.gz', mode='w:gz') as tf:
for ifname in inputFiles + subdags + ['input_args.json']:
for ifname in filesForSched + subdags:
tf.add(ifname)

def createSubdag(self, splitterResult, **kwargs):
""" beware the "Sub" in the name ! This is used also for Main DAG """
""" beware the "Sub" in the name ! This is used also for Main DAG
Does the actual DAG file creation and writes out relevant files
Handles both conventional tasks (only one DAG created in the TW) and
automatic splitting (multiple subdags which will be added in the scheduler by
the PreDag.py script which calls this DagmanCreator
Returns:
info : dictionary : passes info to next action (DagmanSubmitter)
splitterResult : object : this is the output of previous action (Splitter) and is part of input
arguments to DagmanCreator ! As far as Stefano can tell returning it
here is a "perverse" way to pass it also to DagmanSubmitter
subdags : list : list of subdags files created which will need to be sent to scheduler
Stefano does not understans why it is needed since the subdags will be
overwritten by PreDag in the scheduler, maybe DAGMAN requires that the subdag
file indicated in the DAG exists, even if empty and eventually filled by the PreDag
e.g. the probe stage DAG ends with:
SUBDAG EXTERNAL Job0SubJobs RunJobs0.subdag NOOP
SCRIPT DEFER 4 300 PRE Job0SubJobs dag_bootstrap.sh PREDAG processing 5 0
Side effects - writes these files to cwd:
RunJobs.dag : the initial DAGMAN which will be submitted by DagmanSubmitter
RunJobs{0,1,2,3}.subdag : the DAGMAN description for processing and tail subdags
input_args.json : the arguments needed by CMSRunAnalysis.py for each job
datadiscovery.pkl : the object returned in output from DataDiscovery action, PreDag will need it
in order to call Splitter and create the correct subdags
taskinformation.pkl : the content of the task dictionary
taskworkerconfig.pkl : the content of the TaskWorkerConfig object
site.ad.json : sites assigned to jobs in each job group (info from Splitter)
"""

startjobid = kwargs.get('startjobid', 0)
parent = kwargs.get('parent', None)
@@ -1075,35 +1112,44 @@ def getBlacklistMsg():
shutil.rmtree(tempDir2)

if stage in ('probe', 'conventional'):
name = "RunJobs.dag"
## Cache data discovery
dagFileName = "RunJobs.dag"
with open("datadiscovery.pkl", "wb") as fd:
pickle.dump(splitterResult[1], fd)

## Cache task information
pickle.dump(splitterResult[1], fd) # Cache data discovery
with open("taskinformation.pkl", "wb") as fd:
pickle.dump(kwargs['task'], fd)

## Cache TaskWorker configuration
pickle.dump(kwargs['task'], fd) # Cache task information
with open("taskworkerconfig.pkl", "wb") as fd:
pickle.dump(self.config, fd)
pickle.dump(self.config, fd) # Cache TaskWorker configuration
elif stage == 'processing':
name = "RunJobs0.subdag"
dagFileName = "RunJobs0.subdag"
else:
name = f"RunJobs{parent}.subdag"
dagFileName = f"RunJobs{parent}.subdag"
argFileName = "input_args.json"

## Cache site information
with open("site.ad.json", "w", encoding='utf-8') as fd:
json.dump(siteinfo, fd)


## Save the DAG into a file.
with open(name, "w", encoding='utf-8') as fd:
with open(dagFileName, "w", encoding='utf-8') as fd:
fd.write(dag)

kwargs['task']['jobcount'] = len(dagSpecs)

info = self.makeJobSubmit(kwargs['task'])

# list of input arguments needed for each job
argdicts = self.prepareJobArguments(dagSpecs, kwargs['task'])
# save the input arguments for each job's CMSRunAnalysis.py in the input_args.json file
if stage in ['processing', 'tail']: # add to argument list from previous stage
with open(argFileName, 'r', encoding='utf-8') as fd:
oldArgs = json.load(fd)
argdicts = oldArgs + argdicts
# no worry about overwriting: even in automatic splitting, the multiple DagmanCreator runs
# are executed inside PreDag, which is wrapped with a lock
with open(argFileName, 'w', encoding='utf-8') as fd:
json.dump(argdicts, fd)

maxidle = getattr(self.config.TaskWorker, 'maxIdle', MAX_IDLE_JOBS)
if maxidle == -1:
maxidle = info['jobcount']
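
For automatic splitting, this append-and-reuse behaviour is the key point: each later stage extends the list written by the previous one, so a single input_args.json on the scheduler (and worker node) covers probe, processing and tail jobs. A hedged, standalone sketch of that logic follows; the helper name, file name and job ids are illustrative.

import json
import os

def appendJobArgs(newArgDicts, argFileName='input_args.json'):
    """Extend the per-job argument list written by an earlier stage, if any."""
    args = []
    if os.path.exists(argFileName):
        with open(argFileName, 'r', encoding='utf-8') as fd:
            args = json.load(fd)          # arguments from the previous stage
    args += newArgDicts                   # add this stage's jobs
    with open(argFileName, 'w', encoding='utf-8') as fd:
        json.dump(args, fd)               # callers are serialized by PreDag's lock

# a tail job started as "CMSRunAnalysis.py --jobId=0-2" then finds its own entry
# in the same, grown file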
@@ -1127,7 +1173,7 @@ def getBlacklistMsg():
elif info.get('faillimit') < 0:
info['faillimit'] = -1

return info, splitterResult, subdags, dagSpecs
return info, splitterResult, subdags

def getHighPrioUsers(self, userProxy, workflow, egroups):
""" get the list of high priority users """
@@ -1169,7 +1215,7 @@ def executeInternal(self, *args, **kw):
sandboxTarBall = 'sandbox.tar.gz'

# Bootstrap the ISB if we are running in the TW
if self.crabserver:
if self.runningInTW:
username = kw['task']['tm_username']
taskname = kw['task']['tm_taskname']
sandboxName = kw['task']['tm_user_sandbox']
@@ -1194,19 +1240,20 @@ def executeInternal(self, *args, **kw):
kw['task']['dbinstance'] = self.crabserver.getDbInstance()
params = {}

inputFiles = ['gWMS-CMSRunAnalysis.sh', 'submit_env.sh', 'CMSRunAnalysis.sh', 'cmscp.py', 'cmscp.sh', 'RunJobs.dag', 'Job.submit', 'dag_bootstrap.sh',
'AdjustSites.py', 'site.ad.json', 'datadiscovery.pkl', 'taskinformation.pkl', 'taskworkerconfig.pkl',
'run_and_lumis.tar.gz', 'input_files.tar.gz']
# files to be transferred to remote WNs via Job.submit, could pack most in a tarball
filesForWN = ['submit_env.sh', 'CMSRunAnalysis.sh', 'cmscp.py', 'cmscp.sh', 'CMSRunAnalysis.tar.gz',
'run_and_lumis.tar.gz', 'input_files.tar.gz', 'input_args.json']
# files to be transferred to the scheduler by DagmanSubmitter (these will all be placed in InputFiles.tar.gz)
filesForSched = filesForWN + \
['gWMS-CMSRunAnalysis.sh', 'RunJobs.dag', 'Job.submit', 'dag_bootstrap.sh',
'AdjustSites.py', 'site.ad.json', 'TaskManagerRun.tar.gz',
'datadiscovery.pkl', 'taskinformation.pkl', 'taskworkerconfig.pkl',]

if os.path.exists("CMSRunAnalysis.tar.gz"):
inputFiles.append("CMSRunAnalysis.tar.gz")
if os.path.exists("TaskManagerRun.tar.gz"):
inputFiles.append("TaskManagerRun.tar.gz")
if kw['task']['tm_input_dataset']:
inputFiles.append("input_dataset_lumis.json")
inputFiles.append("input_dataset_duplicate_lumis.json")
filesForSched.append("input_dataset_lumis.json")
filesForSched.append("input_dataset_duplicate_lumis.json")

info, splitterResult, subdags, dagSpecs = self.createSubdag(*args, **kw)
info, splitterResult, subdags = self.createSubdag(*args, **kw)

# as splitter summary is useful for dryrun, let's add it to the InputFiles tarball
jobGroups = splitterResult[0] # the first returned value of Splitter action is the splitterFactory output
Expand All @@ -1215,9 +1262,9 @@ def executeInternal(self, *args, **kw):
jobs = jobgroup.getJobs()
splittingSummary.addJobs(jobs)
splittingSummary.dump('splitting-summary.json')
inputFiles.append('splitting-summary.json')
filesForSched.append('splitting-summary.json')

self.prepareLocal(dagSpecs, info, kw, inputFiles, subdags)
self.prepareTarballForSched(filesForSched, subdags)

return info, params, ["InputFiles.tar.gz"], splitterResult
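
Since everything the scheduler needs now travels inside a single InputFiles.tar.gz, a quick way to sanity-check the per-job arguments it will ship is to open the tarball directly, for instance on the output of the client preparelocal command. A hedged sketch, with illustrative paths:

import json
import tarfile

with tarfile.open('InputFiles.tar.gz', mode='r:gz') as tf:
    fh = tf.extractfile('input_args.json')   # member name as added by prepareTarballForSched
    allArgs = json.load(fh)

print(f"{len(allArgs)} job argument dictionaries")
print(allArgs[0]['CRAB_Id'], allArgs[0]['CRAB_JobSW'])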

