CalcJob: add the option to stash files after job completion (#4424)
A new namespace `stash` is added to the `metadata.options` input
namespace of the `CalcJob` process. This option namespace allows a user
to specify certain files created by the calculation job that should be
stashed somewhere on the remote machine. This can be useful for files
that need to be kept for longer than the scratch space where the job
ran is typically retained, but that should stay on the remote machine
rather than be retrieved. Examples are files that are necessary to
restart a calculation but are too big to be retrieved and stored
permanently in the local file repository.

The files to be stashed are specified through their relative filepaths
within the working directory in the `stash.source_list` option. For
now, the only supported mode is to have AiiDA's engine copy the files
to another location on the same filesystem as the working directory of
the calculation job. The base path is defined through the
`stash.target_base` option. In the future, other methods may be
implemented, such as placing all files in a (compressed) tarball or
even stashing files on tape. The mode to use is communicated through
the enum `aiida.common.datastructures.StashMode`, which for now only
has the `COPY` value.
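
As an illustration, the following sketch shows how stashing could be
enabled when launching a calculation job. The code label, plugin and
inputs are hypothetical; only the `stash` option keys come from this
change, and the stash mode defaults to `StashMode.COPY` when it is not
specified explicitly.

```python
from aiida import orm
from aiida.engine import submit

# Hypothetical code configured on a remote computer; any CalcJob plugin works the same way.
code = orm.load_code('add@remote_cluster')
builder = code.get_builder()
builder.x = orm.Int(1)
builder.y = orm.Int(2)

# Stash selected files from the job's working directory on the remote machine itself.
builder.metadata.options.stash = {
    'source_list': ['aiida.out', 'restart_file.bin'],  # filepaths relative to the working directory
    'target_base': '/storage/project/stash',           # absolute base path on the remote filesystem
}

node = submit(builder)
```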

If the `stash` option namespace is defined for a calculation job, the
daemon will perform the stashing operations before the files are
retrieved. This means that stashing also happens before the parsing of
the output files (which occurs after the retrieving step), so the files
are stashed independently of the final exit status that the parser will
assign to the calculation job. As a result, files may be stashed for
calculations that are later considered to have failed. However, the
stashed files can always be deleted manually by the user afterwards if
needed.
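
Because stashing precedes parsing, the following sketch shows how one
could use the `QueryBuilder` to list the stash locations of calculation
jobs that ended with a non-zero exit status, in order to clean them up
by hand. The exact filter on the exit status is illustrative.

```python
from aiida import orm

# Find stash folders created by calculation jobs that finished with a non-zero exit status.
query = orm.QueryBuilder()
query.append(orm.CalcJobNode, filters={'attributes.exit_status': {'>': 0}}, tag='calcjob')
query.append(orm.RemoteStashFolderData, with_incoming='calcjob', project='*')

for stash, in query.iterall():
    # The contents are not managed by AiiDA: remove them manually on the remote if no longer needed.
    print(stash.pk, stash.target_basepath)
```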

Finally, the stashed files are represented by an output node that is
attached to the calculation node through the label `remote_stash`. Just
like the `remote_folder` node, this represents a location of files on a
remote machine and so is merely a "symbolic link" of sorts: AiiDA does
not actually own the files and the contents may disappear at some point.
To be able to distinguish the stashed folder from the remote folder, a
new data plugin is used, the `RemoteStashFolderData`. Its base class is
`RemoteStashData`, which is not instantiable itself but merely serves as
a base class for future subclasses, one for each `StashMode` value. The
reason is that the way the files need to be accessed depends on the way
they were stashed, so it is good to have separate classes for this.
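
For example, once a calculation job has finished, the stash can be
inspected through the calculation node. The properties shown below
(`stash_mode`, `target_basepath`, `source_list`) are the same ones
exercised by the system test added in this commit; the node pk is of
course hypothetical.

```python
from aiida import orm
from aiida.common.datastructures import StashMode

node = orm.load_node(1234)  # hypothetical pk of a completed CalcJobNode

remote_stash = node.outputs.remote_stash  # a `RemoteStashFolderData` node
assert remote_stash.stash_mode == StashMode.COPY

# Location and list of the stashed files on the remote machine; AiiDA does not own these
# files, so they may disappear if the stash target is cleaned outside of AiiDA.
print(remote_stash.target_basepath)
print(remote_stash.source_list)
```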

Giving `RemoteFolderData` and `RemoteData` the same base class (changing
the type of the `remote_folder` output to a new subclass
`RemoteFolderData`) was also considered, but this would introduce
breaking changes, so it was relegated to a potential future major
release.
sphuber authored Mar 10, 2021
1 parent 241f251 commit 9359b07
Showing 15 changed files with 481 additions and 56 deletions.
23 changes: 22 additions & 1 deletion .github/system_tests/test_daemon.py
@@ -9,11 +9,14 @@
###########################################################################
# pylint: disable=no-name-in-module
"""Tests to run with a running daemon."""
import os
import shutil
import subprocess
import sys
import tempfile
import time

from aiida.common import exceptions
from aiida.common import exceptions, StashMode
from aiida.engine import run, submit
from aiida.engine.daemon.client import get_daemon_client
from aiida.engine.persistence import ObjectLoader
@@ -415,6 +418,24 @@ def launch_all():
    print('Running the `MultiplyAddWorkChain`')
    run_multiply_add_workchain()

    # Testing the stashing functionality
    process, inputs, expected_result = create_calculation_process(code=code_doubler, inputval=1)
    with tempfile.TemporaryDirectory() as tmpdir:

        # Delete the temporary directory to test that the stashing functionality will create it if necessary
        shutil.rmtree(tmpdir, ignore_errors=True)

        source_list = ['output.txt', 'triple_value.tmp']
        inputs['metadata']['options']['stash'] = {'target_base': tmpdir, 'source_list': source_list}
        _, node = run.get_node(process, **inputs)
        assert node.is_finished_ok
        assert 'remote_stash' in node.outputs
        remote_stash = node.outputs.remote_stash
        assert remote_stash.stash_mode == StashMode.COPY
        assert remote_stash.target_basepath.startswith(tmpdir)
        assert sorted(remote_stash.source_list) == sorted(source_list)
        assert sorted(p for p in os.listdir(remote_stash.target_basepath)) == sorted(source_list)

    # Submitting the calcfunction through the launchers
    print('Submitting calcfunction to the daemon')
    proc, expected_result = launch_calcfunction(inputval=1)
9 changes: 8 additions & 1 deletion aiida/common/datastructures.py
@@ -12,7 +12,13 @@

from .extendeddicts import DefaultFieldsAttributeDict

__all__ = ('CalcJobState', 'CalcInfo', 'CodeInfo', 'CodeRunMode')
__all__ = ('StashMode', 'CalcJobState', 'CalcInfo', 'CodeInfo', 'CodeRunMode')


class StashMode(Enum):
    """Mode to use when stashing files from the working directory of a completed calculation job for safekeeping."""

    COPY = 'copy'


class CalcJobState(Enum):
@@ -21,6 +27,7 @@ class CalcJobState(Enum):
    UPLOADING = 'uploading'
    SUBMITTING = 'submitting'
    WITHSCHEDULER = 'withscheduler'
    STASHING = 'stashing'
    RETRIEVING = 'retrieving'
    PARSING = 'parsing'

59 changes: 59 additions & 0 deletions aiida/engine/daemon/execmanager.py
@@ -332,6 +332,65 @@ def submit_calculation(calculation: CalcJobNode, transport: Transport) -> str:
    return job_id


def stash_calculation(calculation: CalcJobNode, transport: Transport) -> None:
    """Stash files from the working directory of a completed calculation to a permanent remote folder.

    After a calculation has been completed, optionally stash files from the work directory to a storage location on the
    same remote machine. This is useful if one wants to keep certain files from a completed calculation to be removed
    from the scratch directory, because they are necessary for restarts, but that are too heavy to retrieve.
    Instructions of which files to copy where are retrieved from the `stash.source_list` option.

    :param calculation: the calculation job node.
    :param transport: an already opened transport.
    """
    from aiida.common.datastructures import StashMode
    from aiida.orm import RemoteStashFolderData

    logger_extra = get_dblogger_extra(calculation)

    stash_options = calculation.get_option('stash')
    stash_mode = stash_options.get('mode', StashMode.COPY.value)
    source_list = stash_options.get('source_list', [])

    if not source_list:
        return

    if stash_mode != StashMode.COPY.value:
        EXEC_LOGGER.warning(f'stashing mode {stash_mode} is not implemented yet.')
        return

    cls = RemoteStashFolderData

    EXEC_LOGGER.debug(f'stashing files for calculation<{calculation.pk}>: {source_list}', extra=logger_extra)

    uuid = calculation.uuid
    target_basepath = os.path.join(stash_options['target_base'], uuid[:2], uuid[2:4], uuid[4:])

    for source_filename in source_list:

        source_filepath = os.path.join(calculation.get_remote_workdir(), source_filename)
        target_filepath = os.path.join(target_basepath, source_filename)

        # If the source file is in a (nested) directory, create those directories first in the target directory
        target_dirname = os.path.dirname(target_filepath)
        transport.makedirs(target_dirname, ignore_existing=True)

        try:
            transport.copy(source_filepath, target_filepath)
        except (IOError, ValueError) as exception:
            EXEC_LOGGER.warning(f'failed to stash {source_filepath} to {target_filepath}: {exception}')
        else:
            EXEC_LOGGER.debug(f'stashed {source_filepath} to {target_filepath}')

    remote_stash = cls(
        computer=calculation.computer,
        target_basepath=target_basepath,
        stash_mode=StashMode(stash_mode),
        source_list=source_list,
    ).store()
    remote_stash.add_incoming(calculation, link_type=LinkType.CREATE, link_label='remote_stash')


def retrieve_calculation(calculation: CalcJobNode, transport: Transport, retrieved_temporary_folder: str) -> None:
"""Retrieve all the files of a completed job calculation using the given transport.
67 changes: 51 additions & 16 deletions aiida/engine/processes/calcjobs/calcjob.py
@@ -97,18 +97,44 @@ def validate_calc_job(inputs: Any, ctx: PortNamespace) -> Optional[str]: # pyli
    return None


def validate_stash_options(stash_options: Any, _: Any) -> Optional[str]:
    """Validate the ``stash`` options."""
    from aiida.common.datastructures import StashMode

    target_base = stash_options.get('target_base', None)
    source_list = stash_options.get('source_list', None)
    stash_mode = stash_options.get('mode', StashMode.COPY.value)

    if not isinstance(target_base, str) or not os.path.isabs(target_base):
        return f'`metadata.options.stash.target_base` should be an absolute filepath, got: {target_base}'

    if (
        not isinstance(source_list, (list, tuple)) or
        any(not isinstance(src, str) or os.path.isabs(src) for src in source_list)
    ):
        port = 'metadata.options.stash.source_list'
        return f'`{port}` should be a list or tuple of relative filepaths, got: {source_list}'

    try:
        StashMode(stash_mode)
    except ValueError:
        port = 'metadata.options.stash.mode'
        return f'`{port}` should be a member of aiida.common.datastructures.StashMode, got: {stash_mode}'

    return None


def validate_parser(parser_name: Any, _: Any) -> Optional[str]:
    """Validate the parser.

    :return: string with error message in case the inputs are invalid
    """
    from aiida.plugins import ParserFactory

    if parser_name is not plumpy.ports.UNSPECIFIED:
        try:
            ParserFactory(parser_name)
        except exceptions.EntryPointError as exception:
            return f'invalid parser specified: {exception}'
    try:
        ParserFactory(parser_name)
    except exceptions.EntryPointError as exception:
        return f'invalid parser specified: {exception}'

    return None

@@ -118,9 +144,6 @@ def validate_additional_retrieve_list(additional_retrieve_list: Any, _: Any) ->
    :return: string with error message in case the input is invalid.
    """
    if additional_retrieve_list is plumpy.ports.UNSPECIFIED:
        return None

    if any(not isinstance(value, str) or os.path.isabs(value) for value in additional_retrieve_list):
        return f'`additional_retrieve_list` should only contain relative filepaths but got: {additional_retrieve_list}'

@@ -216,9 +239,21 @@ def define(cls, spec: CalcJobProcessSpec) -> None: # type: ignore[override]
        spec.input('metadata.options.additional_retrieve_list', required=False,
            valid_type=(list, tuple), validator=validate_additional_retrieve_list,
            help='List of relative file paths that should be retrieved in addition to what the plugin specifies.')
        spec.input_namespace('metadata.options.stash', required=False, populate_defaults=False,
            validator=validate_stash_options,
            help='Optional directives to stash files after the calculation job has completed.')
        spec.input('metadata.options.stash.target_base', valid_type=str, required=False,
            help='The base location to where the files should be stashed. For example, for the `copy` stash mode, this '
            'should be an absolute filepath on the remote computer.')
        spec.input('metadata.options.stash.source_list', valid_type=(tuple, list), required=False,
            help='Sequence of relative filepaths representing files in the remote directory that should be stashed.')
        spec.input('metadata.options.stash.stash_mode', valid_type=str, required=False,
            help='Mode with which to perform the stashing, should be a value of `aiida.common.datastructures.StashMode`.')

        spec.output('remote_folder', valid_type=orm.RemoteData,
            help='Input files necessary to run the process will be stored in this folder node.')
        spec.output('remote_stash', valid_type=orm.RemoteStashData, required=False,
            help='Contents of the `stash.source_list` option are stored in this remote folder after job completion.')
        spec.output(cls.link_label_retrieved, valid_type=orm.FolderData, pass_to_parser=True,
            help='Files that are retrieved by the daemon will be stored in this node. By default the stdout and stderr '
            'of the scheduler will be added, but one can add more by specifying them in `CalcInfo.retrieve_list`.')
@@ -653,29 +688,29 @@ def presubmit(self, folder: Folder) -> CalcInfo:
        local_copy_list = calc_info.local_copy_list
        try:
            validate_list_of_string_tuples(local_copy_list, tuple_length=3)
        except ValidationError as exc:
        except ValidationError as exception:
            raise PluginInternalError(
                f'[presubmission of calc {this_pk}] local_copy_list format problem: {exc}'
            ) from exc
                f'[presubmission of calc {this_pk}] local_copy_list format problem: {exception}'
            ) from exception

        remote_copy_list = calc_info.remote_copy_list
        try:
            validate_list_of_string_tuples(remote_copy_list, tuple_length=3)
        except ValidationError as exc:
        except ValidationError as exception:
            raise PluginInternalError(
                f'[presubmission of calc {this_pk}] remote_copy_list format problem: {exc}'
            ) from exc
                f'[presubmission of calc {this_pk}] remote_copy_list format problem: {exception}'
            ) from exception

        for (remote_computer_uuid, _, dest_rel_path) in remote_copy_list:
            try:
                Computer.objects.get(uuid=remote_computer_uuid)  # pylint: disable=unused-variable
            except exceptions.NotExistent as exc:
            except exceptions.NotExistent as exception:
                raise PluginInternalError(
                    '[presubmission of calc {}] '
                    'The remote copy requires a computer with UUID={}'
                    'but no such computer was found in the '
                    'database'.format(this_pk, remote_computer_uuid)
                ) from exc
                ) from exception
            if os.path.isabs(dest_rel_path):
                raise PluginInternalError(
                    '[presubmission of calc {}] '