From 118fa2bcaf429a1e70076deb5dfd44dc4f4ab75b Mon Sep 17 00:00:00 2001
From: Sebastiaan Huber
Date: Fri, 30 Aug 2019 21:00:58 +0200
Subject: [PATCH 1/2] Move getting completed job accounting to `retrieve`
 transport task

The task of retrieving the detailed accounting info from the scheduler
for completed jobs was erroneously placed within the scheduler status
update cycle of the `JobsList`. This would have requested the detailed
job info each update cycle, were it not for a conditional that checked
that the scheduler status was `DONE`. However, within the `JobsList`
loop, completed jobs would not even appear in the result, so the
conditional was never hit and the detailed accounting was never
retrieved.

A logically better place for this functionality is in the `retrieve`
transport task, after the `UPDATE` task, i.e. when the scheduler
reports that the job has terminated. The `retrieve` task will query the
scheduler for the full job accounting which, when implemented for the
given scheduler plugin, will be stored as an attribute on the
calculation job node.

The `Scheduler.get_detailed_jobinfo` method is deprecated in favour of
`Scheduler.get_detailed_job_info`. The former returned a string with
the return value, stdout and stderr formatted within it, which is not
very flexible. The replacement method returns a dictionary instead.
---
 aiida/engine/processes/calcjobs/manager.py   | 12 +---
 aiida/engine/processes/calcjobs/tasks.py     | 55 ++++++++++---------
 .../orm/nodes/process/calculation/calcjob.py | 24 ++++++--
 aiida/schedulers/plugins/lsf.py              |  4 +-
 aiida/schedulers/plugins/pbsbaseclasses.py   |  4 +-
 aiida/schedulers/plugins/sge.py              |  4 +-
 aiida/schedulers/plugins/slurm.py            |  2 +-
 aiida/schedulers/plugins/test_sge.py         |  2 +-
 aiida/schedulers/scheduler.py                | 33 ++++++++++-
 9 files changed, 89 insertions(+), 51 deletions(-)

diff --git a/aiida/engine/processes/calcjobs/manager.py b/aiida/engine/processes/calcjobs/manager.py
index 68b55518db..5f40eae886 100644
--- a/aiida/engine/processes/calcjobs/manager.py
+++ b/aiida/engine/processes/calcjobs/manager.py
@@ -14,8 +14,7 @@

 from tornado import concurrent, gen

-from aiida import schedulers
-from aiida.common import exceptions, lang
+from aiida.common import lang

 __all__ = ('JobsList', 'JobManager')

@@ -112,15 +111,6 @@ def _get_jobs_from_scheduler(self):
         self.logger.info('AuthInfo<{}>: successfully retrieved status of active jobs'.format(self._authinfo.pk))

         for job_id, job_info in scheduler_response.items():
-            # If the job is done then get detailed job information
-            detailed_job_info = None
-            if job_info.job_state == schedulers.JobState.DONE:
-                try:
-                    detailed_job_info = scheduler.get_detailed_jobinfo(job_id)
-                except exceptions.FeatureNotAvailable:
-                    detailed_job_info = 'This scheduler does not implement get_detailed_jobinfo'
-
-            job_info.detailedJobinfo = detailed_job_info
             jobs_cache[job_id] = job_info

         raise gen.Return(jobs_cache)
diff --git a/aiida/engine/processes/calcjobs/tasks.py b/aiida/engine/processes/calcjobs/tasks.py
index 0f8abf1178..fc19d2df61 100644
--- a/aiida/engine/processes/calcjobs/tasks.py
+++ b/aiida/engine/processes/calcjobs/tasks.py
@@ -16,7 +16,7 @@
 import plumpy

 from aiida.common.datastructures import CalcJobState
-from aiida.common.exceptions import TransportTaskException
+from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
 from aiida.engine.daemon import execmanager
 from aiida.engine.utils import exponential_backoff_retry, interruptable_task
 from aiida.schedulers.datastructures import JobState
@@ -37,8 +37,7 @@

 @coroutine
 def task_upload_job(node, transport_queue, calc_info, script_filename, cancellable):
-    """
-    Transport task that will attempt to upload the files of a job calculation to the remote
+    """Transport task that will attempt to upload the files of a job calculation to the remote.

     The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
     function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
@@ -86,8 +85,7 @@ def do_upload():

 @coroutine
 def task_submit_job(node, transport_queue, calc_info, script_filename, cancellable):
-    """
-    Transport task that will attempt to submit a job calculation
+    """Transport task that will attempt to submit a job calculation.

     The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
     function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
@@ -136,8 +134,7 @@ def do_submit():

 @coroutine
 def task_update_job(node, job_manager, cancellable):
-    """
-    Transport task that will attempt to update the scheduler status of the job calculation
+    """Transport task that will attempt to update the scheduler status of the job calculation.

     The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
     function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
@@ -198,8 +195,7 @@ def do_update():

 @coroutine
 def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancellable):
-    """
-    Transport task that will attempt to retrieve all files of a completed job calculation
+    """Transport task that will attempt to retrieve all files of a completed job calculation.

     The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
     function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
@@ -226,6 +222,21 @@ def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancell
     def do_retrieve():
         with transport_queue.request_transport(authinfo) as request:
             transport = yield cancellable.with_interrupt(request)
+
+            # Perform the job accounting and set it on the node if successful. If the scheduler does not implement
+            # this feature, still set the attribute, but to `None`. This way we can distinguish calculation jobs for
+            # which the accounting was attempted but could not be retrieved.
+            scheduler = node.computer.get_scheduler()
+            scheduler.set_transport(transport)
+
+            try:
+                detailed_job_info = scheduler.get_detailed_job_info(node.get_job_id())
+            except FeatureNotAvailable:
+                logger.info('detailed job info not available for scheduler of CalcJob<{}>'.format(node.pk))
+                node.set_detailed_job_info(None)
+            else:
+                node.set_detailed_job_info(detailed_job_info)
+
             raise Return(execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder))

     try:
@@ -245,8 +256,7 @@

 @coroutine
 def task_kill_job(node, transport_queue, cancellable):
-    """
-    Transport task that will attempt to kill a job calculation
+    """Transport task that will attempt to kill a job calculation.

     The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
     function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
@@ -379,31 +389,26 @@ def _launch_task(self, coro, *args, **kwargs):

     def upload(self, calc_info, script_filename):
         """Return the `Waiting` state that will `upload` the `CalcJob`."""
-        return self.create_state(
-            ProcessState.WAITING,
-            None,
-            msg='Waiting for calculation folder upload',
-            data=(UPLOAD_COMMAND, calc_info, script_filename))
+        msg = 'Waiting for calculation folder upload'
+        return self.create_state(ProcessState.WAITING, None, msg=msg, data=(UPLOAD_COMMAND, calc_info, script_filename))

     def submit(self, calc_info, script_filename):
         """Return the `Waiting` state that will `submit` the `CalcJob`."""
-        return self.create_state(
-            ProcessState.WAITING,
-            None,
-            msg='Waiting for scheduler submission',
-            data=(SUBMIT_COMMAND, calc_info, script_filename))
+        msg = 'Waiting for scheduler submission'
+        return self.create_state(ProcessState.WAITING, None, msg=msg, data=(SUBMIT_COMMAND, calc_info, script_filename))

     def update(self):
         """Return the `Waiting` state that will `update` the `CalcJob`."""
-        return self.create_state(
-            ProcessState.WAITING, None, msg='Waiting for scheduler update', data=UPDATE_COMMAND)
+        msg = 'Waiting for scheduler update'
+        return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPDATE_COMMAND)

     def retrieve(self):
         """Return the `Waiting` state that will `retrieve` the `CalcJob`."""
-        return self.create_state(ProcessState.WAITING, None, msg='Waiting to retrieve', data=RETRIEVE_COMMAND)
+        msg = 'Waiting to retrieve'
+        return self.create_state(ProcessState.WAITING, None, msg=msg, data=RETRIEVE_COMMAND)

     def parse(self, retrieved_temporary_folder):
-        """Return the `Running` state that will `parse` the `CalcJob`.
+        """Return the `Running` state that will parse the `CalcJob`.

         :param retrieved_temporary_folder: temporary folder used in retrieving that can be used during parsing.
         """
diff --git a/aiida/orm/nodes/process/calculation/calcjob.py b/aiida/orm/nodes/process/calculation/calcjob.py
index 027669d3b7..ea90b51ffb 100644
--- a/aiida/orm/nodes/process/calculation/calcjob.py
+++ b/aiida/orm/nodes/process/calculation/calcjob.py
@@ -35,7 +35,8 @@ class CalcJobNode(CalculationNode):
     SCHEDULER_JOB_ID_KEY = 'job_id'
     SCHEDULER_STATE_KEY = 'scheduler_state'
     SCHEDULER_LAST_CHECK_TIME_KEY = 'scheduler_lastchecktime'
-    SCHEUDLER_LAST_JOB_INFO_KEY = 'last_jobinfo'
+    SCHEDULER_LAST_JOB_INFO_KEY = 'last_jobinfo'
+    SCHEDULER_DETAILED_JOB_INFO_KEY = 'detailed_job_info'

     # Base path within the repository where to put objects by default
     _repository_base_path = 'raw_input'
@@ -85,7 +86,8 @@ def _updatable_attributes(cls):  # pylint: disable=no-self-argument
             cls.SCHEDULER_JOB_ID_KEY,
             cls.SCHEDULER_STATE_KEY,
             cls.SCHEDULER_LAST_CHECK_TIME_KEY,
-            cls.SCHEUDLER_LAST_JOB_INFO_KEY,
+            cls.SCHEDULER_LAST_JOB_INFO_KEY,
+            cls.SCHEDULER_DETAILED_JOB_INFO_KEY,
         )

     @classproperty
@@ -415,12 +417,26 @@ def get_scheduler_lastchecktime(self):

         return value

+    def set_detailed_job_info(self, detailed_job_info):
+        """Set the detailed job info dictionary.
+
+        :param detailed_job_info: a dictionary with metadata with the accounting of a completed job
+        """
+        self.set_attribute(self.SCHEDULER_DETAILED_JOB_INFO_KEY, detailed_job_info)
+
+    def get_detailed_job_info(self):
+        """Return the detailed job info dictionary.
+
+        :return: the dictionary with detailed job info if defined or None
+        """
+        return self.get_attribute(self.SCHEDULER_DETAILED_JOB_INFO_KEY, None)
+
     def set_last_job_info(self, last_job_info):
         """Set the last job info.

         :param last_job_info: a `JobInfo` object
         """
-        self.set_attribute(self.SCHEUDLER_LAST_JOB_INFO_KEY, last_job_info.serialize())
+        self.set_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, last_job_info.serialize())

     def get_last_job_info(self):
         """Return the last information asked to the scheduler about the status of the job.
@@ -429,7 +445,7 @@ def get_last_job_info(self):
         """
         from aiida.schedulers.datastructures import JobInfo

-        last_job_info_serialized = self.get_attribute(self.SCHEUDLER_LAST_JOB_INFO_KEY, None)
+        last_job_info_serialized = self.get_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, None)

         if last_job_info_serialized is not None:
             job_info = JobInfo()
diff --git a/aiida/schedulers/plugins/lsf.py b/aiida/schedulers/plugins/lsf.py
index 9b5ee6368e..2ad88d3d1a 100644
--- a/aiida/schedulers/plugins/lsf.py
+++ b/aiida/schedulers/plugins/lsf.py
@@ -277,14 +277,14 @@ def _get_joblist_command(self, jobs=None, user=None):
         self.logger.debug('bjobs command: {}'.format(comm))
         return comm

-    def _get_detailed_jobinfo_command(self, jobid):
+    def _get_detailed_job_info_command(self, job_id):
         """
         Return the command to run to get the detailed information on a job,
         even after the job has finished.

         The output text is just retrieved, and returned for logging purposes.
         """
-        return 'bjobs -l {}'.format(escape_for_bash(jobid))
+        return 'bjobs -l {}'.format(escape_for_bash(job_id))

     def _get_submit_script_header(self, job_tmpl):
         """
diff --git a/aiida/schedulers/plugins/pbsbaseclasses.py b/aiida/schedulers/plugins/pbsbaseclasses.py
index ac03bc4426..1344a7c5a6 100644
--- a/aiida/schedulers/plugins/pbsbaseclasses.py
+++ b/aiida/schedulers/plugins/pbsbaseclasses.py
@@ -175,14 +175,14 @@ def _get_joblist_command(self, jobs=None, user=None):
         _LOGGER.debug('qstat command: {}'.format(comm))
         return comm

-    def _get_detailed_jobinfo_command(self, jobid):
+    def _get_detailed_job_info_command(self, job_id):
         """
         Return the command to run to get the detailed information on a job,
         even after the job has finished.

         The output text is just retrieved, and returned for logging purposes.
""" - return 'tracejob -v {}'.format(escape_for_bash(jobid)) + return 'tracejob -v {}'.format(escape_for_bash(job_id)) def _get_submit_script_header(self, job_tmpl): """ diff --git a/aiida/schedulers/plugins/sge.py b/aiida/schedulers/plugins/sge.py index b3954b3184..a896e248d8 100644 --- a/aiida/schedulers/plugins/sge.py +++ b/aiida/schedulers/plugins/sge.py @@ -132,8 +132,8 @@ def _get_joblist_command(self, jobs=None, user=None): return command # raise NotImplementedError - def _get_detailed_jobinfo_command(self, jobid): - command = 'qacct -j {}'.format(escape_for_bash(jobid)) + def _get_detailed_job_info_command(self, job_id): + command = 'qacct -j {}'.format(escape_for_bash(job_id)) return command def _get_submit_script_header(self, job_tmpl): diff --git a/aiida/schedulers/plugins/slurm.py b/aiida/schedulers/plugins/slurm.py index 71c888e40d..909882570f 100644 --- a/aiida/schedulers/plugins/slurm.py +++ b/aiida/schedulers/plugins/slurm.py @@ -217,7 +217,7 @@ def _get_joblist_command(self, jobs=None, user=None): self.logger.debug('squeue command: {}'.format(comm)) return comm - def _get_detailed_jobinfo_command(self, jobid): + def _get_detailed_job_info_command(self, job_id): """ Return the command to run to get the detailed information on a job, even after the job has finished. diff --git a/aiida/schedulers/plugins/test_sge.py b/aiida/schedulers/plugins/test_sge.py index 7f13c00b45..ef36797a2c 100644 --- a/aiida/schedulers/plugins/test_sge.py +++ b/aiida/schedulers/plugins/test_sge.py @@ -196,7 +196,7 @@ def test_get_joblist_command(self): def test_detailed_jobinfo_command(self): sge = SgeScheduler() - sge_get_djobinfo_command = sge._get_detailed_jobinfo_command('123456') + sge_get_djobinfo_command = sge._get_detailed_job_info_command('123456') self.assertTrue('123456' in sge_get_djobinfo_command) self.assertTrue('qacct' in sge_get_djobinfo_command) diff --git a/aiida/schedulers/scheduler.py b/aiida/schedulers/scheduler.py index b8064dc93e..14ddfe418e 100644 --- a/aiida/schedulers/scheduler.py +++ b/aiida/schedulers/scheduler.py @@ -272,7 +272,7 @@ def _get_joblist_command(self, jobs=None, user=None): """ raise NotImplementedError - def _get_detailed_jobinfo_command(self, jobid): + def _get_detailed_job_info_command(self, job_id): """ Return the command to run to get the detailed information on a job. This is typically called after the job has finished, to retrieve @@ -283,20 +283,47 @@ def _get_detailed_jobinfo_command(self, jobid): :raises: :class:`aiida.common.exceptions.FeatureNotAvailable` """ - # pylint: disable=no-self-use, not-callable, unused-argument + # pylint: disable=no-self-use,not-callable,unused-argument raise FeatureNotAvailable('Cannot get detailed job info') + def get_detailed_job_info(self, job_id): + """Return the detailed job info. + + This will be a dictionary with the return value, stderr and stdout content returned by calling the command that + is returned by `_get_detailed_job_info_command`. + + :param job_id: the job identifier + :return: dictionary with `retval`, `stdout` and `stderr`. + """ + command = self._get_detailed_job_info_command(job_id) # pylint: disable=assignment-from-no-return + with self.transport: + retval, stdout, stderr = self.transport.exec_command_wait(command) + + detailed_job_info = { + 'retval': retval, + 'stdout': stdout, + 'stderr': stderr, + } + + return detailed_job_info + def get_detailed_jobinfo(self, jobid): """ Return a string with the output of the detailed_jobinfo command. + .. 
+            Will be removed in `v2.0.0`, use :meth:`aiida.schedulers.scheduler.Scheduler.get_detailed_job_info` instead.
+
         At the moment, the output text is just retrieved and stored for logging
         purposes, but no parsing is performed.

         :raises: :class:`aiida.common.exceptions.FeatureNotAvailable`
         """
+        import warnings
+        from aiida.common.warnings import AiidaDeprecationWarning
+        warnings.warn('function is deprecated, use `get_detailed_job_info` instead', AiidaDeprecationWarning)  # pylint: disable=no-member
-        command = self._get_detailed_jobinfo_command(jobid=jobid)  # pylint: disable=assignment-from-no-return
+        command = self._get_detailed_job_info_command(job_id=jobid)  # pylint: disable=assignment-from-no-return
         with self.transport:
             retval, stdout, stderr = self.transport.exec_command_wait(command)

From 661b57c89559f046c5fff01912f097aa3b103914 Mon Sep 17 00:00:00 2001
From: Sebastiaan Huber
Date: Tue, 10 Dec 2019 19:25:54 +0100
Subject: [PATCH 2/2] Factor out detailed job info fields for SLURM scheduler
 implementation

This is useful for a future implementation of a method that can parse
the string output into a dictionary.
---
 aiida/schedulers/plugins/slurm.py | 26 ++++++++++++++------------
 1 file changed, 14 insertions(+), 12 deletions(-)

diff --git a/aiida/schedulers/plugins/slurm.py b/aiida/schedulers/plugins/slurm.py
index 909882570f..cb27dfe316 100644
--- a/aiida/schedulers/plugins/slurm.py
+++ b/aiida/schedulers/plugins/slurm.py
@@ -13,10 +13,8 @@
 """
 import re
-
-import aiida.schedulers
 from aiida.common.escaping import escape_for_bash
-from aiida.schedulers import SchedulerError
+from aiida.schedulers import Scheduler, SchedulerError
 from aiida.schedulers.datastructures import (JobInfo, JobState, NodeNumberJobResource)

 # This maps SLURM state codes to our own status list
@@ -146,17 +144,26 @@ def __init__(self, *args, **kwargs):
             raise ValueError(value_error)


-class SlurmScheduler(aiida.schedulers.Scheduler):
+class SlurmScheduler(Scheduler):
     """
     Support for the SLURM scheduler (http://slurm.schedmd.com/).
     """
-    _logger = aiida.schedulers.Scheduler._logger.getChild('slurm')
+    _logger = Scheduler._logger.getChild('slurm')

     # Query only by list of jobs and not by user
     _features = {
         'can_query_by_user': False,
     }

+    _detailed_job_info_fields = [
+        'AllocCPUS', 'Account', 'AssocID', 'AveCPU', 'AvePages', 'AveRSS', 'AveVMSize', 'Cluster', 'Comment',
+        'CPUTime', 'CPUTimeRAW', 'DerivedExitCode', 'Elapsed', 'Eligible', 'End', 'ExitCode', 'GID', 'Group',
+        'JobID', 'JobName', 'MaxRSS', 'MaxRSSNode', 'MaxRSSTask', 'MaxVMSize', 'MaxVMSizeNode', 'MaxVMSizeTask',
+        'MinCPU', 'MinCPUNode', 'MinCPUTask', 'NCPUS', 'NNodes', 'NodeList', 'NTasks', 'Priority', 'Partition',
+        'QOSRAW', 'ReqCPUS', 'Reserved', 'ResvCPU', 'ResvCPURAW', 'Start', 'State', 'Submit', 'Suspended',
+        'SystemCPU', 'Timelimit', 'TotalCPU', 'UID', 'User', 'UserCPU'
+    ]
+
     # The class to be used for the job resource.
     _job_resource_class = SlurmJobResource
@@ -226,13 +233,8 @@ def _get_detailed_job_info_command(self, job_id):

         --parsable split the fields with a pipe (|), adding a pipe also at the end.
""" - return 'sacct --format=AllocCPUS,Account,AssocID,AveCPU,AvePages,' \ - 'AveRSS,AveVMSize,Cluster,Comment,CPUTime,CPUTimeRAW,DerivedExitCode,' \ - 'Elapsed,Eligible,End,ExitCode,GID,Group,JobID,JobName,MaxRSS,MaxRSSNode,' \ - 'MaxRSSTask,MaxVMSize,MaxVMSizeNode,MaxVMSizeTask,MinCPU,MinCPUNode,' \ - 'MinCPUTask,NCPUS,NNodes,NodeList,NTasks,Priority,Partition,QOSRAW,ReqCPUS,' \ - 'Reserved,ResvCPU,ResvCPURAW,Start,State,Submit,Suspended,SystemCPU,Timelimit,' \ - 'TotalCPU,UID,User,UserCPU --parsable --jobs={}'.format(jobid) + fields = ','.join(self._detailed_job_info_fields) + return 'sacct --format={} --parsable --jobs={}'.format(fields, job_id) def _get_submit_script_header(self, job_tmpl): """