diff --git a/src/python/Publisher/PublisherMaster.py b/src/python/Publisher/PublisherMaster.py index f7c079e08d..8b0d207a02 100644 --- a/src/python/Publisher/PublisherMaster.py +++ b/src/python/Publisher/PublisherMaster.py @@ -1,4 +1,4 @@ -# pylint: disable=C0103, W0703, R0912, R0914, R0915 +# pylint: disable=invalid-name # have a lot of snake_case varaibles here from "old times" """ Here's the algorithm @@ -9,15 +9,8 @@ 4. spawn a process per task that publish their files """ -from __future__ import division -from __future__ import print_function import argparse -import logging -from logging import FileHandler -from logging.handlers import TimedRotatingFileHandler import os -import traceback -import sys import json import pickle import tempfile @@ -25,57 +18,20 @@ import time from pathlib import Path from multiprocessing import Process -from MultiProcessingLog import MultiProcessingLog from WMCore.Configuration import loadConfigurationFile from WMCore.Services.Requests import Requests -from RESTInteractions import CRABRest from ServerUtilities import getColumn, encodeRequest, oracleOutputMapping, executeCommand -from ServerUtilities import SERVICE_INSTANCES +from ServerUtilities import getHashLfn from ServerUtilities import getProxiedWebDir -from TaskWorker import __version__ -from TaskWorker.WorkerExceptions import ConfigException - - -def chunks(l, n): - """ - Yield successive n-sized chunks from l. - :param l: list to splitt in chunks - :param n: chunk size - :return: yield the next list chunk - """ - for i in range(0, len(l), n): - yield l[i:i + n] - -def setMasterLogger(logsDir, name='master'): - """ Set the logger for the master process. The file used for it is logs/processes/proc.name.txt and it - can be retrieved with logging.getLogger(name) in other parts of the code - """ - logger = logging.getLogger(name) - fileName = os.path.join(logsDir, 'processes', "proc.c3id_%s.pid_%s.txt" % (name, os.getpid())) - handler = TimedRotatingFileHandler(fileName, 'midnight', backupCount=30) - formatter = logging.Formatter("%(asctime)s:%(levelname)s:"+name+":%(message)s") - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger - -def setSlaveLogger(logsDir, name): - """ Set the logger for a single slave process. The file used for it is logs/processes/proc.name.txt and it - can be retrieved with logging.getLogger(name) in other parts of the code - """ - logger = logging.getLogger(name) - fileName = os.path.join(logsDir, 'processes', "proc.c3id_%s.txt" % name) - #handler = TimedRotatingFileHandler(fileName, 'midnight', backupCount=30) - # slaves are short lived, use one log file for each - handler = FileHandler(fileName) - formatter = logging.Formatter("%(asctime)s:%(levelname)s:"+"slave"+":%(message)s") - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger - - -class Master(object): +from TaskWorker.WorkerUtilities import getCrabserver + +from PublisherUtils import createLogdir, setRootLogger, setSlaveLogger, logVersionAndConfig +from PublisherUtils import getInfoFromFMD + + +class Master(): # pylint: disable=too-many-instance-attributes """I am the main daemon kicking off all Publisher work via slave Publishers""" def __init__(self, confFile=None, quiet=False, debug=True, testMode=False): @@ -88,75 +44,6 @@ def __init__(self, confFile=None, quiet=False, debug=True, testMode=False): :arg bool testMode: it tells if to run in test (no subprocesses) mode. 
""" - def createLogdir(dirname): - """ Create the directory dirname ignoring erors in case it exists. Exit if - the directory cannot be created. - """ - try: - os.makedirs(dirname) - except OSError as ose: - if ose.errno != 17: #ignore the "Directory already exists error" - print(str(ose)) - print("The Publisher Worker needs to access the '%s' directory" % dirname) - sys.exit(1) - - def setRootLogger(logsDir, quiet=False, debug=True, console=False): - """Sets the root logger with the desired verbosity level - The root logger logs to logs/log.txt and every single - logging instruction is propagated to it (not really nice - to read) - - :arg bool quiet: it tells if a quiet logger is needed - :arg bool debug: it tells if needs a verbose logger - :arg bool console: it tells if to direct all printoput to console rather then files, useful for debug - :return logger: a logger with the appropriate logger level.""" - - createLogdir(logsDir) - createLogdir(os.path.join(logsDir, 'processes')) - createLogdir(os.path.join(logsDir, 'tasks')) - - if console: - # if we are testing log to the console is easier - logging.getLogger().addHandler(logging.StreamHandler()) - else: - logHandler = MultiProcessingLog(os.path.join(logsDir, 'log.txt'), when='midnight') - logFormatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s") - logHandler.setFormatter(logFormatter) - logging.getLogger().addHandler(logHandler) - loglevel = logging.INFO - if quiet: - loglevel = logging.WARNING - if debug: - loglevel = logging.DEBUG - logging.getLogger().setLevel(loglevel) - logger = setMasterLogger(logsDir) - logger.debug("PID %s.", os.getpid()) - logger.debug("Logging level initialized to %s.", loglevel) - return logger - - def logVersionAndConfig(config=None, logger=None): - """ - log version number and major config. 
parameters - args: config : a configuration object loaded from file - args: logger : the logger instance to use - """ - pubstartDict = {} - pubstartDict['version'] = __version__ - pubstartDict['asoworker'] = config.General.asoworker - pubstartDict['instance'] = config.General.instance - if config.General.instance == 'other': - pubstartDict['restHost'] = config.General.restHost - pubstartDict['dbInstance'] = config.General.dbInstance - pubstartDict['max_slaves'] = config.General.max_slaves - pubstartDict['DBShost'] = config.TaskPublisher.DBShost - pubstartDict['dryRun'] = config.TaskPublisher.dryRun - # one line for automatic parsing - logger.info('PUBSTART: %s', json.dumps(pubstartDict)) - # multiple lines for humans to read - for k, v in pubstartDict.items(): - logger.info('%s: %s', k, v) - return - self.configurationFile = confFile # remember this, will have to pass it to TaskPublish config = loadConfigurationFile(confFile) self.config = config.General @@ -179,45 +66,12 @@ def logVersionAndConfig(config=None, logger=None): createLogdir(self.blackListedTaskDir) self.logger = setRootLogger(self.config.logsDir, quiet=quiet, debug=debug, console=self.TestMode) - logVersionAndConfig(config, self.logger) - from WMCore.Credential.Proxy import Proxy - proxy = Proxy({'logger':self.logger}) - from ServerUtilities import tempSetLogLevel - with tempSetLogLevel(self.logger, logging.ERROR): - self.myDN = proxy.getSubjectFromCert(certFile=self.config.serviceCert) - - try: - instance = self.config.instance - except: - msg = "No instance provided: need to specify config.General.instance in the configuration" - raise ConfigException(msg) - - if instance in SERVICE_INSTANCES: - self.logger.info('Will connect to CRAB service: %s', instance) - restHost = SERVICE_INSTANCES[instance]['restHost'] - dbInstance = SERVICE_INSTANCES[instance]['dbInstance'] - else: - msg = "Invalid instance value '%s'" % instance - raise ConfigException(msg) - if instance == 'other': - self.logger.info('Will use restHost and dbInstance from config file') - try: - restHost = self.config.restHost - dbInstance = self.config.dbInstance - except: - msg = "Need to specify config.General.restHost and dbInstance in the configuration" - raise ConfigException(msg) + # CRAB REST API + self.crabServer = getCrabserver(restConfig=config.REST, agentName='CRABPublisher', logger=self.logger) - self.logger.info('Will connect to CRAB Data Base %s instance via URL: https://%s', dbInstance, restHost) - - # CRAB REST API's self.max_files_per_block = self.config.max_files_per_block - self.crabServer = CRABRest(hostname=restHost, localcert=self.config.serviceCert, - localkey=self.config.serviceKey, retry=3, - userAgent='CRABPublisher') - self.crabServer.setDbInstance(dbInstance=dbInstance) self.startTime = time.time() # tasks which are too loarge for us to deal with are @@ -252,12 +106,12 @@ def active_tasks(self, crabServer): data = encodeRequest(fileDoc) try: result = crabServer.post(api='filetransfers', data=data) # pylint: disable=unused-variable - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except self.logger.error("Failed to acquire publications from crabserver: %s", ex) return [] self.logger.debug("Retrieving max.100000 acquired publications from oracleDB") - fileDoc = dict() + fileDoc = {} fileDoc['asoworker'] = asoworker fileDoc['subresource'] = 'acquiredPublication' fileDoc['grouping'] = 0 @@ -265,80 +119,27 @@ def active_tasks(self, crabServer): data = encodeRequest(fileDoc) try: results = 
crabServer.get(api='filetransfers', data=data) - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except self.logger.error("Failed to acquire publications from crabserver: %s", ex) return [] files = oracleOutputMapping(results) self.logger.info("%s acquired publications retrieved for asoworker %s", len(files), asoworker) filesToPublish.extend(files) - - # TODO: join query for publisher (same of submitter) + # TO DO: join query for publisher (same of submitter) unique_tasks = [list(i) for i in set(tuple([x['username'], x['user_group'], x['user_role'], x['taskname']] - ) for x in filesToPublish if x['transfer_state'] == 3)] + ) for x in filesToPublish if x['transfer_state'] == 3)] info = [] for task in unique_tasks: info.append([x for x in filesToPublish if x['taskname'] == task[3]]) return list(zip(unique_tasks, info)) - def getPublDescFiles(self, workflow, lfn_ready, logger): - """ - Download and read the files describing what needs to be published - do on a small number of LFNs at a time (numFilesAtOneTime) to avoid - hitting the URL length limit in CMSWEB/Apache - """ - out = [] - metadataList = [] - dataDict = {} - dataDict['taskname'] = workflow - i = 0 - numFilesAtOneTime = 10 - logger.debug('FMDATA: will retrieve data for %d files', len(lfn_ready)) - while i < len(lfn_ready): - dataDict['lfnList'] = lfn_ready[i:i+numFilesAtOneTime] - data = encodeRequest(dataDict) - i += numFilesAtOneTime - try: - t1 = time.time() - res = self.crabServer.get(api='filemetadata', data=data) - # res is a 3-plu: (result, exit code, status) - res = res[0] - t2 = time.time() - elapsed = int(t2-t1) - fmdata = int(len(str(res))/1e6) # convert dict. to string to get actual length in HTTP call - logger.debug('FMDATA: retrieved data for %d files', len(res['result'])) - logger.debug('FMDATA: retrieved: %d MB in %d sec for %s', fmdata, elapsed, workflow) - if elapsed > 60 and fmdata > 100: # more than 1 minute and more than 100MB - self.taskBlackList.append(workflow) # notify this slave - filepath = Path(os.path.join(self.blackListedTaskDir, workflow)) - filepath.touch() # notify other slaves - logger.debug('++++++++ BLACKLIST2 TASK %s ++', workflow) - except Exception as ex: - t2 = time.time() - elapsed = int(t2-t1) - logger.error("Error during metadata2 retrieving from crabserver:\n%s", ex) - if elapsed > 290: - logger.debug('FMDATA gave error after > 290 secs. Most likely it timed out') - self.taskBlackList.append(workflow) # notify this slave - filepath = Path(os.path.join(self.blackListedTaskDir, workflow)) - filepath.touch() # notify other slaves - logger.debug('++++++++ BLACKLIST TASK %s ++', workflow) - return [] - metadataList = [json.loads(md) for md in res['result']] # CRAB REST returns a list of JSON objects - for md in metadataList: - out.append(md) - - logger.info('Got filemetadata for %d LFNs', len(out)) - # sort the list by jobId, makes it easier to compare https://stackoverflow.com/a/73050 - # sort by jobid as strings w/o converting to int becasue of https://github.com/dmwm/CRABServer/issues/7246 - sortedOut = sorted(out, key=lambda md: md['jobid']) - return sortedOut - def getTaskStatusFromSched(self, workflow, logger): + """ find task status (need to know if it is terminal) """ def translateStatus(statusToTr): """Translate from DAGMan internal integer status to a string. @@ -347,7 +148,7 @@ def translateStatus(statusToTr): user killing the task. 
See: https://htcondor.readthedocs.io/en/latest/users-manual/dagman-workflows.html#capturing-the-status-of-nodes-in-a-file """ - status = {0:'PENDING', 1: 'SUBMITTED', 2: 'SUBMITTED', 3: 'SUBMITTED', + status = {0: 'PENDING', 1: 'SUBMITTED', 2: 'SUBMITTED', 3: 'SUBMITTED', 4: 'SUBMITTED', 5: 'COMPLETED', 6: 'FAILED'}[statusToTr] return status @@ -391,7 +192,7 @@ def check_queued(statusOrSUBMITTED): return check_queued(translateStatus(subDagInfos[0]['DagStatus'])) return check_queued(translateStatus(dagInfo['DagStatus'])) - crabDBInfo, _, _ = self.crabServer.get(api='task', data={'subresource':'search', 'workflow':workflow}) + crabDBInfo, _, _ = self.crabServer.get(api='task', data={'subresource': 'search', 'workflow': workflow}) dbStatus = getColumn(crabDBInfo, 'tm_task_status') if dbStatus == 'KILLED': return 'KILLED' @@ -401,11 +202,11 @@ def check_queued(statusOrSUBMITTED): url = proxiedWebDir + "/status_cache.pkl" # this host is dummy since we will pass full url to downloadFile but WMCore.Requests needs it host = 'https://cmsweb.cern.ch' - cdict = {'cert':self.config.serviceCert, 'key':self.config.serviceKey} + cdict = {'cert': self.config.serviceCert, 'key': self.config.serviceKey} req = Requests(url=host, idict=cdict) _, ret = req.downloadFile(local_status_cache_pkl, url) - if not ret.status == 200: - raise Exception('download attempt returned HTTP code %d' % ret.status) + if ret.status != 200: + raise Exception(f"download attempt returned HTTP code {ret.status}") with open(local_status_cache_pkl, 'rb') as fp: statusCache = pickle.load(fp) os.close(local_status_cache_fd) @@ -417,7 +218,7 @@ def check_queued(statusOrSUBMITTED): status = dagStatus return status - def algorithm(self): + def algorithm(self): # pylint: disable=too-many-branches """ 1. Get a list of files to publish from the REST and organize by taskname 2. 
For each taks get a suitably sized input for publish @@ -444,11 +245,11 @@ def algorithm(self): for task in tasks: taskName = task[0][3] acquiredFiles = len(task[1]) - flag = ' OK' if acquiredFiles < 1000 else 'WARN' # mark suspicious tasks + flag = ' OK' if acquiredFiles < 1000 else 'WARN' # mark suspicious tasks self.logger.info('acquired_files: %s %5d : %s', flag, acquiredFiles, taskName) processes = [] - try: + try: # pylint: disable=too-many-nested-blocks for task in tasks: taskname = str(task[0][3]) # this IF is for testing on preprod or dev DB's, which are full of old unpublished tasks @@ -462,39 +263,39 @@ def algorithm(self): if self.TestMode: self.startSlave(task) # sequentially do one task after another continue - else: # deal with each task in a separate process - p = Process(target=self.startSlave, args=(task,)) - p.start() - self.logger.info('Starting process %s pid=%s', p, p.pid) - self.logger.info('PID %s will work on task %s', p.pid, taskname) - processes.append(p) + # else deal with each task in a separate process + p = Process(target=self.startSlave, args=(task,)) + p.start() + self.logger.info('Starting process %s pid=%s', p, p.pid) + self.logger.info('PID %s will work on task %s', p.pid, taskname) + processes.append(p) if len(processes) == maxSlaves: while len(processes) == maxSlaves: # wait until one process has completed time.sleep(10) - for proc in processes: + for proc in processes.copy(): if not proc.is_alive(): self.logger.info('Terminated: %s pid=%s', proc, proc.pid) processes.remove(proc) - except Exception: + except Exception: # pylint: disable=broad-except self.logger.exception("Error during process mapping") self.logger.info('No more tasks to care for. Wait for remaining %d processes to terminate', len(processes)) - while (processes): + while processes: time.sleep(10) - for proc in processes: + for proc in processes.copy(): if not proc.is_alive(): self.logger.info('Terminated: %s pid=%s', proc, proc.pid) processes.remove(proc) self.logger.info("Algorithm iteration completed") self.logger.info("Wait %d sec for next cycle", self.pollInterval()) - newStartTime = time.strftime("%H:%M:%S", time.localtime(time.time()+self.pollInterval())) + newStartTime = time.strftime("%H:%M:%S", time.localtime(time.time() + self.pollInterval())) # BEWARE: any change to message in next line needs to be synchronized with # a change in Publisher/stop.sh otherwise that script will break self.logger.info("Next cycle will start at %s", newStartTime) - def startSlave(self, task): + def startSlave(self, task): # pylint: disable=too-many-branches, too-many-locals, too-many-statements """ start a slave process to deal with publication for a single task :param task: one tupla describing a task as returned by active_tasks() @@ -512,17 +313,17 @@ def startSlave(self, task): if len(task[1]) > self.max_files_per_block: self.force_publication = True - msg = "All datasets have more than %s ready files." % (self.max_files_per_block) + msg = f"All datasets have more than {self.max_files_per_block} ready files." msg += " No need to retrieve task status nor last publication time." logger.info(msg) else: - msg = "At least one dataset has less than %s ready files. Retrieve task status" % (self.max_files_per_block) + msg = f"At least one dataset has less than {self.max_files_per_block} ready files. 
Retrieve task status" logger.info(msg) try: workflow_status = self.getTaskStatusFromSched(workflow, logger) - except Exception as ex: - logger.warn('Error retrieving status cache from sched for %s:\n%s', workflow, str(ex)) - logger.warn('Assuming COMPLETED in order to force pending publications if any') + except Exception as ex: # pylint: disable=broad-except + logger.warning('Error retrieving status cache from sched for %s:\n%s', workflow, str(ex)) + logger.warning('Assuming COMPLETED in order to force pending publications if any') workflow_status = 'COMPLETED' logger.info('Task status from DAG info: %s', workflow_status) # If the workflow status is terminal, go ahead and publish all the ready files @@ -534,26 +335,25 @@ def startSlave(self, task): msg = "Considering task status as terminal. Will force publication." logger.info(msg) # Otherwise... - else: ## TODO put this else in a function like def checkForPublication() + else: # TO DO put this else in a function like def checkForPublication() msg = "Task status is not considered terminal. Will check last publication time." logger.info(msg) # Get when was the last time a publication was done for this workflow (this # should be more or less independent of the output dataset in case there are # more than one). last_publication_time = None - data = encodeRequest({'workflow':workflow, 'subresource':'search'}) + data = encodeRequest({'workflow': workflow, 'subresource': 'search'}) try: result = self.crabServer.get(api='task', data=data) - #logger.debug("task: %s ", str(result[0])) last_publication_time = getColumn(result[0], 'tm_last_publication') - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except logger.error("Error during task info retrieving:\n%s", ex) if last_publication_time: - date = last_publication_time # datetime in Oracle format - timetuple = datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f").timetuple() # convert to time tuple - last_publication_time = time.mktime(timetuple) # convert to seconds since Epoch (float) + date = last_publication_time # datetime in Oracle format + timetuple = datetime.strptime(date, "%Y-%m-%d %H:%M:%S.%f").timetuple() + last_publication_time = time.mktime(timetuple) # convert to seconds since Epoch (float) - msg = "Last publication time: %s." % str(last_publication_time) + msg = f"Last publication time: {last_publication_time}." logger.debug(msg) # If this is the first time a publication would be done for this workflow, go # ahead and publish. @@ -564,23 +364,23 @@ def startSlave(self, task): # Otherwise... else: last = last_publication_time - msg = "Last published block time: %s" % last + msg = f"Last published block time: {last}" logger.debug(msg) # If the last publication was long time ago (> our block publication timeout), # go ahead and publish. 
now = int(time.time()) - time.timezone time_since_last_publication = now - last - hours = int(time_since_last_publication/60/60) - minutes = int((time_since_last_publication - hours*60*60)/60) - timeout_hours = int(self.block_publication_timeout/60/60) - timeout_minutes = int((self.block_publication_timeout - timeout_hours*60*60)/60) - msg = "Last publication was %sh:%sm ago" % (hours, minutes) + hours = int(time_since_last_publication / 60 / 60) + minutes = int((time_since_last_publication - hours * 60 * 60) / 60) + timeout_hours = int(self.block_publication_timeout / 60 / 60) + timeout_minutes = int((self.block_publication_timeout - timeout_hours * 60 * 60) / 60) + msg = f"Last publication was {hours}h:{minutes}m ago" if time_since_last_publication > self.block_publication_timeout: self.force_publication = True - msg += " (more than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes) + msg += f" (more than the timeout of {timeout_hours}h:{timeout_minutes}m)." msg += " Will force publication." else: - msg += " (less than the timeout of %sh:%sm)." % (timeout_hours, timeout_minutes) + msg += F" (less than the timeout of {timeout_hours}h:{timeout_minutes}m)." msg += " Not enough to force publication." logger.info(msg) @@ -598,7 +398,7 @@ def startSlave(self, task): x['input_dataset'], x['dbs_url'], x['last_update'] - ]} + ]} for x in task[1] if x['transfer_state'] == 3 and x['publication_state'] not in [2, 3, 5]] lfn_ready = [] @@ -627,7 +427,14 @@ def startSlave(self, task): workflow) else: # retrieve information from FileMetadata - publDescFiles_list = self.getPublDescFiles(workflow, lfn_ready, logger) + (publDescFiles_list, blackList) = getInfoFromFMD( + crabServer=self.crabServer, taskname=workflow, lfns=lfn_ready, logger=logger) + if blackList: + self.taskBlackList.append(workflow) # notify this slave + filepath = Path(os.path.join(self.blackListedTaskDir, workflow)) + filepath.touch() # notify other slaves + logger.debug('++++++++ BLACKLIST TASK %s ++', workflow) + # now combine the info from FMD with the info from transfersdb (in active_) # to create the JSON files (one per task) to be put in Publisher_Files and # used by TaskPublish.py as input @@ -639,9 +446,9 @@ def startSlave(self, task): # 'swversion', 'inevents', 'globaltag', 'publishname', 'location', # 'tmplocation', 'runlumi', # 'adler32', 'cksum', 'md5', 'lfn'(*), 'filesize', - # 'parents', 'state', 'created', 'tmplfn', 'User'(*), 'Group'(*), - # 'Role'(*), 'UserDN'(*), 'Destination'(*), 'SourceLFN(*)' - # Clearly `taskname`, 'acquisitionera', 'swversion', 'globaltag', 'UserDN' are common, + # 'parents', 'state', 'created', 'tmplfn', 'User'(*) + # 'Destination'(*), 'SourceLFN(*)' + # Clearly `taskname`, 'acquisitionera', 'swversion', 'globaltag' are common, # others could be different from one file to another (even if we do not support multiple # outputdataset at this moment @@ -655,9 +462,6 @@ def startSlave(self, task): # logger.info(doc) if doc["lfn"] == file_["value"][2]: doc["User"] = username - doc["Group"] = file_["key"][1] - doc["Role"] = file_["key"][2] - doc["UserDN"] = self.myDN doc["Destination"] = file_["value"][0] doc["SourceLFN"] = file_["value"][1] toPublish.append(doc) @@ -667,19 +471,18 @@ def startSlave(self, task): # at same files over and over if not metadataFound: toFail.append(file_["value"][1]) - with open(self.taskFilesDir + workflow + '.json', 'w') as outfile: + with open(self.taskFilesDir + workflow + '.json', 'w', encoding='utf-8') as outfile: json.dump(toPublish, outfile) 
logger.debug('Unitarity check: active_:%d toPublish:%d toFail:%d', len(active_), len(toPublish), len(toFail)) if len(toPublish) + len(toFail) != len(active_): logger.error("SOMETHING WRONG IN toPublish vs toFail !!") if toFail: logger.info('Did not find useful metadata for %d files. Mark as failed', len(toFail)) - from ServerUtilities import getHashLfn nMarked = 0 for lfn in toFail: source_lfn = lfn docId = getHashLfn(source_lfn) - data = dict() + data = {} data['asoworker'] = self.config.asoworker data['subresource'] = 'updatePublication' data['list_of_ids'] = docId @@ -688,21 +491,19 @@ def startSlave(self, task): data['list_of_failure_reason'] = 'File type not EDM or metadata not found' try: result = self.crabServer.post(api='filetransfers', data=encodeRequest(data)) - #logger.debug("updated DocumentId: %s lfn: %s Result %s", docId, source_lfn, result) - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except logger.error("Error updating status for DocumentId: %s lfn: %s", docId, source_lfn) logger.error("Error reason: %s", ex) nMarked += 1 - #if nMarked % 10 == 0: logger.info('marked %d files as Failed', nMarked) # find the location in the current environment of the script we want to run - import Publisher.TaskPublish as tp + import Publisher.TaskPublish as tp # pylint: disable=import-outside-toplevel taskPublishScript = tp.__file__ - cmd = "python3 %s " % taskPublishScript - cmd += " --configFile=%s" % self.configurationFile - cmd += " --taskname=%s" % workflow + cmd = f"python3 {taskPublishScript} " + cmd += f" --configFile={self.configurationFile}" + cmd += f" --taskname={workflow}" if self.TPconfig.dryRun: cmd += " --dry" logger.info("Now execute: %s", cmd) @@ -710,11 +511,10 @@ def startSlave(self, task): if exitcode != 0: errorMsg = f"Failed to execute command: {cmd}.\n StdErr:\n {stderr}\n" raise Exception(errorMsg) - else: - logger.info('TaskPublishScript done : %s', stdout) + logger.info('TaskPublishScript done : %s', stdout) jsonSummary = stdout.split()[-1] - with open(jsonSummary, 'r') as fd: + with open(jsonSummary, 'r', encoding='utf-8') as fd: summary = json.load(fd) result = summary['result'] reason = summary['reason'] @@ -724,10 +524,10 @@ def startSlave(self, task): if reason == 'NOTHING TO DO': logger.info('Taskname %s is OK. Nothing to do', taskname) else: - msg = 'Taskname %s is OK. Published %d files in %d blocks.' % \ - (taskname, summary['publishedFiles'], summary['publishedBlocks']) + msg = f"Taskname {taskname} is OK. Published {summary['publishedFiles']} " + msg += f"files in {summary['publishedBlocks']} blocks." if summary['nextIterFiles']: - msg += ' %d files left for next iteration.' % summary['nextIterFiles'] + msg += f" {summary['nextIterFiles']} files left for next iteration."
logger.info(msg) if result == 'FAIL': logger.error('Taskname %s : TaskPublish failed with: %s', taskname, reason) @@ -736,7 +536,7 @@ def startSlave(self, task): taskname, summary['failedBlocks'], summary['failedFiles']) logger.error('Taskname %s : Failed block(s) details have been saved in %s', taskname, summary['failedBlockDumps']) - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except logger.exception("Exception when calling TaskPublish!\n%s", str(ex)) return 0 @@ -760,7 +560,7 @@ def pollInterval(self): parser.add_argument('--config', help='Publisher config file', default='PublisherConfig.py') args = parser.parse_args() - #need to pass the configuration file path to the slaves + # need to pass the configuration file path to the slaves configurationFile = os.path.abspath(args.config) master = Master(confFile=configurationFile) diff --git a/src/python/Publisher/PublisherMasterRucio.py b/src/python/Publisher/PublisherMasterRucio.py index 653d421a17..19002996d9 100644 --- a/src/python/Publisher/PublisherMasterRucio.py +++ b/src/python/Publisher/PublisherMasterRucio.py @@ -1,4 +1,4 @@ -# pylint: disable=C0103, W0703, R0912, R0914, R0915 +# pylint: disable=invalid-name # have a lot of snake_case varaibles here from "old times" """ Here's the algorithm @@ -9,66 +9,21 @@ 4. spawn a process per task that publish their files """ -from __future__ import division -from __future__ import print_function import argparse -import logging -from logging import FileHandler -from logging.handlers import TimedRotatingFileHandler import os -import sys import json import time from pathlib import Path from multiprocessing import Process -from MultiProcessingLog import MultiProcessingLog from WMCore.Configuration import loadConfigurationFile from ServerUtilities import encodeRequest, oracleOutputMapping, executeCommand -from ServerUtilities import getHashLfn -from TaskWorker import __version__ from TaskWorker.WorkerUtilities import getCrabserver - -def chunks(aList, n): - """ - Yield successive n-sized chunks from aList. - :param aList: list to split in chunks - :param n: chunk size - :return: yield the next list chunk - """ - for i in range(0, len(aList), n): - yield aList[i:i + n] - - -def setMasterLogger(logsDir, name='master'): - """ Set the logger for the master process. The file used for it is logs/processes/proc.name.txt and it - can be retrieved with logging.getLogger(name) in other parts of the code - """ - logger = logging.getLogger(name) - fileName = os.path.join(logsDir, 'processes', f"proc.c3id_{name}.pid_{os.getpid()}.txt") - handler = TimedRotatingFileHandler(fileName, 'midnight', backupCount=30) - formatter = logging.Formatter("%(asctime)s:%(levelname)s:" + name + ":%(message)s") - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger - - -def setSlaveLogger(logsDir, name): - """ Set the logger for a single slave process. 
The file used for it is logs/processes/proc.name.txt and it - can be retrieved with logging.getLogger(name) in other parts of the code - """ - logger = logging.getLogger(name) - fileName = os.path.join(logsDir, 'processes', f"proc.c3id_{name}.txt") - # handler = TimedRotatingFileHandler(fileName, 'midnight', backupCount=30) - # slaves are short lived, use one log file for each - handler = FileHandler(fileName) - formatter = logging.Formatter("%(asctime)s:%(levelname)s:" + "slave" + ":%(message)s") - handler.setFormatter(formatter) - logger.addHandler(handler) - return logger +from PublisherUtils import createLogdir, setRootLogger, setSlaveLogger, logVersionAndConfig +from PublisherUtils import getInfoFromFMD, markFailed class Master(): # pylint: disable=too-many-instance-attributes @@ -84,74 +39,6 @@ def __init__(self, confFile=None, quiet=False, debug=True, testMode=False): :arg bool testMode: it tells if to run in test (no subprocesses) mode. """ - def createLogdir(dirname): - """ Create the directory dirname ignoring erors in case it exists. Exit if - the directory cannot be created. - """ - try: - os.makedirs(dirname) - except OSError as ose: - if ose.errno != 17: # ignore the "Directory already exists error" - print(str(ose)) - print(f"The Publisher Worker needs to access the '{dirname}' directory") - sys.exit(1) - - def setRootLogger(logsDir, quiet=False, debug=True, console=False): - """Sets the root logger with the desired verbosity level - The root logger logs to logs/log.txt and every single - logging instruction is propagated to it (not really nice - to read) - - :arg bool quiet: it tells if a quiet logger is needed - :arg bool debug: it tells if needs a verbose logger - :arg bool console: it tells if to direct all printoput to console rather then files, useful for debug - :return logger: a logger with the appropriate logger level.""" - - createLogdir(logsDir) - createLogdir(os.path.join(logsDir, 'processes')) - createLogdir(os.path.join(logsDir, 'tasks')) - - if console: - # if we are testing log to the console is easier - logging.getLogger().addHandler(logging.StreamHandler()) - else: - logHandler = MultiProcessingLog(os.path.join(logsDir, 'log.txt'), when='midnight') - logFormatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s") - logHandler.setFormatter(logFormatter) - logging.getLogger().addHandler(logHandler) - loglevel = logging.INFO - if quiet: - loglevel = logging.WARNING - if debug: - loglevel = logging.DEBUG - logging.getLogger().setLevel(loglevel) - logger = setMasterLogger(logsDir) - logger.debug("PID %s.", os.getpid()) - logger.debug("Logging level initialized to %s.", loglevel) - return logger - - def logVersionAndConfig(config=None, logger=None): - """ - log version number and major config. 
parameters - args: config : a configuration object loaded from file - args: logger : the logger instance to use - """ - pubstartDict = {} - pubstartDict['version'] = __version__ - pubstartDict['asoworker'] = config.General.asoworker - pubstartDict['instance'] = config.General.instance - if config.General.instance == 'other': - pubstartDict['restHost'] = config.General.restHost - pubstartDict['dbInstance'] = config.General.dbInstance - pubstartDict['max_slaves'] = config.General.max_slaves - pubstartDict['DBShost'] = config.TaskPublisher.DBShost - pubstartDict['dryRun'] = config.TaskPublisher.dryRun - # one line for automatic parsing - logger.info('PUBSTART: %s', json.dumps(pubstartDict)) - # multiple lines for humans to read - for k, v in pubstartDict.items(): - logger.info('%s: %s', k, v) - self.configurationFile = confFile # remember this, will have to pass it to TaskPublish config = loadConfigurationFile(confFile) self.config = config.General @@ -171,8 +58,8 @@ def logVersionAndConfig(config=None, logger=None): createLogdir(os.path.join(self.taskFilesDir, 'FailedBlocks')) # need a persistent place on disk to communicate among slaves self.blackListedTaskDir = os.path.join(self.taskFilesDir, 'BlackListedTasks') - createLogdir(self.blackListedTaskDir) + self.logger = setRootLogger(self.config.logsDir, quiet=quiet, debug=debug, console=self.TestMode) logVersionAndConfig(config, self.logger) @@ -210,7 +97,7 @@ def active_tasks(self, crabServer): try: # select files with transfer DONE and publication NEW and set publication to ACQUIRED result = crabServer.post(api='filetransfers', data=data) # pylint: disable=unused-variable - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except self.logger.error("Failed to acquire publications from crabserver: %s", ex) return [] @@ -223,7 +110,7 @@ def active_tasks(self, crabServer): data = encodeRequest(fileDoc) try: results = crabServer.get(api='filetransfers', data=data) - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except self.logger.error("Failed to acquire publications from crabserver: %s", ex) return [] files = oracleOutputMapping(results) @@ -246,57 +133,6 @@ def active_tasks(self, crabServer): taskList.append(taskDict) return taskList - def getInfoFromFMD(self, workflow, lfns, logger): - """ - Download and read the files describing what needs to be published - do on a small number of LFNs at a time (numFilesAtOneTime) to avoid - hitting the URL length limit in CMSWEB/Apache - input: lfns : a list of LFNs - returns: a list of dictionaries, one per file, sorted by CRAB JobID - """ - out = [] - dataDict = {} - dataDict['taskname'] = workflow - i = 0 - numFilesAtOneTime = 10 - logger.debug('FMDATA: will retrieve data for %d files', len(lfns)) - while i < len(lfns): - dataDict['lfnList'] = lfns[i: i + numFilesAtOneTime] - data = encodeRequest(dataDict) - i += numFilesAtOneTime - try: - t1 = time.time() - res = self.crabServer.get(api='filemetadata', data=data) - # res is a 3-plu: (result, exit code, status) - res = res[0] - t2 = time.time() - elapsed = int(t2 - t1) - fmdata = int(len(str(res)) / 1e6) # convert dict. 
to string to get actual length in HTTP call - logger.debug('FMDATA: retrieved data for %d files', len(res['result'])) - logger.debug('FMDATA: retrieved: %d MB in %d sec for %s', fmdata, elapsed, workflow) - if elapsed > 60 and fmdata > 100: # more than 1 minute and more than 100MB - self.taskBlackList.append(workflow) # notify this slave - filepath = Path(os.path.join(self.blackListedTaskDir, workflow)) - filepath.touch() # notify other slaves - logger.debug('++++++++ BLACKLIST2 TASK %s ++', workflow) - except Exception as ex: - t2 = time.time() - elapsed = int(t2 - t1) - logger.error("Error during metadata2 retrieving from crabserver:\n%s", ex) - if elapsed > 290: - logger.debug('FMDATA gave error after > 290 secs. Most likely it timed out') - self.taskBlackList.append(workflow) # notify this slave - filepath = Path(os.path.join(self.blackListedTaskDir, workflow)) - filepath.touch() # notify other slaves - logger.debug('++++++++ BLACKLIST TASK %s ++', workflow) - return [] - metadataList = [json.loads(md) for md in res['result']] # CRAB REST returns a list of JSON objects - for md in metadataList: - out.append(md) - - logger.info('Got filemetadata for %d LFNs', len(out)) - return out - def runTaskPublish(self, workflow, logger): """ forks a process which will run TaskPublishRucio.py @@ -342,7 +178,7 @@ def runTaskPublish(self, workflow, logger): logger.error('Taskname %s : Failed block(s) details have been saved in %s', taskname, summary['failedBlockDumps']) - def algorithm(self): + def algorithm(self): # pylint: disable=too-many-branches """ 1. Get a list of files to publish from the REST and organize by taskname 2. For each taks get a suitably sized input for publish @@ -397,20 +233,20 @@ def algorithm(self): while len(processes) == maxSlaves: # wait until one process has completed time.sleep(10) - for proc in processes: + for proc in processes.copy(): if not proc.is_alive(): self.logger.info('Terminated: %s pid=%s', proc, proc.pid) - processes.remove(proc) # pylint: disable=modified-iterating-list + processes.remove(proc) - except Exception: + except Exception: # pylint: disable=broad-except self.logger.exception("Error during process mapping") self.logger.info('No more tasks to care for. 
Wait for remaining %d processes to terminate', len(processes)) while processes: time.sleep(10) - for proc in processes: + for proc in processes.copy(): if not proc.is_alive(): self.logger.info('Terminated: %s pid=%s', proc, proc.pid) - processes.remove(proc) # pylint: disable=modified-iterating-list + processes.remove(proc) self.logger.info("Algorithm iteration completed") self.logger.info("Wait %d sec for next cycle", self.pollInterval()) @@ -419,7 +255,7 @@ def algorithm(self): # a change in Publisher/stop.sh otherwise that script will break self.logger.info("Next cycle will start at %s", newStartTime) - def startSlave(self, task): + def startSlave(self, task): # pylint: disable=too-many-branches """ start a slave process to deal with publication for a single task :param task: one tuple describing a task as returned by active_tasks() @@ -464,11 +300,13 @@ def startSlave(self, task): self.markAsFailed(lfns=lfnsToPublish, reason='Blacklisted Task') return 0 # get filemetadata info for all files which needs to be published - infoFMD = self.getInfoFromFMD(workflow, lfnsToPublish, logger) - # sort the list by lfn, makes it easier to compare https://stackoverflow.com/a/73050 - filesInfoFromFMD = sorted(infoFMD, key=lambda md: md['lfn']) - - print(filesInfoFromFMD[0]) + (filesInfoFromFMD, blackList) = getInfoFromFMD( + crabServer=self.crabServer, taskname=workflow, lfns=lfnsToPublish, logger=logger) + if blackList: + self.taskBlackList.append(workflow) # notify this slave + filepath = Path(os.path.join(self.blackListedTaskDir, workflow)) + filepath.touch() # notify other slaves + logger.debug('++++++++ BLACKLIST TASK %s ++', workflow) blockDictsToPublish = [] # initialise the structure which will be dumped as JSON toPublish = [] @@ -521,34 +359,17 @@ def startSlave(self, task): # call taskPublishRucio self.runTaskPublish(workflow, logger) - except Exception as ex: + except Exception as ex: # pylint: disable=broad-except logger.exception("Exception when calling TaskPublish!\n%s", str(ex)) return 0 def markAsFailed(self, lfns=None, reason=None): """ - could this be replaced by PublisherUtils/markFailed ? 
+ handy wrapper for PublisherUtils/markFailed """ - nMarked = 0 - for lfn in lfns: - source_lfn = lfn - docId = getHashLfn(source_lfn) - data = {} - data['asoworker'] = self.config.asoworker - data['subresource'] = 'updatePublication' - data['list_of_ids'] = docId - data['list_of_publication_state'] = 'FAILED' - data['list_of_retry_value'] = 1 - data['list_of_failure_reason'] = reason - try: - self.crabServer.post(api='filetransfers', data=encodeRequest(data)) - # if nMarked % 10 == 0: - # logger.debug("updated DocumentId: %s lfn: %s Result %s", docId, source_lfn, result) - except Exception as ex: - self.logger.error("Error updating status for DocumentId: %s lfn: %s", docId, source_lfn) - self.logger.error("Error reason: %s", ex) - nMarked += 1 + nMarked = markFailed(files=lfns, crabServer=self.crabServer, failureReason=reason, + asoworker=self.config.asoworker, logger=self.logger) return nMarked def pollInterval(self): diff --git a/src/python/Publisher/PublisherUtils.py b/src/python/Publisher/PublisherUtils.py index 53f6a40ed9..8b186d9d20 100644 --- a/src/python/Publisher/PublisherUtils.py +++ b/src/python/Publisher/PublisherUtils.py @@ -6,11 +6,16 @@ import os import sys +import time import logging +from logging import FileHandler +from logging.handlers import TimedRotatingFileHandler import json +from MultiProcessingLog import MultiProcessingLog from ServerUtilities import getHashLfn, encodeRequest +from TaskWorker import __version__ def createLogdir(dirname): @@ -58,6 +63,91 @@ def setupLogging(config, taskname, verbose, console): return log +def setMasterLogger(logsDir, name='master'): + """ Set the logger for the master process. The file used for it is logs/processes/proc.name.txt and it + can be retrieved with logging.getLogger(name) in other parts of the code + """ + logger = logging.getLogger(name) + fileName = os.path.join(logsDir, 'processes', f"proc.c3id_{name}.pid_{os.getpid()}.txt") + handler = TimedRotatingFileHandler(fileName, 'midnight', backupCount=30) + formatter = logging.Formatter("%(asctime)s:%(levelname)s:" + name + ":%(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + +def setSlaveLogger(logsDir, name): + """ Set the logger for a single slave process.
The file used for it is logs/processes/proc.name.txt and it + can be retrieved with logging.getLogger(name) in other parts of the code + """ + logger = logging.getLogger(name) + fileName = os.path.join(logsDir, 'processes', f"proc.c3id_{name}.txt") + # slaves are short lived, use one log file for each + handler = FileHandler(fileName) + formatter = logging.Formatter("%(asctime)s:%(levelname)s:" + "slave" + ":%(message)s") + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger + + +def setRootLogger(logsDir, quiet=False, debug=True, console=False): + """Sets the root logger with the desired verbosity level + The root logger logs to logs/log.txt and every single + logging instruction is propagated to it (not really nice + to read) + + :arg bool quiet: it tells if a quiet logger is needed + :arg bool debug: it tells if a verbose logger is needed + :arg bool console: it tells if to direct all printout to console rather than files, useful for debug + :return logger: a logger with the appropriate logger level.""" + + createLogdir(logsDir) + createLogdir(os.path.join(logsDir, 'processes')) + createLogdir(os.path.join(logsDir, 'tasks')) + + if console: + # if we are testing log to the console is easier + logging.getLogger().addHandler(logging.StreamHandler()) + else: + logHandler = MultiProcessingLog(os.path.join(logsDir, 'log.txt'), when='midnight') + logFormatter = logging.Formatter("%(asctime)s:%(levelname)s:%(module)s,%(lineno)d:%(message)s") + logHandler.setFormatter(logFormatter) + logging.getLogger().addHandler(logHandler) + loglevel = logging.INFO + if quiet: + loglevel = logging.WARNING + if debug: + loglevel = logging.DEBUG + logging.getLogger().setLevel(loglevel) + logger = setMasterLogger(logsDir) + logger.debug("PID %s.", os.getpid()) + logger.debug("Logging level initialized to %s.", loglevel) + return logger + + +def logVersionAndConfig(config=None, logger=None): + """ + log version number and major config.
parameters + args: config : a configuration object loaded from file + args: logger : the logger instance to use + """ + pubstartDict = {} + pubstartDict['version'] = __version__ + pubstartDict['asoworker'] = config.General.asoworker + pubstartDict['instance'] = config.General.instance + if config.General.instance == 'other': + pubstartDict['restHost'] = config.General.restHost + pubstartDict['dbInstance'] = config.General.dbInstance + pubstartDict['max_slaves'] = config.General.max_slaves + pubstartDict['DBShost'] = config.TaskPublisher.DBShost + pubstartDict['dryRun'] = config.TaskPublisher.dryRun + # one line for automatic parsing + logger.info('PUBSTART: %s', json.dumps(pubstartDict)) + # multiple lines for humans to read + for k, v in pubstartDict.items(): + logger.info('%s: %s', k, v) + + def prepareDummySummary(taskname): """ prepare a dummy summary JSON file in case there's nothing to do """ nothingToDo = {} @@ -123,7 +213,7 @@ def markGood(files=None, crabServer=None, asoworker=None, logger=None): nMarked += 1 if nMarked % 10 == 0: logger.info('marked %d files', nMarked) - logger.info('total of %s files, marked as Good', nMarked) + logger.info('total of %s files, marked as Good', nMarked) def markFailed(files=None, crabServer=None, failureReason="", asoworker=None, logger=None): @@ -160,6 +250,61 @@ def markFailed(files=None, crabServer=None, failureReason="", asoworker=None, lo logger.info('total of %s files, marked as Failed', nMarked) +def getInfoFromFMD(crabServer=None, taskname=None, lfns=None, logger=None): + """ + Download and read the files describing what needs to be published + do on a small number of LFNs at a time (numFilesAtOneTime) to avoid + hitting the URL length limit in CMSWEB/Apache + input: lfns : list of strings - a list of LFNs + taskname : string - the name of the task, needed to retrieve FMD + crabServer: an instance of CRABRest - as configured via WorkerUtilities/getCrabserver + logger: a logger + returns: a tuple: (FMDs, blackList) + FMDs: a list of dictionaries, one per file, sorted by CRAB JobID + blackList: boolean, True if this task has problem with FMD access and we need to blacklist it + """ + out = [] + dataDict = {} + blackList = False + dataDict['taskname'] = taskname + i = 0 + numFilesAtOneTime = 10 + logger.debug('FMDATA: will retrieve data for %d files', len(lfns)) + while i < len(lfns): + dataDict['lfnList'] = lfns[i: i + numFilesAtOneTime] + data = encodeRequest(dataDict) + i += numFilesAtOneTime + t1 = time.time() + try: + res = crabServer.get(api='filemetadata', data=data) + # res is a 3-ple: (result, exit code, status) + res = res[0] + t2 = time.time() + elapsed = int(t2 - t1) + fmdata = int(len(str(res)) / 1e6) # convert dict. 
to string to get actual length in HTTP call + logger.debug('FMDATA: retrieved data for %d files', len(res['result'])) + logger.debug('FMDATA: retrieved: %d MB in %d sec for %s', fmdata, elapsed, taskname) + if elapsed > 60 and fmdata > 100: # more than 1 minute and more than 100MB + blackList = True + return ([], blackList) + except Exception as ex: # pylint: disable=broad-except + t2 = time.time() + elapsed = int(t2 - t1) + logger.error("Error during metadata retrieving from crabserver:\n%s", ex) + if elapsed > 290: + blackList = True + return ([], blackList) + metadataList = [json.loads(md) for md in res['result']] # CRAB REST returns a list of JSON objects + for md in metadataList: + out.append(md) + + logger.info('Got filemetadata for %d LFNs', len(out)) + # sort the list by jobId, makes it easier to compare https://stackoverflow.com/a/73050 + # sort by jobid as strings w/o converting to int becasue of https://github.com/dmwm/CRABServer/issues/7246 + sortedOut = sorted(out, key=lambda md: md['jobid']) + return (sortedOut, blackList) + + def getDBSInputInformation(taskname=None, crabServer=None): """ LOOK UP Input dataset info in CRAB DB task table
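For reference, after this refactor both PublisherMaster.py and PublisherMasterRucio.py consume the shared PublisherUtils.getInfoFromFMD() in the same way: they pass the task name and the list of ready LFNs, and when the helper reports a blacklist condition they record the task both in the slave's in-memory list and as a marker file visible to the other slaves. Below is a minimal sketch of that calling pattern; the helper name fetchFileMetadata and its explicit parameters are illustrative only (in the patch these are attributes of Master set up in __init__), not part of the patch itself.

import os
from pathlib import Path

from PublisherUtils import getInfoFromFMD


def fetchFileMetadata(crabServer, workflow, lfns, taskBlackList, blackListedTaskDir, logger):
    """ illustrative sketch of the getInfoFromFMD calling pattern used by both startSlave() methods """
    # retrieve the FileMetadata documents for the LFNs which are ready to be published
    filesInfoFromFMD, blackList = getInfoFromFMD(
        crabServer=crabServer, taskname=workflow, lfns=lfns, logger=logger)
    if blackList:
        # FMD retrieval was too slow or timed out: remember the task in this slave ...
        taskBlackList.append(workflow)
        # ... and drop a marker file in the shared directory so other slaves skip it too
        Path(os.path.join(blackListedTaskDir, workflow)).touch()
        logger.debug('++++++++ BLACKLIST TASK %s ++', workflow)
    return filesInfoFromFMD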