Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move getting completed job accounting to retrieve transport task #3639

Merged
merged 2 commits into from
Dec 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 1 addition & 11 deletions aiida/engine/processes/calcjobs/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@

from tornado import concurrent, gen

from aiida import schedulers
from aiida.common import exceptions, lang
from aiida.common import lang

__all__ = ('JobsList', 'JobManager')

Expand Down Expand Up @@ -112,15 +111,6 @@ def _get_jobs_from_scheduler(self):
self.logger.info('AuthInfo<{}>: successfully retrieved status of active jobs'.format(self._authinfo.pk))

for job_id, job_info in scheduler_response.items():
# If the job is done then get detailed job information
detailed_job_info = None
if job_info.job_state == schedulers.JobState.DONE:
try:
detailed_job_info = scheduler.get_detailed_jobinfo(job_id)
except exceptions.FeatureNotAvailable:
detailed_job_info = 'This scheduler does not implement get_detailed_jobinfo'

job_info.detailedJobinfo = detailed_job_info
jobs_cache[job_id] = job_info

raise gen.Return(jobs_cache)
Expand Down
55 changes: 30 additions & 25 deletions aiida/engine/processes/calcjobs/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import plumpy

from aiida.common.datastructures import CalcJobState
from aiida.common.exceptions import TransportTaskException
from aiida.common.exceptions import FeatureNotAvailable, TransportTaskException
from aiida.engine.daemon import execmanager
from aiida.engine.utils import exponential_backoff_retry, interruptable_task
from aiida.schedulers.datastructures import JobState
Expand All @@ -37,8 +37,7 @@

@coroutine
def task_upload_job(node, transport_queue, calc_info, script_filename, cancellable):
"""
Transport task that will attempt to upload the files of a job calculation to the remote
"""Transport task that will attempt to upload the files of a job calculation to the remote.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
Expand Down Expand Up @@ -86,8 +85,7 @@ def do_upload():

@coroutine
def task_submit_job(node, transport_queue, calc_info, script_filename, cancellable):
"""
Transport task that will attempt to submit a job calculation
"""Transport task that will attempt to submit a job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
Expand Down Expand Up @@ -136,8 +134,7 @@ def do_submit():

@coroutine
def task_update_job(node, job_manager, cancellable):
"""
Transport task that will attempt to update the scheduler status of the job calculation
"""Transport task that will attempt to update the scheduler status of the job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
Expand Down Expand Up @@ -198,8 +195,7 @@ def do_update():

@coroutine
def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancellable):
"""
Transport task that will attempt to retrieve all files of a completed job calculation
"""Transport task that will attempt to retrieve all files of a completed job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
Expand All @@ -226,6 +222,21 @@ def task_retrieve_job(node, transport_queue, retrieved_temporary_folder, cancell
def do_retrieve():
    """Request a transport, perform the job accounting, then retrieve the files of the completed calculation."""
    with transport_queue.request_transport(authinfo) as request:
        # Wait for the queue to yield the transport; `cancellable` allows the wait to be interrupted.
        transport = yield cancellable.with_interrupt(request)

        # Perform the job accounting and set it on the node if successful. If the scheduler does not implement this
        # still set the attribute but set it to `None`. This way we can distinguish calculation jobs for which the
        # accounting was called but could not be set.
        scheduler = node.computer.get_scheduler()
        scheduler.set_transport(transport)

        try:
            detailed_job_info = scheduler.get_detailed_job_info(node.get_job_id())
        except FeatureNotAvailable:
            logger.info('detailed job info not available for scheduler of CalcJob<{}>'.format(node.pk))
            node.set_detailed_job_info(None)
        else:
            node.set_detailed_job_info(detailed_job_info)

        # Tornado-style coroutine return: the result of the retrieval becomes the value of the coroutine.
        raise Return(execmanager.retrieve_calculation(node, transport, retrieved_temporary_folder))

try:
Expand All @@ -245,8 +256,7 @@ def do_retrieve():

@coroutine
def task_kill_job(node, transport_queue, cancellable):
"""
Transport task that will attempt to kill a job calculation
"""Transport task that will attempt to kill a job calculation.

The task will first request a transport from the queue. Once the transport is yielded, the relevant execmanager
function is called, wrapped in the exponential_backoff_retry coroutine, which, in case of a caught exception, will
Expand Down Expand Up @@ -379,31 +389,26 @@ def _launch_task(self, coro, *args, **kwargs):

def upload(self, calc_info, script_filename):
"""Return the `Waiting` state that will `upload` the `CalcJob`."""
return self.create_state(
ProcessState.WAITING,
None,
msg='Waiting for calculation folder upload',
data=(UPLOAD_COMMAND, calc_info, script_filename))
msg = 'Waiting for calculation folder upload'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=(UPLOAD_COMMAND, calc_info, script_filename))

def submit(self, calc_info, script_filename):
"""Return the `Waiting` state that will `submit` the `CalcJob`."""
return self.create_state(
ProcessState.WAITING,
None,
msg='Waiting for scheduler submission',
data=(SUBMIT_COMMAND, calc_info, script_filename))
msg = 'Waiting for scheduler submission'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=(SUBMIT_COMMAND, calc_info, script_filename))

def update(self):
"""Return the `Waiting` state that will `update` the `CalcJob`."""
return self.create_state(
ProcessState.WAITING, None, msg='Waiting for scheduler update', data=UPDATE_COMMAND)
msg = 'Waiting for scheduler update'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=UPDATE_COMMAND)

def retrieve(self):
"""Return the `Waiting` state that will `retrieve` the `CalcJob`."""
return self.create_state(ProcessState.WAITING, None, msg='Waiting to retrieve', data=RETRIEVE_COMMAND)
msg = 'Waiting to retrieve'
return self.create_state(ProcessState.WAITING, None, msg=msg, data=RETRIEVE_COMMAND)

def parse(self, retrieved_temporary_folder):
"""Return the `Running` state that will `parse` the `CalcJob`.
"""Return the `Running` state that will parse the `CalcJob`.

:param retrieved_temporary_folder: temporary folder used in retrieving that can be used during parsing.
"""
Expand Down
24 changes: 20 additions & 4 deletions aiida/orm/nodes/process/calculation/calcjob.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,8 @@ class CalcJobNode(CalculationNode):
SCHEDULER_JOB_ID_KEY = 'job_id'
SCHEDULER_STATE_KEY = 'scheduler_state'
SCHEDULER_LAST_CHECK_TIME_KEY = 'scheduler_lastchecktime'
SCHEUDLER_LAST_JOB_INFO_KEY = 'last_jobinfo'
SCHEDULER_LAST_JOB_INFO_KEY = 'last_jobinfo'
SCHEDULER_DETAILED_JOB_INFO_KEY = 'detailed_job_info'

# Base path within the repository where to put objects by default
_repository_base_path = 'raw_input'
Expand Down Expand Up @@ -85,7 +86,8 @@ def _updatable_attributes(cls): # pylint: disable=no-self-argument
cls.SCHEDULER_JOB_ID_KEY,
cls.SCHEDULER_STATE_KEY,
cls.SCHEDULER_LAST_CHECK_TIME_KEY,
cls.SCHEUDLER_LAST_JOB_INFO_KEY,
cls.SCHEDULER_LAST_JOB_INFO_KEY,
cls.SCHEDULER_DETAILED_JOB_INFO_KEY,
)

@classproperty
Expand Down Expand Up @@ -415,12 +417,26 @@ def get_scheduler_lastchecktime(self):

return value

def set_detailed_job_info(self, detailed_job_info):
"""Set the detailed job info dictionary.

:param detailed_job_info: a dictionary with metadata with the accounting of a completed job
"""
self.set_attribute(self.SCHEDULER_DETAILED_JOB_INFO_KEY, detailed_job_info)

def get_detailed_job_info(self):
    """Return the detailed job info dictionary, if it was set.

    :return: the detailed job info dictionary when defined, otherwise None
    """
    key = self.SCHEDULER_DETAILED_JOB_INFO_KEY
    return self.get_attribute(key, None)

def set_last_job_info(self, last_job_info):
"""Set the last job info.

:param last_job_info: a `JobInfo` object
"""
self.set_attribute(self.SCHEUDLER_LAST_JOB_INFO_KEY, last_job_info.serialize())
self.set_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, last_job_info.serialize())

def get_last_job_info(self):
"""Return the last information asked to the scheduler about the status of the job.
Expand All @@ -429,7 +445,7 @@ def get_last_job_info(self):
"""
from aiida.schedulers.datastructures import JobInfo

last_job_info_serialized = self.get_attribute(self.SCHEUDLER_LAST_JOB_INFO_KEY, None)
last_job_info_serialized = self.get_attribute(self.SCHEDULER_LAST_JOB_INFO_KEY, None)

if last_job_info_serialized is not None:
job_info = JobInfo()
Expand Down
4 changes: 2 additions & 2 deletions aiida/schedulers/plugins/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,14 +277,14 @@ def _get_joblist_command(self, jobs=None, user=None):
self.logger.debug('bjobs command: {}'.format(comm))
return comm

def _get_detailed_jobinfo_command(self, jobid):
def _get_detailed_job_info_command(self, job_id):
"""
Return the command to run to get the detailed information on a job,
even after the job has finished.

The output text is just retrieved, and returned for logging purposes.
"""
return 'bjobs -l {}'.format(escape_for_bash(jobid))
return 'bjobs -l {}'.format(escape_for_bash(job_id))

def _get_submit_script_header(self, job_tmpl):
"""
Expand Down
4 changes: 2 additions & 2 deletions aiida/schedulers/plugins/pbsbaseclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,14 +175,14 @@ def _get_joblist_command(self, jobs=None, user=None):
_LOGGER.debug('qstat command: {}'.format(comm))
return comm

def _get_detailed_jobinfo_command(self, jobid):
def _get_detailed_job_info_command(self, job_id):
"""
Return the command to run to get the detailed information on a job,
even after the job has finished.

The output text is just retrieved, and returned for logging purposes.
"""
return 'tracejob -v {}'.format(escape_for_bash(jobid))
return 'tracejob -v {}'.format(escape_for_bash(job_id))

def _get_submit_script_header(self, job_tmpl):
"""
Expand Down
4 changes: 2 additions & 2 deletions aiida/schedulers/plugins/sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,8 @@ def _get_joblist_command(self, jobs=None, user=None):
return command
# raise NotImplementedError

def _get_detailed_jobinfo_command(self, jobid):
command = 'qacct -j {}'.format(escape_for_bash(jobid))
def _get_detailed_job_info_command(self, job_id):
command = 'qacct -j {}'.format(escape_for_bash(job_id))
return command

def _get_submit_script_header(self, job_tmpl):
Expand Down
28 changes: 15 additions & 13 deletions aiida/schedulers/plugins/slurm.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,8 @@
"""
import re


import aiida.schedulers
from aiida.common.escaping import escape_for_bash
from aiida.schedulers import SchedulerError
from aiida.schedulers import Scheduler, SchedulerError
from aiida.schedulers.datastructures import (JobInfo, JobState, NodeNumberJobResource)

# This maps SLURM state codes to our own status list
Expand Down Expand Up @@ -146,17 +144,26 @@ def __init__(self, *args, **kwargs):
raise ValueError(value_error)


class SlurmScheduler(aiida.schedulers.Scheduler):
class SlurmScheduler(Scheduler):
"""
Support for the SLURM scheduler (http://slurm.schedmd.com/).
"""
_logger = aiida.schedulers.Scheduler._logger.getChild('slurm')
_logger = Scheduler._logger.getChild('slurm')

# Query only by list of jobs and not by user
_features = {
'can_query_by_user': False,
}

_detailed_job_info_fields = [
'AllocCPUS', 'Account', 'AssocID', 'AveCPU', 'AvePages', 'AveRSS', 'AveVMSize', 'Cluster', 'Comment', 'CPUTime',
'CPUTimeRAW', 'DerivedExitCode', 'Elapsed', 'Eligible', 'End', 'ExitCode', 'GID', 'Group', 'JobID', 'JobName',
'MaxRSS', 'MaxRSSNode', 'MaxRSSTask', 'MaxVMSize', 'MaxVMSizeNode', 'MaxVMSizeTask', 'MinCPU', 'MinCPUNode',
'MinCPUTask', 'NCPUS', 'NNodes', 'NodeList', 'NTasks', 'Priority', 'Partition', 'QOSRAW', 'ReqCPUS', 'Reserved',
'ResvCPU', 'ResvCPURAW', 'Start', 'State', 'Submit', 'Suspended', 'SystemCPU', 'Timelimit', 'TotalCPU', 'UID',
'User', 'UserCPU'
]

# The class to be used for the job resource.
_job_resource_class = SlurmJobResource

Expand Down Expand Up @@ -217,7 +224,7 @@ def _get_joblist_command(self, jobs=None, user=None):
self.logger.debug('squeue command: {}'.format(comm))
return comm

def _get_detailed_jobinfo_command(self, jobid):
def _get_detailed_job_info_command(self, job_id):
"""
Return the command to run to get the detailed information on a job,
even after the job has finished.
Expand All @@ -226,13 +233,8 @@ def _get_detailed_jobinfo_command(self, jobid):
--parsable split the fields with a pipe (|), adding a pipe also at
the end.
"""
return 'sacct --format=AllocCPUS,Account,AssocID,AveCPU,AvePages,' \
'AveRSS,AveVMSize,Cluster,Comment,CPUTime,CPUTimeRAW,DerivedExitCode,' \
'Elapsed,Eligible,End,ExitCode,GID,Group,JobID,JobName,MaxRSS,MaxRSSNode,' \
'MaxRSSTask,MaxVMSize,MaxVMSizeNode,MaxVMSizeTask,MinCPU,MinCPUNode,' \
'MinCPUTask,NCPUS,NNodes,NodeList,NTasks,Priority,Partition,QOSRAW,ReqCPUS,' \
'Reserved,ResvCPU,ResvCPURAW,Start,State,Submit,Suspended,SystemCPU,Timelimit,' \
'TotalCPU,UID,User,UserCPU --parsable --jobs={}'.format(jobid)
fields = ','.join(self._detailed_job_info_fields)
return 'sacct --format={} --parsable --jobs={}'.format(fields, job_id)

def _get_submit_script_header(self, job_tmpl):
"""
Expand Down
2 changes: 1 addition & 1 deletion aiida/schedulers/plugins/test_sge.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_get_joblist_command(self):
def test_detailed_jobinfo_command(self):
sge = SgeScheduler()

sge_get_djobinfo_command = sge._get_detailed_jobinfo_command('123456')
sge_get_djobinfo_command = sge._get_detailed_job_info_command('123456')

self.assertTrue('123456' in sge_get_djobinfo_command)
self.assertTrue('qacct' in sge_get_djobinfo_command)
Expand Down
33 changes: 30 additions & 3 deletions aiida/schedulers/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def _get_joblist_command(self, jobs=None, user=None):
"""
raise NotImplementedError

def _get_detailed_jobinfo_command(self, jobid):
def _get_detailed_job_info_command(self, job_id):
"""
Return the command to run to get the detailed information on a job.
This is typically called after the job has finished, to retrieve
Expand All @@ -283,20 +283,47 @@ def _get_detailed_jobinfo_command(self, jobid):

:raises: :class:`aiida.common.exceptions.FeatureNotAvailable`
"""
# pylint: disable=no-self-use, not-callable, unused-argument
# pylint: disable=no-self-use,not-callable,unused-argument
raise FeatureNotAvailable('Cannot get detailed job info')

def get_detailed_job_info(self, job_id):
    """Return the detailed job info.

    The returned dictionary contains the return value, stderr and stdout content produced by executing, over the
    transport, the command returned by `_get_detailed_job_info_command`.

    :param job_id: the job identifier
    :return: dictionary with `retval`, `stdout` and `stderr`.
    """
    command = self._get_detailed_job_info_command(job_id)  # pylint: disable=assignment-from-no-return

    # Open the transport only for the duration of the command execution.
    with self.transport:
        exit_code, output, error = self.transport.exec_command_wait(command)

    return {
        'retval': exit_code,
        'stdout': output,
        'stderr': error,
    }

def get_detailed_jobinfo(self, jobid):
"""
Return a string with the output of the detailed_jobinfo command.

.. deprecated:: 1.1.0
Will be removed in `v2.0.0`, use :meth:`aiida.schedulers.scheduler.Scheduler.get_detailed_job_info` instead.

At the moment, the output text is just retrieved
and stored for logging purposes, but no parsing is performed.

:raises: :class:`aiida.common.exceptions.FeatureNotAvailable`
"""
import warnings
from aiida.common.warnings import AiidaDeprecationWarning
warnings.warn('function is deprecated, use `get_detailed_job_info` instead', AiidaDeprecationWarning) # pylint: disable=no-member

command = self._get_detailed_jobinfo_command(jobid=jobid) # pylint: disable=assignment-from-no-return
command = self._get_detailed_job_info_command(job_id=jobid) # pylint: disable=assignment-from-no-return
with self.transport:
retval, stdout, stderr = self.transport.exec_command_wait(command)

Expand Down