From 806226a154f55968c7d3007f513ab12fe9586088 Mon Sep 17 00:00:00 2001
From: Stefano Belforte
Date: Fri, 31 May 2024 16:36:52 +0200
Subject: [PATCH] new format of schedd.submit()/spool() fix #8336 fix #8333 (#8448)

* new format of schedd.submit()/spool() fix #8336 fix #8333

* pylint
---
 .../TaskWorker/Actions/DagmanSubmitter.py | 240 ++++++++----------
 src/python/TaskWorker/Actions/PreDAG.py   |   2 +-
 2 files changed, 106 insertions(+), 136 deletions(-)

diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py
index 104fef7c7f..87c2501322 100644
--- a/src/python/TaskWorker/Actions/DagmanSubmitter.py
+++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py
@@ -5,7 +5,6 @@
 
 import os
 import copy
-import json
 import time
 import sys
 
@@ -34,80 +33,67 @@
 ## These are the CRAB attributes that we want to add to the job class ad when
 ## using the submitDirect() method.
 SUBMIT_INFO = [ \
-    ('CRAB_ReqName', 'requestname'),
-    ('CRAB_Workflow', 'workflow'),
-    ('CMS_JobType', 'jobtype'),
-    ('CRAB_JobSW', 'jobsw'),
-    ('CRAB_JobArch', 'jobarch'),
-    ('DESIRED_CMSDataset', 'inputdata'),
-    ('CRAB_DBSURL', 'dbsurl'),
-    ('CRAB_PublishName', 'publishname'),
-    ('CRAB_Publish', 'publication'),
-    ('CRAB_PublishDBSURL', 'publishdbsurl'),
-    ('CRAB_PrimaryDataset', 'primarydataset'),
-    ('CRAB_ISB', 'cacheurl'),
-    ('CRAB_AdditionalOutputFiles', 'addoutputfiles'),
-    ('CRAB_EDMOutputFiles', 'edmoutfiles'),
-    ('CRAB_TFileOutputFiles', 'tfileoutfiles'),
-    ('CRAB_TransferOutputs', 'saveoutput'),
-    ('CRAB_SaveLogsFlag', 'savelogsflag'),
-    ('CRAB_UserDN', 'userdn'),
-    ('CRAB_UserHN', 'userhn'),
-    ('CRAB_AsyncDest', 'asyncdest'),
-    #('CRAB_StageoutPolicy', 'stageoutpolicy'),
-    ('CRAB_UserRole', 'tm_user_role'),
-    ('CRAB_UserGroup', 'tm_user_group'),
-    ('CRAB_TaskWorker', 'worker_name'),
-    ('CRAB_RetryOnASOFailures', 'retry_aso'),
-    ('CRAB_ASOTimeout', 'aso_timeout'),
-    ('CRAB_RestHost', 'resthost'),
-    ('CRAB_DbInstance', 'dbinstance'),
-    ('CRAB_NumAutomJobRetries', 'numautomjobretries'),
-    ('CRAB_SplitAlgo', 'splitalgo'),
-    ('CRAB_AlgoArgs', 'algoargs'),
-    ('CRAB_LumiMask', 'lumimask'),
-    ('CRAB_JobCount', 'jobcount'),
-    ('CRAB_UserVO', 'tm_user_vo'),
-    ('CRAB_SiteBlacklist', 'siteblacklist'),
-    ('CRAB_SiteWhitelist', 'sitewhitelist'),
-    ('RequestMemory', 'tm_maxmemory'),
-    ('RequestCpus', 'tm_numcores'),
-    ('MaxWallTimeMins', 'tm_maxjobruntime'),
-    ('MaxWallTimeMinsRun', 'tm_maxjobruntime'),
-    ('MaxWallTimeMinsProbe', 'maxproberuntime'),
-    ('MaxWallTimeMinsTail', 'maxtailruntime'),
-    ('JobPrio', 'tm_priority'),
-    ('CRAB_FailedNodeLimit', 'faillimit'),
-    ('CRAB_DashboardTaskType', 'taskType'),
-    ('CRAB_MaxIdle', 'maxidle'),
-    ('CRAB_MaxPost', 'maxpost'),
-    ('CMS_Type', 'cms_type'),
-    ('CMS_WMTool', 'cms_wmtool'),
-    ('CMS_TaskType', 'cms_tasktype'),
+    ('+CRAB_ReqName', 'requestname'),
+    ('+CRAB_Workflow', 'workflow'),
+    ('+CMS_JobType', 'jobtype'),
+    ('+CRAB_JobSW', 'jobsw'),
+    ('+CRAB_JobArch', 'jobarch'),
+    ('+DESIRED_CMSDataset', 'inputdata'),
+    ('+CRAB_DBSURL', 'dbsurl'),
+    ('+CRAB_PublishName', 'publishname'),
+    ('+CRAB_Publish', 'publication'),
+    ('+CRAB_PublishDBSURL', 'publishdbsurl'),
+    ('+CRAB_PrimaryDataset', 'primarydataset'),
+    ('+CRAB_ISB', 'cacheurl'),
+    ('+CRAB_AdditionalOutputFiles', 'addoutputfiles'),
+    ('+CRAB_EDMOutputFiles', 'edmoutfiles'),
+    ('+CRAB_TFileOutputFiles', 'tfileoutfiles'),
+    ('+CRAB_TransferOutputs', 'saveoutput'),
+    ('+CRAB_SaveLogsFlag', 'savelogsflag'),
+    ('+CRAB_UserDN', 'userdn'),
+    ('+CRAB_UserHN', 'userhn'),
+    ('+CRAB_AsyncDest', 'asyncdest'),
+    #('+CRAB_StageoutPolicy', 'stageoutpolicy'),
+    ('+CRAB_UserRole', 'tm_user_role'),
+    ('+CRAB_UserGroup', 'tm_user_group'),
+    ('+CRAB_TaskWorker', 'worker_name'),
+    ('+CRAB_RetryOnASOFailures', 'retry_aso'),
+    ('+CRAB_ASOTimeout', 'aso_timeout'),
+    ('+CRAB_RestHost', 'resthost'),
+    ('+CRAB_DbInstance', 'dbinstance'),
+    ('+CRAB_NumAutomJobRetries', 'numautomjobretries'),
+    ('+CRAB_SplitAlgo', 'splitalgo'),
+    ('+CRAB_AlgoArgs', 'algoargs'),
+    ('+CRAB_LumiMask', 'lumimask'),
+    ('+CRAB_JobCount', 'jobcount'),
+    ('+CRAB_UserVO', 'tm_user_vo'),
+    ('+CRAB_SiteBlacklist', 'siteblacklist'),
+    ('+CRAB_SiteWhitelist', 'sitewhitelist'),
+    ('+RequestMemory', 'tm_maxmemory'),
+    ('+RequestCpus', 'tm_numcores'),
+    ('+MaxWallTimeMins', 'tm_maxjobruntime'),
+    ('+MaxWallTimeMinsRun', 'tm_maxjobruntime'),
+    ('+MaxWallTimeMinsProbe', 'maxproberuntime'),
+    ('+MaxWallTimeMinsTail', 'maxtailruntime'),
+    ('JobPrio', 'tm_priority'),
+    ('+CRAB_FailedNodeLimit', 'faillimit'),
+    ('+CRAB_DashboardTaskType', 'taskType'),
+    ('+CRAB_MaxIdle', 'maxidle'),
+    ('+CRAB_MaxPost', 'maxpost'),
+    ('+CMS_Type', 'cms_type'),
+    ('+CMS_WMTool', 'cms_wmtool'),
+    ('+CMS_TaskType', 'cms_tasktype'),
 ]
 
-def addCRABInfoToClassAd(ad, info):
+
+def addCRABInfoToJobJDL(jdl, info):
     """
-    Given a submit ClassAd, add in the appropriate CRAB_* attributes
+    Given a submit object, add in the appropriate CRAB_* attributes
     from the info directory
     """
     for adName, dictName in SUBMIT_INFO:
         if dictName in info and (info[dictName] is not None):
-            ad[adName] = classad.ExprTree(str(info[dictName]))
-    if 'extra_jdl' in info and info['extra_jdl']:
-        for jdl in info['extra_jdl'].split('\n'):
-            adName, adVal = jdl.lstrip('+').split('=', 1)
-            # remove spaces which would break schedd.submit #8420
-            adName = adName.strip()
-            adVal = adVal.strip()
-            ad[adName] = adVal
-    if 'accelerator_jdl' in info and info['accelerator_jdl']:
-        for jdl in info['accelerator_jdl'].split('\n'):
-            adName, adVal = jdl.lstrip('+').split('=', 1)
-            # remove spaces which would break schedd.submit #8420
-            adName = adName.strip()
-            adVal = adVal.strip()
-            ad[adName] = classad.ExprTree(str(adVal))
+            jdl[adName] = info[dictName]
 
 
 class ScheddStats(dict):
@@ -397,7 +383,7 @@ def executeInternal(self, info, inputFiles, **kwargs):
         os.chdir(kwargs['tempDir'])
 
         info['start_time'] = task['tm_start_time']
-        info['inputFilesString'] = ", ".join(inputFiles + ['subdag.ad'])
+        info['inputFilesString'] = ", ".join(inputFiles + ['subdag.jdl'])
         outputFiles = ["RunJobs.dag.dagman.out", "RunJobs.dag.rescue.001"]
         info['outputFilesString'] = ", ".join(outputFiles)
         arg = "RunJobs.dag"
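Note for reviewers: the '+' prefixes introduced in SUBMIT_INFO follow the condor_submit convention, where a bare name is a submit-description command and a leading '+' declares a custom attribute of the job ClassAd; with htcondor.Submit, values are stored as plain JDL text, so string-valued attributes need explicit ClassAd quotes. A minimal standalone sketch of the new addCRABInfoToJobJDL() pattern, with made-up names and values (not CRAB code):

    import classad
    import htcondor

    # made-up stand-in for the CRAB 'info' dictionary
    info = {'requestname': '240531_120000:user_crab_task', 'jobcount': '10'}

    jdl = htcondor.Submit()
    jdl['RequestMemory'] = '2000'                               # bare name: a submit command
    jdl['+CRAB_JobCount'] = info['jobcount']                    # '+' name: a raw job-ad attribute
    jdl['+CRAB_ReqName'] = classad.quote(info['requestname'])   # strings need ClassAd quotes

    print(jdl)  # serializes as plain "key = value" JDL lines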
@@ -475,89 +461,73 @@ def submitDirect(self, schedd, cmd, arg, info): #pylint: disable=R0201
         """
         Submit directly to the schedd using the HTCondor module
         """
-        dagAd = classad.ClassAd()
-        addCRABInfoToClassAd(dagAd, info)
+        jobJDL = htcondor.Submit()
+        addCRABInfoToJobJDL(jobJDL, info)
 
         if info["CMSGroups"]:
-            dagAd["CMSGroups"] = ','.join(info["CMSGroups"])
+            jobJDL["+CMSGroups"] = classad.quote(','.join(info["CMSGroups"]))
         else:
-            dagAd["CMSGroups"] = classad.Value.Undefined
+            jobJDL["+CMSGroups"] = classad.Value.Undefined
 
         # NOTE: Changes here must be synchronized with the job_submit in DagmanCreator.py in CAFTaskWorker
-        dagAd["CRAB_Attempt"] = 0
-        dagAd["CMS_SubmissionTool"] = "CRAB"
+        jobJDL["+CRAB_Attempt"] = 0
+        jobJDL["+CMS_SubmissionTool"] = "CRAB"
         # We switched from local to scheduler universe. Why? It seems there's no way in the
         # local universe to change the hold signal at runtime. That's fairly important for our
         # resubmit implementation.
-        #dagAd["JobUniverse"] = 12
-        dagAd["JobUniverse"] = 7
-        dagAd["HoldKillSig"] = "SIGUSR1"
-        dagAd["X509UserProxy"] = info['user_proxy']
-        dagAd["Requirements"] = classad.ExprTree('true || false')
-        # SB April 2024 the following horrible linw could be replace in current HTConcdor by
-        # dagAd["Environmnet"] = "PATH=...; CRAB3_VERSION=...; ...." see
-        # https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment
-        dagAd["Environment"] = classad.ExprTree('strcat("PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1 CONDOR_ID=", ClusterId, ".", ProcId," %s")' % " ".join(info['additional_environment_options'].split(";")))
-        dagAd["RemoteCondorSetup"] = info['remote_condor_setup']
-
-        dagAd["CRAB_TaskSubmitTime"] = info['start_time']
-        dagAd['CRAB_TaskLifetimeDays'] = TASKLIFETIME // 24 // 60 // 60
-        dagAd['CRAB_TaskEndTime'] = int(info['start_time']) + TASKLIFETIME
+        #jobJDL["JobUniverse"] = 12
+        jobJDL["JobUniverse"] = 7
+        jobJDL["HoldKillSig"] = "SIGUSR1"
+        jobJDL["X509UserProxy"] = info['user_proxy']
+        jobJDL["Requirements"] = "True || False"
+        environmentString = "PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1"
+        environmentString += " CONDOR_ID=$(ClusterId).$(ProcId)"
+        environmentString += " " + " ".join(info['additional_environment_options'].split(';'))
+        # Environment command in JDL requires proper quotes https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment
+        jobJDL["Environment"] = classad.quote(environmentString)
+        jobJDL["+RemoteCondorSetup"] = info['remote_condor_setup']
+        jobJDL["+CRAB_TaskSubmitTime"] = info['start_time']
+        jobJDL['+CRAB_TaskLifetimeDays'] = TASKLIFETIME // 24 // 60 // 60
+        jobJDL['+CRAB_TaskEndTime'] = int(info['start_time']) + TASKLIFETIME
         #For task management info see https://github.com/dmwm/CRABServer/issues/4681#issuecomment-302336451
-        dagAd["LeaveJobInQueue"] = classad.ExprTree("true")
-        dagAd["PeriodicHold"] = classad.ExprTree("time() > CRAB_TaskEndTime")
-        dagAd["TransferOutput"] = info['outputFilesString']
-        dagAd["OnExitHold"] = classad.ExprTree("(ExitCode =!= UNDEFINED && ExitCode != 0)")
-        dagAd["OnExitRemove"] = classad.ExprTree("( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))")
-        dagAd["OtherJobRemoveRequirements"] = classad.ExprTree("DAGManJobId =?= ClusterId")
-        dagAd["RemoveKillSig"] = "SIGUSR1"
-
-        # SB April 2024: the following write can likely be replaced by
-        # print(dagAd, file=fd) which is the same as fd.write(str(dagAd))
-        # letting internal classad code do the formatting (in new format, which is fine
-        # since now we read with classad.parseOne which handles both old and new)
-        with open('subdag.ad', 'w', encoding='utf-8') as fd:
-            for k, v in dagAd.items():
-                if k == 'X509UserProxy':
-                    v = os.path.basename(v)
-                if isinstance(v, str):
-                    value = classad.quote(v)
-                elif isinstance(v, bytes):
-                    # we only expect strings in the code above, but.. just in case,
-                    # be prarped for bytes in case it requires a different handling at some point
-                    value = classad.quote(v)
-                elif isinstance(v, classad.ExprTree):
-                    value = repr(v)
-                elif isinstance(v, list):
-                    value = "{{{0}}}".format(json.dumps(v)[1:-1])
-                else:
-                    value = v
-                fd.write(f"+{k} = {value}\n")
-
-        dagAd["TaskType"] = "ROOT"
-        dagAd["Out"] = str(os.path.join(info['scratch'], "request.out"))
-        dagAd["Err"] = str(os.path.join(info['scratch'], "request.err"))
-        dagAd["Cmd"] = cmd
-        dagAd['Args'] = arg
-        dagAd["TransferInput"] = str(info['inputFilesString'])
+        jobJDL["LeaveJobInQueue"] = "True"
+        jobJDL["PeriodicHold"] = "time() > CRAB_TaskEndTime"
+        jobJDL["transfer_output_files"] = info['outputFilesString']
+        jobJDL["OnExitHold"] = "(ExitCode =!= UNDEFINED && ExitCode != 0)"
+        jobJDL["OnExitRemove"] = "( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))"
+        # jobJDL["OtherJobRemoveRequirements"] = "DAGManJobId =?= ClusterId"  # appears unused SB
+        jobJDL["RemoveKillSig"] = "SIGUSR1"
+
+        # prepare a jobJDL fragment to be used when running in the scheduler
+        # to create subdags for automatic splitting. A crucial change is location of the proxy
+        subdagJDL = htcondor.Submit()  # submit object does not have a copy method
+        for k, v in jobJDL.items():  # so we have to create a new object and
+            subdagJDL[k] = v  # fill it one element at a time
+        subdagJDL['X509UserProxy'] = os.path.basename(jobJDL['X509UserProxy'])  # proxy in scheduler will be in cwd
+        with open('subdag.jdl', 'w', encoding='utf-8') as fd:
+            print(subdagJDL, file=fd)
+
+        jobJDL["+TaskType"] = "ROOT"
+        jobJDL["output"] = os.path.join(info['scratch'], "request.out")
+        jobJDL["error"] = os.path.join(info['scratch'], "request.err")
+        jobJDL["Cmd"] = cmd
+        jobJDL['Args'] = arg
+        jobJDL["transfer_input_files"] = info['inputFilesString']
 
         htcondor.param['DELEGATE_FULL_JOB_GSI_CREDENTIALS'] = 'true'
         htcondor.param['DELEGATE_JOB_GSI_CREDENTIALS_LIFETIME'] = '0'
 
-        resultAds = []
         try:
-            clusterId = schedd.submit(dagAd, 1, True, resultAds)
-            schedd.spool(resultAds)
+            submitResult = schedd.submit(description=jobJDL, count=1, spool=True)
+            clusterId = submitResult.cluster()
+            numProcs = submitResult.num_procs()
+            # firstProc = submitResult.first_proc()
+            # htcId = f"{clusterId}.{firstProc}"  # our cluster has only 1 job
+            # resultAds = submitResult.clusterad()
+            myjobs = jobJDL.jobs(count=numProcs, clusterid=clusterId)
+            schedd.spool(list(myjobs))
         except htcondor.HTCondorException as hte:
             raise TaskWorkerException(f"Submission failed with:\n{hte}") from hte
 
-        if 'ClusterId' in resultAds[0]:
-            htcId = f"{resultAds[0]['ClusterId']}.{resultAds[0]['ProcId']}"
-            schedd.edit([htcId], "LeaveJobInQueue", classad.ExprTree("true"))
-        else:
-            raise TaskWorkerException("Submission failed: no ClusterId was returned")
-
-        # notice that the clusterId might be set even if there was a failure.
-        # e.g. if the schedd.submit succeded, but the spool call failed
         self.logger.debug("Condor cluster ID returned from submit is: %s", clusterId)
 
         return clusterId
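Note for reviewers unfamiliar with this form of the bindings: the old schedd.submit(ad, 1, True, resultAds) call filled a caller-supplied list of ClassAds, while the new call returns a SubmitResult object. Roughly, the flow adopted above looks like the minimal sketch below (a toy /bin/sleep job against a local schedd, whereas CRAB targets a remote one):

    import htcondor

    schedd = htcondor.Schedd()  # local schedd here; CRAB locates a remote one

    jdl = htcondor.Submit()
    jdl['Executable'] = '/bin/sleep'
    jdl['Arguments'] = '60'

    # submit() now returns a SubmitResult instead of filling resultAds
    result = schedd.submit(description=jdl, count=1, spool=True)
    clusterId = result.cluster()
    numProcs = result.num_procs()

    # with spool=True the job waits for its input sandbox; Submit.jobs()
    # regenerates the per-job ads that Schedd.spool() expects
    schedd.spool(list(jdl.jobs(count=numProcs, clusterid=clusterId)))
    print(f"submitted cluster {clusterId}")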
diff --git a/src/python/TaskWorker/Actions/PreDAG.py b/src/python/TaskWorker/Actions/PreDAG.py
index 1efde60586..8d3ec8b724 100644
--- a/src/python/TaskWorker/Actions/PreDAG.py
+++ b/src/python/TaskWorker/Actions/PreDAG.py
@@ -333,7 +333,7 @@ def submitSubdag(subdag, maxidle, maxpost, stage):
         """ Submit a subdag """
         subprocess.check_call(['condor_submit_dag', '-DoRecov', '-AutoRescue', '0', '-MaxPre', '20', '-MaxIdle', str(maxidle),
-                               '-MaxPost', str(maxpost), '-insert_sub_file', 'subdag.ad',
+                               '-MaxPost', str(maxpost), '-insert_sub_file', 'subdag.jdl',
                                '-append', '+Environment = strcat(Environment," _CONDOR_DAGMAN_LOG={0}/{1}.dagman.out")'.format(os.getcwd(), subdag),
                                '-append', '+TaskType = "{0}"'.format(stage.upper()), subdag])
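Note: the subdag.jdl fragment written by submitDirect() is what the condor_submit_dag call above picks up via -insert_sub_file, and printing an htcondor.Submit object yields exactly the plain "key = value" text that option expects. A minimal sketch of that round trip, with illustrative file names only:

    import os
    import htcondor

    jobJDL = htcondor.Submit()
    jobJDL['X509UserProxy'] = '/data/certs/user_proxy_file'     # illustrative path
    jobJDL['+CRAB_ReqName'] = '"240531_120000:user_crab_task"'

    # htcondor.Submit has no copy() method, so clone it key by key as the patch does
    subdagJDL = htcondor.Submit()
    for k, v in jobJDL.items():
        subdagJDL[k] = v
    # on the schedd the proxy sits in the job's working directory: keep only the basename
    subdagJDL['X509UserProxy'] = os.path.basename(jobJDL['X509UserProxy'])

    with open('subdag.jdl', 'w', encoding='utf-8') as fd:
        print(subdagJDL, file=fd)  # plain JDL lines, readable by -insert_sub_file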