diff --git a/scripts/task_process/FTS_Transfers.py b/scripts/task_process/FTS_Transfers.py
index 5af6df982f..717caab760 100644
--- a/scripts/task_process/FTS_Transfers.py
+++ b/scripts/task_process/FTS_Transfers.py
@@ -326,6 +326,9 @@ def submitToFTS(logger, ftsContext, files, jobids, toUpdate):
                                            "userDN": files[0][5],
                                            "taskname": files[0][6]},
                                  copy_pin_lifetime=-1,
+                                 bring_online=None,
+                                 source_spacetoken=None,
+                                 spacetoken=None,
                                  # max time for job in the FTS queue in hours. From FTS experts in
                                  # https://cern.service-now.com/service-portal?id=ticket&table=incident&n=INC2776329
                                  # The max_time_in_queue applies per job, not per retry.
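
For reference, a minimal sketch of where these keyword arguments live in the fts3 "easy" bindings used by FTS_Transfers.py (not part of the patch; the endpoint, file URLs, and the exact new_job() keyword set are illustrative and should be checked against the installed fts3 client):

import fts3.rest.client.easy as fts3  # assumes the FTS3 "easy" bindings

context = fts3.Context('https://fts3-cms.cern.ch:8446')  # hypothetical endpoint
transfers = [fts3.new_transfer('davs://source.example/store/file.root',   # made-up source
                               'davs://dest.example/store/file.root')]    # made-up destination
job = fts3.new_job(transfers,
                   copy_pin_lifetime=-1,
                   bring_online=None,        # tape staging disabled, as in the patch
                   source_spacetoken=None,   # no space token at the source
                   spacetoken=None,          # no space token at the destination
                   max_time_in_queue=6)      # hours; applies per job, not per retry
jobid = fts3.submit(context, job)
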
diff --git a/src/python/HTCondorLocator.py b/src/python/HTCondorLocator.py
index 6b6ceadef9..94640f6c9a 100644
--- a/src/python/HTCondorLocator.py
+++ b/src/python/HTCondorLocator.py
@@ -121,7 +121,8 @@ def getSchedd(self, chooserFunction=capacityMetricsChoicesHybrid):
         collector = self.getCollector()
         schedd = None
         try:
-            htcondor.param['COLLECTOR_HOST'] = collector
+            collParam = 'COLLECTOR_HOST'
+            htcondor.param[collParam] = collector.encode('ascii', 'ignore')
             coll = htcondor.Collector()
             # select from collector crabschedds and pull some add values
             # this call returns a list of schedd objects.
@@ -129,7 +130,7 @@ def getSchedd(self, chooserFunction=capacityMetricsChoicesHybrid):
                                 ['Name', 'DetectedMemory', 'TotalFreeMemoryMB', 'TransferQueueNumUploading',
                                  'TransferQueueMaxUploading','TotalRunningJobs', 'JobsRunning', 'MaxJobsRunning', 'IsOK'])
             if not schedds:
-                raise Exception(f"No CRAB schedds returned by collector query. COLLECTOR_HOST parameter is {htcondor.param['COLLECTOR_HOST']}. Try later")
+                raise Exception(f"No CRAB schedds returned by collector query. {collParam} parameter is {htcondor.param['COLLECTOR_HOST']}. Try later")

             # Get only those schedds that are listed in our external REST configuration
             if self.config and "htcondorSchedds" in self.config:
@@ -180,7 +181,7 @@ def getScheddObjNew(self, schedd):
         Return a tuple (schedd, address) containing an object representing the
         remote schedd and its corresponding address.
         """
-        htcondor.param['COLLECTOR_HOST'] = self.getCollector()
+        htcondor.param['COLLECTOR_HOST'] = self.getCollector().encode('ascii', 'ignore')
         coll = htcondor.Collector()
         schedds = coll.query(htcondor.AdTypes.Schedd, f"Name=?={classad.quote(schedd)}",
                              ["AddressV1", "CondorPlatform", "CondorVersion", "Machine", "MyAddress", "Name", "MyType",
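
For reference, a minimal sketch of the collector lookup pattern touched above, assuming the classic htcondor Python bindings. The host is the ITB collector configured in makeTests.py below, and the constraint is a placeholder since the real one sits outside this hunk:

import htcondor

htcondor.param['COLLECTOR_HOST'] = 'cmsgwms-collector-itb.cern.ch'  # example host (see makeTests.py change below)
coll = htcondor.Collector()
# ask the collector for schedd ads, projecting only the attributes we need
schedds = coll.query(htcondor.AdTypes.Schedd,
                     'TotalRunningJobs >= 0',          # placeholder constraint
                     ['Name', 'TotalFreeMemoryMB', 'TotalRunningJobs', 'IsOK'])
for ad in schedds:
    print(ad.get('Name'), ad.get('IsOK'))
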
diff --git a/src/python/TaskWorker/Actions/DagmanSubmitter.py b/src/python/TaskWorker/Actions/DagmanSubmitter.py
index 7571937d51..104fef7c7f 100644
--- a/src/python/TaskWorker/Actions/DagmanSubmitter.py
+++ b/src/python/TaskWorker/Actions/DagmanSubmitter.py
@@ -5,6 +5,7 @@
 import os
 import copy
+import json
 import time
 import sys

@@ -33,67 +34,80 @@
 ## These are the CRAB attributes that we want to add to the job class ad when
 ## using the submitDirect() method.
 SUBMIT_INFO = [ \
-    ('+CRAB_ReqName', 'requestname'),
-    ('+CRAB_Workflow', 'workflow'),
-    ('+CMS_JobType', 'jobtype'),
-    ('+CRAB_JobSW', 'jobsw'),
-    ('+CRAB_JobArch', 'jobarch'),
-    ('+DESIRED_CMSDataset', 'inputdata'),
-    ('+CRAB_DBSURL', 'dbsurl'),
-    ('+CRAB_PublishName', 'publishname'),
-    ('+CRAB_Publish', 'publication'),
-    ('+CRAB_PublishDBSURL', 'publishdbsurl'),
-    ('+CRAB_PrimaryDataset', 'primarydataset'),
-    ('+CRAB_ISB', 'cacheurl'),
-    ('+CRAB_AdditionalOutputFiles', 'addoutputfiles'),
-    ('+CRAB_EDMOutputFiles', 'edmoutfiles'),
-    ('+CRAB_TFileOutputFiles', 'tfileoutfiles'),
-    ('+CRAB_TransferOutputs', 'saveoutput'),
-    ('+CRAB_SaveLogsFlag', 'savelogsflag'),
-    ('+CRAB_UserDN', 'userdn'),
-    ('+CRAB_UserHN', 'userhn'),
-    ('+CRAB_AsyncDest', 'asyncdest'),
-    #('+CRAB_StageoutPolicy', 'stageoutpolicy'),
-    ('+CRAB_UserRole', 'tm_user_role'),
-    ('+CRAB_UserGroup', 'tm_user_group'),
-    ('+CRAB_TaskWorker', 'worker_name'),
-    ('+CRAB_RetryOnASOFailures', 'retry_aso'),
-    ('+CRAB_ASOTimeout', 'aso_timeout'),
-    ('+CRAB_RestHost', 'resthost'),
-    ('+CRAB_DbInstance', 'dbinstance'),
-    ('+CRAB_NumAutomJobRetries', 'numautomjobretries'),
-    ('+CRAB_SplitAlgo', 'splitalgo'),
-    ('+CRAB_AlgoArgs', 'algoargs'),
-    ('+CRAB_LumiMask', 'lumimask'),
-    ('+CRAB_JobCount', 'jobcount'),
-    ('+CRAB_UserVO', 'tm_user_vo'),
-    ('+CRAB_SiteBlacklist', 'siteblacklist'),
-    ('+CRAB_SiteWhitelist', 'sitewhitelist'),
-    ('+RequestMemory', 'tm_maxmemory'),
-    ('+RequestCpus', 'tm_numcores'),
-    ('+MaxWallTimeMins', 'tm_maxjobruntime'),
-    ('+MaxWallTimeMinsRun', 'tm_maxjobruntime'),
-    ('+MaxWallTimeMinsProbe', 'maxproberuntime'),
-    ('+MaxWallTimeMinsTail', 'maxtailruntime'),
+    ('CRAB_ReqName', 'requestname'),
+    ('CRAB_Workflow', 'workflow'),
+    ('CMS_JobType', 'jobtype'),
+    ('CRAB_JobSW', 'jobsw'),
+    ('CRAB_JobArch', 'jobarch'),
+    ('DESIRED_CMSDataset', 'inputdata'),
+    ('CRAB_DBSURL', 'dbsurl'),
+    ('CRAB_PublishName', 'publishname'),
+    ('CRAB_Publish', 'publication'),
+    ('CRAB_PublishDBSURL', 'publishdbsurl'),
+    ('CRAB_PrimaryDataset', 'primarydataset'),
+    ('CRAB_ISB', 'cacheurl'),
+    ('CRAB_AdditionalOutputFiles', 'addoutputfiles'),
+    ('CRAB_EDMOutputFiles', 'edmoutfiles'),
+    ('CRAB_TFileOutputFiles', 'tfileoutfiles'),
+    ('CRAB_TransferOutputs', 'saveoutput'),
+    ('CRAB_SaveLogsFlag', 'savelogsflag'),
+    ('CRAB_UserDN', 'userdn'),
+    ('CRAB_UserHN', 'userhn'),
+    ('CRAB_AsyncDest', 'asyncdest'),
+    #('CRAB_StageoutPolicy', 'stageoutpolicy'),
+    ('CRAB_UserRole', 'tm_user_role'),
+    ('CRAB_UserGroup', 'tm_user_group'),
+    ('CRAB_TaskWorker', 'worker_name'),
+    ('CRAB_RetryOnASOFailures', 'retry_aso'),
+    ('CRAB_ASOTimeout', 'aso_timeout'),
+    ('CRAB_RestHost', 'resthost'),
+    ('CRAB_DbInstance', 'dbinstance'),
+    ('CRAB_NumAutomJobRetries', 'numautomjobretries'),
+    ('CRAB_SplitAlgo', 'splitalgo'),
+    ('CRAB_AlgoArgs', 'algoargs'),
+    ('CRAB_LumiMask', 'lumimask'),
+    ('CRAB_JobCount', 'jobcount'),
+    ('CRAB_UserVO', 'tm_user_vo'),
+    ('CRAB_SiteBlacklist', 'siteblacklist'),
+    ('CRAB_SiteWhitelist', 'sitewhitelist'),
+    ('RequestMemory', 'tm_maxmemory'),
+    ('RequestCpus', 'tm_numcores'),
+    ('MaxWallTimeMins', 'tm_maxjobruntime'),
+    ('MaxWallTimeMinsRun', 'tm_maxjobruntime'),
+    ('MaxWallTimeMinsProbe', 'maxproberuntime'),
+    ('MaxWallTimeMinsTail', 'maxtailruntime'),
     ('JobPrio', 'tm_priority'),
-    ('+CRAB_FailedNodeLimit', 'faillimit'),
-    ('+CRAB_DashboardTaskType', 'taskType'),
-    ('+CRAB_MaxIdle', 'maxidle'),
-    ('+CRAB_MaxPost', 'maxpost'),
-    ('+CMS_Type', 'cms_type'),
-    ('+CMS_WMTool', 'cms_wmtool'),
-    ('+CMS_TaskType', 'cms_tasktype'),
+    ('CRAB_FailedNodeLimit', 'faillimit'),
+    ('CRAB_DashboardTaskType', 'taskType'),
+    ('CRAB_MaxIdle', 'maxidle'),
+    ('CRAB_MaxPost', 'maxpost'),
+    ('CMS_Type', 'cms_type'),
+    ('CMS_WMTool', 'cms_wmtool'),
+    ('CMS_TaskType', 'cms_tasktype'),
 ]
-
-def addCRABInfoToJobJDL(jdl, info):
+def addCRABInfoToClassAd(ad, info):
     """
-    given a submit objecty, add in the appropriate CRAB_& attributes
+    Given a submit ClassAd, add in the appropriate CRAB_* attributes
     from the info directory
     """
     for adName, dictName in SUBMIT_INFO:
         if dictName in info and (info[dictName] is not None):
-            jdl[adName] = info[dictName]
+            ad[adName] = classad.ExprTree(str(info[dictName]))
+    if 'extra_jdl' in info and info['extra_jdl']:
+        for jdl in info['extra_jdl'].split('\n'):
+            adName, adVal = jdl.lstrip('+').split('=', 1)
+            # remove spaces which would break schedd.submit #8420
+            adName = adName.strip()
+            adVal = adVal.strip()
+            ad[adName] = adVal
+    if 'accelerator_jdl' in info and info['accelerator_jdl']:
+        for jdl in info['accelerator_jdl'].split('\n'):
+            adName, adVal = jdl.lstrip('+').split('=', 1)
+            # remove spaces which would break schedd.submit #8420
+            adName = adName.strip()
+            adVal = adVal.strip()
+            ad[adName] = classad.ExprTree(str(adVal))


 class ScheddStats(dict):
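
To make the #8420 comment above concrete, here is a standalone illustration of the extra_jdl / accelerator_jdl parsing (not part of the patch; the attribute names and values are invented, and the ExprTree wrapping mirrors the accelerator_jdl branch):

import classad

extraJdl = '+CRAB_Foo = 1\n+CRAB_Bar = "some value"'   # hypothetical user-supplied JDL lines
ad = classad.ClassAd()
for jdl in extraJdl.split('\n'):
    adName, adVal = jdl.lstrip('+').split('=', 1)
    # without strip() the attribute name would keep a trailing space
    # ('CRAB_Foo ') and break schedd.submit, see #8420
    ad[adName.strip()] = classad.ExprTree(adVal.strip())
print(ad)  # both attributes end up in the ad with clean names
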
- #jobJDL["JobUniverse"] = 12 - jobJDL["JobUniverse"] = 7 - jobJDL["HoldKillSig"] = "SIGUSR1" - jobJDL["X509UserProxy"] = info['user_proxy'] - jobJDL["Requirements"] = "TARGET.Cpus == 1" # see https://github.com/dmwm/CRABServer/issues/8456#issuecomment-2145887432 - environmentString = "PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1" - environmentString += " CONDOR_ID=$(ClusterId).$(ProcId)" - environmentString += " " + " ".join(info['additional_environment_options'].split(';')) - # Environment command in JDL requires proper quotes https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment - jobJDL["Environment"] = classad.quote(environmentString) - jobJDL["+RemoteCondorSetup"] = info['remote_condor_setup'] - jobJDL["+CRAB_TaskSubmitTime"] = info['start_time'] - jobJDL['+CRAB_TaskLifetimeDays'] = TASKLIFETIME // 24 // 60 // 60 - jobJDL['+CRAB_TaskEndTime'] = int(info['start_time']) + TASKLIFETIME + #dagAd["JobUniverse"] = 12 + dagAd["JobUniverse"] = 7 + dagAd["HoldKillSig"] = "SIGUSR1" + dagAd["X509UserProxy"] = info['user_proxy'] + dagAd["Requirements"] = classad.ExprTree('true || false') + # SB April 2024 the following horrible linw could be replace in current HTConcdor by + # dagAd["Environmnet"] = "PATH=...; CRAB3_VERSION=...; ...." see + # https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment + dagAd["Environment"] = classad.ExprTree('strcat("PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1 CONDOR_ID=", ClusterId, ".", ProcId," %s")' % " ".join(info['additional_environment_options'].split(";"))) + dagAd["RemoteCondorSetup"] = info['remote_condor_setup'] + + dagAd["CRAB_TaskSubmitTime"] = info['start_time'] + dagAd['CRAB_TaskLifetimeDays'] = TASKLIFETIME // 24 // 60 // 60 + dagAd['CRAB_TaskEndTime'] = int(info['start_time']) + TASKLIFETIME #For task management info see https://github.com/dmwm/CRABServer/issues/4681#issuecomment-302336451 - jobJDL["LeaveJobInQueue"] = "True" - jobJDL["PeriodicHold"] = "time() > CRAB_TaskEndTime" - jobJDL["transfer_output_files"] = info['outputFilesString'] - jobJDL["OnExitHold"] = "(ExitCode =!= UNDEFINED && ExitCode != 0)" - jobJDL["OnExitRemove"] = "( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))" - # jobJDL["OtherJobRemoveRequirements"] = "DAGManJobId =?= ClusterId" # appears unused SB - jobJDL["RemoveKillSig"] = "SIGUSR1" - - # prepare a jobJDL fragment to be used when running in the scheduler - # to create subdags for automatic splitting. 
A crucial change is location of the proxy - subdagJDL = htcondor.Submit() # submit object does not have a copy method - for k,v in jobJDL.items(): # so we have to create a new object and - subdagJDL[k] = v # fill it one element at a time - subdagJDL['X509UserProxy'] = os.path.basename(jobJDL['X509UserProxy']) # proxy in scheduler will be in cwd - with open('subdag.jdl', 'w', encoding='utf-8') as fd: - print(subdagJDL, file=fd) - - jobJDL["+TaskType"] = "ROOT" - jobJDL["output"] = os.path.join(info['scratch'], "request.out") - jobJDL["error"] = os.path.join(info['scratch'], "request.err") - jobJDL["Cmd"] = cmd - jobJDL['Args'] = arg - jobJDL["transfer_input_files"] = info['inputFilesString'] + dagAd["LeaveJobInQueue"] = classad.ExprTree("true") + dagAd["PeriodicHold"] = classad.ExprTree("time() > CRAB_TaskEndTime") + dagAd["TransferOutput"] = info['outputFilesString'] + dagAd["OnExitHold"] = classad.ExprTree("(ExitCode =!= UNDEFINED && ExitCode != 0)") + dagAd["OnExitRemove"] = classad.ExprTree("( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))") + dagAd["OtherJobRemoveRequirements"] = classad.ExprTree("DAGManJobId =?= ClusterId") + dagAd["RemoveKillSig"] = "SIGUSR1" + + # SB April 2024: the following write can likely be replaced by + # print(dagAd, file=fd) which is the same as fd.write(str(dagAd)) + # letting internal classad code do the formatting (in new format, which is fine + # since now we read with classad.parseOne which handles both old and new) + with open('subdag.ad', 'w', encoding='utf-8') as fd: + for k, v in dagAd.items(): + if k == 'X509UserProxy': + v = os.path.basename(v) + if isinstance(v, str): + value = classad.quote(v) + elif isinstance(v, bytes): + # we only expect strings in the code above, but.. just in case, + # be prarped for bytes in case it requires a different handling at some point + value = classad.quote(v) + elif isinstance(v, classad.ExprTree): + value = repr(v) + elif isinstance(v, list): + value = "{{{0}}}".format(json.dumps(v)[1:-1]) + else: + value = v + fd.write(f"+{k} = {value}\n") + + dagAd["TaskType"] = "ROOT" + dagAd["Out"] = str(os.path.join(info['scratch'], "request.out")) + dagAd["Err"] = str(os.path.join(info['scratch'], "request.err")) + dagAd["Cmd"] = cmd + dagAd['Args'] = arg + dagAd["TransferInput"] = str(info['inputFilesString']) htcondor.param['DELEGATE_FULL_JOB_GSI_CREDENTIALS'] = 'true' htcondor.param['DELEGATE_JOB_GSI_CREDENTIALS_LIFETIME'] = '0' + resultAds = [] try: - submitResult = schedd.submit(description=jobJDL, count=1, spool=True) - clusterId = submitResult.cluster() - numProcs = submitResult.num_procs() - # firstProc = submitResult.first_proc() - # htcId = f"{clusterId}.{firstProc}" # out cluster has only 1 job - # resultAds = submitResult.clusterad() - myjobs = jobJDL.jobs(count=numProcs, clusterid=clusterId) - schedd.spool(list(myjobs)) + clusterId = schedd.submit(dagAd, 1, True, resultAds) + schedd.spool(resultAds) except htcondor.HTCondorException as hte: raise TaskWorkerException(f"Submission failed with:\n{hte}") from hte + if 'ClusterId' in resultAds[0]: + htcId = f"{resultAds[0]['ClusterId']}.{resultAds[0]['ProcId']}" + schedd.edit([htcId], "LeaveJobInQueue", classad.ExprTree("true")) + else: + raise TaskWorkerException("Submission failed: no ClusterId was returned") + + # notice that the clusterId might be set even if there was a failure. + # e.g. 
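
As background for the string-vs-expression branches in the subdag.ad writing loop above, a tiny standalone sketch (not part of the patch; the attribute values and the output file name are invented):

import classad

stringValue = classad.quote('some task name')                    # quoted ClassAd string literal, e.g. "some task name"
exprValue = repr(classad.ExprTree('time() > CRAB_TaskEndTime'))  # expression text, left unquoted (as the loop above relies on)

with open('subdag.ad.example', 'w', encoding='utf-8') as fd:     # hypothetical file, not the real subdag.ad
    fd.write(f"+CRAB_ReqName = {stringValue}\n")   # strings must stay quoted in the fragment
    fd.write(f"+PeriodicHold = {exprValue}\n")     # expressions must not be quoted
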
diff --git a/src/python/TaskWorker/Actions/PostJob.py b/src/python/TaskWorker/Actions/PostJob.py
index 5cc59b96f3..17c235edf8 100644
--- a/src/python/TaskWorker/Actions/PostJob.py
+++ b/src/python/TaskWorker/Actions/PostJob.py
@@ -995,7 +995,7 @@ def updateOrInsertDoc(self, doc, toTransfer):
             #if not os.path.exists('task_process/rest_filetransfers.txt'):
             restInfo = {'host':self.rest_host,
                         'dbInstance': self.db_instance,
-                        'proxyfile': os.path.basename(self.proxy)}
+                        'proxyfile': self.proxy}
             with open('task_process/RestInfoForFileTransfers.json', 'w') as fp:
                 json.dump(restInfo, fp)
         else:
@@ -1014,7 +1014,7 @@ def updateOrInsertDoc(self, doc, toTransfer):
             #if not os.path.exists('task_process/rest_filetransfers.txt'):
             restInfo = {'host':self.rest_host,
                         'dbInstance': self.db_instance,
-                        'proxyfile': os.path.basename(self.proxy)}
+                        'proxyfile': self.proxy}
             with open('task_process/RestInfoForFileTransfers.json','w') as fp:
                 json.dump(restInfo, fp)
         return returnMsg
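
A minimal sketch (not part of the patch) of how a task_process consumer could read this file back now that 'proxyfile' carries the full path rather than its basename; the environment-variable use at the end is illustrative only:

import json
import os

with open('task_process/RestInfoForFileTransfers.json', 'r', encoding='utf-8') as fp:
    restInfo = json.load(fp)

# 'proxyfile' is now a full path, so it can be used directly, for example as X509_USER_PROXY
os.environ['X509_USER_PROXY'] = restInfo['proxyfile']
print(restInfo['host'], restInfo['dbInstance'])
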
'"cmsgwms-collector-itb.fnal.gov,cmsgwms-collector-itb.cern.ch"', 'section': 'Debug'} +changeDict = {'param': name, 'value': '"cmsgwms-collector-itb.cern.ch,cmsgwms-collector-itb.fnal.gov"', 'section': 'Debug'} confChangesList.append(changeDict) changeDict = {'param': 'scheddName', 'value': '"crab3@vocms068.cern.ch"', 'section': 'Debug'} confChangesList.append(changeDict)