Skip to content

Commit

Permalink
Revert to v3.240530 (dmwm#8478)
Browse files Browse the repository at this point in the history
* Revert "Revert "Run test jobs on crab sched 903 (dmwm#8472)" (dmwm#8474)"

This reverts commit 0665454.

* Revert "Run test jobs on crab sched 903 (dmwm#8472)"

This reverts commit c5ec3ef.

* Revert "ensure proxyfile in RestInfoForFileTransfers.json is a filename w/o path. Fix dmwm#8464 (dmwm#8467)"

This reverts commit 9ced4fd.

* Revert "workaround for dmwm#8456 (dmwm#8466)"

This reverts commit 602f8d6.

* Revert "Update makeTests.py: collector param does not allow port #. Simply put FNAL first"

This reverts commit 7ac2b90.

* Revert "Update makeTests.py: add collector port for ITB"

This reverts commit f6c01eb.

* Revert "do not set RequestCpus in task submission JDL. Fix dmwm#8456 (dmwm#8457)"

This reverts commit 198e2d3.

* Revert "pass string, not bytes, to htcondor.param Fix dmwm#8450 (dmwm#8452)"

This reverts commit 856d1ef.

* Revert "schedd.xquery is deprecated. Use schedd.query. Fix dmwm#8447 (dmwm#8449)"

This reverts commit b129645.

* Revert "new format of schedd.submit)/spool() fix dmwm#8336 fix dmwm#8333 (dmwm#8448)"

This reverts commit 806226a.

* Revert "do not indicate unused args in FTS calls. Fix dmwm#8460 (dmwm#8475)"

This reverts commit 20d4f90.
  • Loading branch information
belforte committed Jun 28, 2024
1 parent 6b1e451 commit 7cd7bb7
Show file tree
Hide file tree
Showing 8 changed files with 150 additions and 116 deletions.
3 changes: 3 additions & 0 deletions scripts/task_process/FTS_Transfers.py
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ def submitToFTS(logger, ftsContext, files, jobids, toUpdate):
"userDN": files[0][5],
"taskname": files[0][6]},
copy_pin_lifetime=-1,
bring_online=None,
source_spacetoken=None,
spacetoken=None,
# max time for job in the FTS queue in hours. From FTS experts in
# https://cern.service-now.com/service-portal?id=ticket&table=incident&n=INC2776329
# The max_time_in_queue applies per job, not per retry.
Expand Down
7 changes: 4 additions & 3 deletions src/python/HTCondorLocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,16 @@ def getSchedd(self, chooserFunction=capacityMetricsChoicesHybrid):
collector = self.getCollector()
schedd = None
try:
htcondor.param['COLLECTOR_HOST'] = collector
collParam = 'COLLECTOR_HOST'
htcondor.param[collParam] = collector.encode('ascii', 'ignore')
coll = htcondor.Collector()
# select from collector crabschedds and pull some ad values
# this call returns a list of schedd objects.
schedds = coll.query(htcondor.AdTypes.Schedd, 'CMSGWMS_Type=?="crabschedd"',
['Name', 'DetectedMemory', 'TotalFreeMemoryMB', 'TransferQueueNumUploading',
'TransferQueueMaxUploading','TotalRunningJobs', 'JobsRunning', 'MaxJobsRunning', 'IsOK'])
if not schedds:
raise Exception(f"No CRAB schedds returned by collector query. COLLECTOR_HOST parameter is {htcondor.param['COLLECTOR_HOST']}. Try later")
raise Exception(f"No CRAB schedds returned by collector query. {collParam} parameter is {htcondor.param['COLLECTOR_HOST']}. Try later")

# Get only those schedds that are listed in our external REST configuration
if self.config and "htcondorSchedds" in self.config:
Expand Down Expand Up @@ -180,7 +181,7 @@ def getScheddObjNew(self, schedd):
Return a tuple (schedd, address) containing an object representing the
remote schedd and its corresponding address.
"""
htcondor.param['COLLECTOR_HOST'] = self.getCollector()
htcondor.param['COLLECTOR_HOST'] = self.getCollector().encode('ascii', 'ignore')
coll = htcondor.Collector()
schedds = coll.query(htcondor.AdTypes.Schedd, f"Name=?={classad.quote(schedd)}",
["AddressV1", "CondorPlatform", "CondorVersion", "Machine", "MyAddress", "Name", "MyType",
Expand Down
242 changes: 136 additions & 106 deletions src/python/TaskWorker/Actions/DagmanSubmitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import os
import copy
import json
import time
import sys

Expand Down Expand Up @@ -33,67 +34,80 @@
## These are the CRAB attributes that we want to add to the job class ad when
## using the submitDirect() method.
SUBMIT_INFO = [ \
('+CRAB_ReqName', 'requestname'),
('+CRAB_Workflow', 'workflow'),
('+CMS_JobType', 'jobtype'),
('+CRAB_JobSW', 'jobsw'),
('+CRAB_JobArch', 'jobarch'),
('+DESIRED_CMSDataset', 'inputdata'),
('+CRAB_DBSURL', 'dbsurl'),
('+CRAB_PublishName', 'publishname'),
('+CRAB_Publish', 'publication'),
('+CRAB_PublishDBSURL', 'publishdbsurl'),
('+CRAB_PrimaryDataset', 'primarydataset'),
('+CRAB_ISB', 'cacheurl'),
('+CRAB_AdditionalOutputFiles', 'addoutputfiles'),
('+CRAB_EDMOutputFiles', 'edmoutfiles'),
('+CRAB_TFileOutputFiles', 'tfileoutfiles'),
('+CRAB_TransferOutputs', 'saveoutput'),
('+CRAB_SaveLogsFlag', 'savelogsflag'),
('+CRAB_UserDN', 'userdn'),
('+CRAB_UserHN', 'userhn'),
('+CRAB_AsyncDest', 'asyncdest'),
#('+CRAB_StageoutPolicy', 'stageoutpolicy'),
('+CRAB_UserRole', 'tm_user_role'),
('+CRAB_UserGroup', 'tm_user_group'),
('+CRAB_TaskWorker', 'worker_name'),
('+CRAB_RetryOnASOFailures', 'retry_aso'),
('+CRAB_ASOTimeout', 'aso_timeout'),
('+CRAB_RestHost', 'resthost'),
('+CRAB_DbInstance', 'dbinstance'),
('+CRAB_NumAutomJobRetries', 'numautomjobretries'),
('+CRAB_SplitAlgo', 'splitalgo'),
('+CRAB_AlgoArgs', 'algoargs'),
('+CRAB_LumiMask', 'lumimask'),
('+CRAB_JobCount', 'jobcount'),
('+CRAB_UserVO', 'tm_user_vo'),
('+CRAB_SiteBlacklist', 'siteblacklist'),
('+CRAB_SiteWhitelist', 'sitewhitelist'),
('+RequestMemory', 'tm_maxmemory'),
('+RequestCpus', 'tm_numcores'),
('+MaxWallTimeMins', 'tm_maxjobruntime'),
('+MaxWallTimeMinsRun', 'tm_maxjobruntime'),
('+MaxWallTimeMinsProbe', 'maxproberuntime'),
('+MaxWallTimeMinsTail', 'maxtailruntime'),
('CRAB_ReqName', 'requestname'),
('CRAB_Workflow', 'workflow'),
('CMS_JobType', 'jobtype'),
('CRAB_JobSW', 'jobsw'),
('CRAB_JobArch', 'jobarch'),
('DESIRED_CMSDataset', 'inputdata'),
('CRAB_DBSURL', 'dbsurl'),
('CRAB_PublishName', 'publishname'),
('CRAB_Publish', 'publication'),
('CRAB_PublishDBSURL', 'publishdbsurl'),
('CRAB_PrimaryDataset', 'primarydataset'),
('CRAB_ISB', 'cacheurl'),
('CRAB_AdditionalOutputFiles', 'addoutputfiles'),
('CRAB_EDMOutputFiles', 'edmoutfiles'),
('CRAB_TFileOutputFiles', 'tfileoutfiles'),
('CRAB_TransferOutputs', 'saveoutput'),
('CRAB_SaveLogsFlag', 'savelogsflag'),
('CRAB_UserDN', 'userdn'),
('CRAB_UserHN', 'userhn'),
('CRAB_AsyncDest', 'asyncdest'),
#('CRAB_StageoutPolicy', 'stageoutpolicy'),
('CRAB_UserRole', 'tm_user_role'),
('CRAB_UserGroup', 'tm_user_group'),
('CRAB_TaskWorker', 'worker_name'),
('CRAB_RetryOnASOFailures', 'retry_aso'),
('CRAB_ASOTimeout', 'aso_timeout'),
('CRAB_RestHost', 'resthost'),
('CRAB_DbInstance', 'dbinstance'),
('CRAB_NumAutomJobRetries', 'numautomjobretries'),
('CRAB_SplitAlgo', 'splitalgo'),
('CRAB_AlgoArgs', 'algoargs'),
('CRAB_LumiMask', 'lumimask'),
('CRAB_JobCount', 'jobcount'),
('CRAB_UserVO', 'tm_user_vo'),
('CRAB_SiteBlacklist', 'siteblacklist'),
('CRAB_SiteWhitelist', 'sitewhitelist'),
('RequestMemory', 'tm_maxmemory'),
('RequestCpus', 'tm_numcores'),
('MaxWallTimeMins', 'tm_maxjobruntime'),
('MaxWallTimeMinsRun', 'tm_maxjobruntime'),
('MaxWallTimeMinsProbe', 'maxproberuntime'),
('MaxWallTimeMinsTail', 'maxtailruntime'),
('JobPrio', 'tm_priority'),
('+CRAB_FailedNodeLimit', 'faillimit'),
('+CRAB_DashboardTaskType', 'taskType'),
('+CRAB_MaxIdle', 'maxidle'),
('+CRAB_MaxPost', 'maxpost'),
('+CMS_Type', 'cms_type'),
('+CMS_WMTool', 'cms_wmtool'),
('+CMS_TaskType', 'cms_tasktype'),
('CRAB_FailedNodeLimit', 'faillimit'),
('CRAB_DashboardTaskType', 'taskType'),
('CRAB_MaxIdle', 'maxidle'),
('CRAB_MaxPost', 'maxpost'),
('CMS_Type', 'cms_type'),
('CMS_WMTool', 'cms_wmtool'),
('CMS_TaskType', 'cms_tasktype'),
]


def addCRABInfoToJobJDL(jdl, info):
def addCRABInfoToClassAd(ad, info):
"""
given a submit object, add in the appropriate CRAB_* attributes
Given a submit ClassAd, add in the appropriate CRAB_* attributes
from the info dictionary
"""
for adName, dictName in SUBMIT_INFO:
if dictName in info and (info[dictName] is not None):
jdl[adName] = info[dictName]
ad[adName] = classad.ExprTree(str(info[dictName]))
if 'extra_jdl' in info and info['extra_jdl']:
for jdl in info['extra_jdl'].split('\n'):
adName, adVal = jdl.lstrip('+').split('=', 1)
# remove spaces which would break schedd.submit #8420
adName = adName.strip()
adVal = adVal.strip()
ad[adName] = adVal
if 'accelerator_jdl' in info and info['accelerator_jdl']:
for jdl in info['accelerator_jdl'].split('\n'):
adName, adVal = jdl.lstrip('+').split('=', 1)
# remove spaces which would break schedd.submit #8420
adName = adName.strip()
adVal = adVal.strip()
ad[adName] = classad.ExprTree(str(adVal))


class ScheddStats(dict):
Expand Down Expand Up @@ -318,7 +332,7 @@ def duplicateCheck(self, task):
'&& (isUndefined(CRAB_Attempt) || CRAB_Attempt == 0)'

self.logger.debug("Duplicate check is querying the schedd: %s", rootConst)
results = list(schedd.query(rootConst, []))
results = list(schedd.xquery(rootConst, []))
self.logger.debug("Schedd queried %s", results)
except Exception as exp:
msg = "The CRAB server backend was not able to contact the Grid scheduler."
Expand Down Expand Up @@ -383,7 +397,7 @@ def executeInternal(self, info, inputFiles, **kwargs):
os.chdir(kwargs['tempDir'])

info['start_time'] = task['tm_start_time']
info['inputFilesString'] = ", ".join(inputFiles + ['subdag.jdl'])
info['inputFilesString'] = ", ".join(inputFiles + ['subdag.ad'])
outputFiles = ["RunJobs.dag.dagman.out", "RunJobs.dag.rescue.001"]
info['outputFilesString'] = ", ".join(outputFiles)
arg = "RunJobs.dag"
Expand Down Expand Up @@ -461,73 +475,89 @@ def submitDirect(self, schedd, cmd, arg, info): #pylint: disable=R0201
"""
Submit directly to the schedd using the HTCondor module
"""
jobJDL = htcondor.Submit()
addCRABInfoToJobJDL(jobJDL, info)
dagAd = classad.ClassAd()
addCRABInfoToClassAd(dagAd, info)

if info["CMSGroups"]:
jobJDL["+CMSGroups"] = classad.quote(','.join(info["CMSGroups"]))
dagAd["CMSGroups"] = ','.join(info["CMSGroups"])
else:
jobJDL["+CMSGroups"] = classad.Value.Undefined
dagAd["CMSGroups"] = classad.Value.Undefined

# NOTE: Changes here must be synchronized with the job_submit in DagmanCreator.py in CAFTaskWorker
jobJDL["+CRAB_Attempt"] = 0
jobJDL["+CMS_SubmissionTool"] = "CRAB"
dagAd["CRAB_Attempt"] = 0
dagAd["CMS_SubmissionTool"] = "CRAB"
# We switched from local to scheduler universe. Why? It seems there's no way in the
# local universe to change the hold signal at runtime. That's fairly important for our
# resubmit implementation.
#jobJDL["JobUniverse"] = 12
jobJDL["JobUniverse"] = 7
jobJDL["HoldKillSig"] = "SIGUSR1"
jobJDL["X509UserProxy"] = info['user_proxy']
jobJDL["Requirements"] = "TARGET.Cpus == 1" # see https://github.com/dmwm/CRABServer/issues/8456#issuecomment-2145887432
environmentString = "PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1"
environmentString += " CONDOR_ID=$(ClusterId).$(ProcId)"
environmentString += " " + " ".join(info['additional_environment_options'].split(';'))
# Environment command in JDL requires proper quotes https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment
jobJDL["Environment"] = classad.quote(environmentString)
jobJDL["+RemoteCondorSetup"] = info['remote_condor_setup']
jobJDL["+CRAB_TaskSubmitTime"] = info['start_time']
jobJDL['+CRAB_TaskLifetimeDays'] = TASKLIFETIME // 24 // 60 // 60
jobJDL['+CRAB_TaskEndTime'] = int(info['start_time']) + TASKLIFETIME
#dagAd["JobUniverse"] = 12
dagAd["JobUniverse"] = 7
dagAd["HoldKillSig"] = "SIGUSR1"
dagAd["X509UserProxy"] = info['user_proxy']
dagAd["Requirements"] = classad.ExprTree('true || false')
# SB April 2024 the following horrible line could be replaced in current HTCondor by
# dagAd["Environment"] = "PATH=...; CRAB3_VERSION=...; ...." see
# https://htcondor.readthedocs.io/en/latest/man-pages/condor_submit.html#environment
dagAd["Environment"] = classad.ExprTree('strcat("PATH=/usr/bin:/bin CRAB3_VERSION=3.3.0-pre1 CONDOR_ID=", ClusterId, ".", ProcId," %s")' % " ".join(info['additional_environment_options'].split(";")))
dagAd["RemoteCondorSetup"] = info['remote_condor_setup']

dagAd["CRAB_TaskSubmitTime"] = info['start_time']
dagAd['CRAB_TaskLifetimeDays'] = TASKLIFETIME // 24 // 60 // 60
dagAd['CRAB_TaskEndTime'] = int(info['start_time']) + TASKLIFETIME
#For task management info see https://github.com/dmwm/CRABServer/issues/4681#issuecomment-302336451
jobJDL["LeaveJobInQueue"] = "True"
jobJDL["PeriodicHold"] = "time() > CRAB_TaskEndTime"
jobJDL["transfer_output_files"] = info['outputFilesString']
jobJDL["OnExitHold"] = "(ExitCode =!= UNDEFINED && ExitCode != 0)"
jobJDL["OnExitRemove"] = "( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))"
# jobJDL["OtherJobRemoveRequirements"] = "DAGManJobId =?= ClusterId" # appears unused SB
jobJDL["RemoveKillSig"] = "SIGUSR1"

# prepare a jobJDL fragment to be used when running in the scheduler
# to create subdags for automatic splitting. A crucial change is location of the proxy
subdagJDL = htcondor.Submit() # submit object does not have a copy method
for k,v in jobJDL.items(): # so we have to create a new object and
subdagJDL[k] = v # fill it one element at a time
subdagJDL['X509UserProxy'] = os.path.basename(jobJDL['X509UserProxy']) # proxy in scheduler will be in cwd
with open('subdag.jdl', 'w', encoding='utf-8') as fd:
print(subdagJDL, file=fd)

jobJDL["+TaskType"] = "ROOT"
jobJDL["output"] = os.path.join(info['scratch'], "request.out")
jobJDL["error"] = os.path.join(info['scratch'], "request.err")
jobJDL["Cmd"] = cmd
jobJDL['Args'] = arg
jobJDL["transfer_input_files"] = info['inputFilesString']
dagAd["LeaveJobInQueue"] = classad.ExprTree("true")
dagAd["PeriodicHold"] = classad.ExprTree("time() > CRAB_TaskEndTime")
dagAd["TransferOutput"] = info['outputFilesString']
dagAd["OnExitHold"] = classad.ExprTree("(ExitCode =!= UNDEFINED && ExitCode != 0)")
dagAd["OnExitRemove"] = classad.ExprTree("( ExitSignal =?= 11 || (ExitCode =!= UNDEFINED && ExitCode >=0 && ExitCode <= 2))")
dagAd["OtherJobRemoveRequirements"] = classad.ExprTree("DAGManJobId =?= ClusterId")
dagAd["RemoveKillSig"] = "SIGUSR1"

# SB April 2024: the following write can likely be replaced by
# print(dagAd, file=fd) which is the same as fd.write(str(dagAd))
# letting internal classad code do the formatting (in new format, which is fine
# since now we read with classad.parseOne which handles both old and new)
with open('subdag.ad', 'w', encoding='utf-8') as fd:
for k, v in dagAd.items():
if k == 'X509UserProxy':
v = os.path.basename(v)
if isinstance(v, str):
value = classad.quote(v)
elif isinstance(v, bytes):
# we only expect strings in the code above, but.. just in case,
# be prepared for bytes in case it requires a different handling at some point
value = classad.quote(v)
elif isinstance(v, classad.ExprTree):
value = repr(v)
elif isinstance(v, list):
value = "{{{0}}}".format(json.dumps(v)[1:-1])
else:
value = v
fd.write(f"+{k} = {value}\n")

dagAd["TaskType"] = "ROOT"
dagAd["Out"] = str(os.path.join(info['scratch'], "request.out"))
dagAd["Err"] = str(os.path.join(info['scratch'], "request.err"))
dagAd["Cmd"] = cmd
dagAd['Args'] = arg
dagAd["TransferInput"] = str(info['inputFilesString'])

htcondor.param['DELEGATE_FULL_JOB_GSI_CREDENTIALS'] = 'true'
htcondor.param['DELEGATE_JOB_GSI_CREDENTIALS_LIFETIME'] = '0'
resultAds = []
try:
submitResult = schedd.submit(description=jobJDL, count=1, spool=True)
clusterId = submitResult.cluster()
numProcs = submitResult.num_procs()
# firstProc = submitResult.first_proc()
# htcId = f"{clusterId}.{firstProc}" # our cluster has only 1 job
# resultAds = submitResult.clusterad()
myjobs = jobJDL.jobs(count=numProcs, clusterid=clusterId)
schedd.spool(list(myjobs))
clusterId = schedd.submit(dagAd, 1, True, resultAds)
schedd.spool(resultAds)
except htcondor.HTCondorException as hte:
raise TaskWorkerException(f"Submission failed with:\n{hte}") from hte

if 'ClusterId' in resultAds[0]:
htcId = f"{resultAds[0]['ClusterId']}.{resultAds[0]['ProcId']}"
schedd.edit([htcId], "LeaveJobInQueue", classad.ExprTree("true"))
else:
raise TaskWorkerException("Submission failed: no ClusterId was returned")

# notice that the clusterId might be set even if there was a failure.
# e.g. if the schedd.submit succeeded, but the spool call failed
self.logger.debug("Condor cluster ID returned from submit is: %s", clusterId)

return clusterId
4 changes: 2 additions & 2 deletions src/python/TaskWorker/Actions/PostJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -995,7 +995,7 @@ def updateOrInsertDoc(self, doc, toTransfer):
#if not os.path.exists('task_process/rest_filetransfers.txt'):
restInfo = {'host':self.rest_host,
'dbInstance': self.db_instance,
'proxyfile': os.path.basename(self.proxy)}
'proxyfile': self.proxy}
with open('task_process/RestInfoForFileTransfers.json', 'w') as fp:
json.dump(restInfo, fp)
else:
Expand All @@ -1014,7 +1014,7 @@ def updateOrInsertDoc(self, doc, toTransfer):
#if not os.path.exists('task_process/rest_filetransfers.txt'):
restInfo = {'host':self.rest_host,
'dbInstance': self.db_instance,
'proxyfile': os.path.basename(self.proxy)}
'proxyfile': self.proxy}
with open('task_process/RestInfoForFileTransfers.json','w') as fp:
json.dump(restInfo, fp)
return returnMsg
Expand Down
2 changes: 1 addition & 1 deletion src/python/TaskWorker/Actions/PreDAG.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ def submitSubdag(subdag, maxidle, maxpost, stage):
""" Submit a subdag
"""
subprocess.check_call(['condor_submit_dag', '-DoRecov', '-AutoRescue', '0', '-MaxPre', '20', '-MaxIdle', str(maxidle),
'-MaxPost', str(maxpost), '-insert_sub_file', 'subdag.jdl',
'-MaxPost', str(maxpost), '-insert_sub_file', 'subdag.ad',
'-append', '+Environment = strcat(Environment," _CONDOR_DAGMAN_LOG={0}/{1}.dagman.out")'.format(os.getcwd(), subdag),
'-append', '+TaskType = "{0}"'.format(stage.upper()), subdag])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def execute_schedd(self, schedd_name, collector):
self.logger.debug("Schedd found at %s", schedd_ad['MyAddress'])
schedd = htcondor.Schedd(schedd_ad)
self.logger.debug("Querying schedd for CRAB3 tasks.")
task_ads = list(schedd.query('TaskType =?= "ROOT" && CRAB_HC =!= "True"', QUERY_ATTRS))
task_ads = list(schedd.xquery('TaskType =?= "ROOT" && CRAB_HC =!= "True"', QUERY_ATTRS))
self.logger.info("There were %d tasks found.", len(task_ads))
ads = {}
now = time.time()
Expand Down
4 changes: 2 additions & 2 deletions src/script/Monitor/GenerateMONIT.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,11 @@ def execute(self):
continue
schedd = htcondor.Schedd(scheddAdd)
try:
idleDags = list(schedd.query(pickSchedulerIdle))
idleDags = list(schedd.xquery(pickSchedulerIdle))
except Exception:
idleDags = []
try:
runningTPs = list(schedd.query(pickLocalRunning))
runningTPs = list(schedd.xquery(pickLocalRunning))
except Exception:
runningTPs = []
numDagIdle = len(idleDags)
Expand Down
Loading

0 comments on commit 7cd7bb7

Please sign in to comment.