Add the TransferCalcJob plugin (#4194)
This calcjob allows the user to copy files between a remote machine and
the local machine running AiiDA. More specifically, it can do any of the
following:

* Take any number of files from any number of `RemoteData` folders on
a remote machine and copy them into the local repository of a single
newly created `FolderData` node.

* Take any number of files from any number of `FolderData` nodes on the
local machine and copy them into a single newly created `RemoteData`
folder on a given remote machine.

These are the main two use cases, but there are also other more complex
combinations allowed by the current implementation.
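The shape of the `instructions` input for the two main use cases can be sketched with plain dictionaries. The node keys, paths and the small validator below are illustrative only (in a real run the dictionary is wrapped in an `orm.Dict` node and validated by the plugin itself):

```python
def validate_instructions(instructions):
    """Simplified mirror of the plugin's validator: return an error string or None."""
    retrieve_files = instructions.get('retrieve_files', None)
    if not isinstance(retrieve_files, bool):
        return 'the `retrieve_files` flag must be set to True or False'
    if not any(
        instructions.get(key) for key in ('local_files', 'remote_files', 'symlink_files')
    ):
        return 'at least one list of (node_key, source_path, target_path) tuples is required'
    return None

# Use case 1: gather files from RemoteData folders into a local FolderData node.
retrieve_instructions = {
    'retrieve_files': True,
    'remote_files': [('my_remote', 'output/data.txt', 'data.txt')],
}

# Use case 2: push files from local FolderData nodes into a new RemoteData folder.
upload_instructions = {
    'retrieve_files': False,
    'local_files': [('my_folder', 'input/config.yml', 'config.yml')],
}

assert validate_instructions(retrieve_instructions) is None
assert validate_instructions(upload_instructions) is None
assert validate_instructions({'retrieve_files': True}) is not None  # no file lists given
```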

Co-authored-by: Sebastiaan Huber <mail@sphuber.net>
ramirezfranciscof and sphuber authored Dec 9, 2020
1 parent a873332 commit 8260b59
Showing 7 changed files with 591 additions and 7 deletions.
253 changes: 253 additions & 0 deletions aiida/calculations/transfer.py
@@ -0,0 +1,253 @@
# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""Implementation of Transfer CalcJob."""

import os
from aiida import orm
from aiida.engine import CalcJob
from aiida.common.datastructures import CalcInfo


def validate_instructions(instructions, _):
"""Check that the instructions dict contains the necessary keywords"""

instructions_dict = instructions.get_dict()
retrieve_files = instructions_dict.get('retrieve_files', None)

if retrieve_files is None:
errmsg = (
'\n\n'
'no indication of what to do in the instruction node:\n'
f' > {instructions.uuid}\n'
'(to store the files in the repository set retrieve_files=True,\n'
'to copy them to the specified folder on the remote computer,\n'
'set it to False)\n'
)
return errmsg

if not isinstance(retrieve_files, bool):
errmsg = (
'entry for retrieve files inside of instruction node:\n'
f' > {instructions.uuid}\n'
'must be either True or False; instead, it is:\n'
f' > {retrieve_files}\n'
)
return errmsg

local_files = instructions_dict.get('local_files', None)
remote_files = instructions_dict.get('remote_files', None)
symlink_files = instructions_dict.get('symlink_files', None)

if not any([local_files, remote_files, symlink_files]):
errmsg = (
'no indication of which files to copy were found in the instruction node:\n'
f' > {instructions.uuid}\n'
'Please include at least one of `local_files`, `remote_files`, or `symlink_files`.\n'
'These should be lists containing 3-tuples with the following format:\n'
' (source_node_key, source_relpath, target_relpath)\n'
)
return errmsg


def validate_transfer_inputs(inputs, _):
"""Check that the instructions dict and the source nodes are consistent"""

source_nodes = inputs['source_nodes']
instructions = inputs['instructions']
computer = inputs['metadata']['computer']

instructions_dict = instructions.get_dict()
local_files = instructions_dict.get('local_files', [])
remote_files = instructions_dict.get('remote_files', [])
symlink_files = instructions_dict.get('symlink_files', [])

source_nodes_provided = set(source_nodes.keys())
source_nodes_required = set()
error_message_list = []

for node_label, node_object in source_nodes.items():
if isinstance(node_object, orm.RemoteData):
if computer.name != node_object.computer.name:
error_message = (
f' > remote node `{node_label}` points to computer `{node_object.computer}`, '
f'not the one being used (`{computer}`)'
)
error_message_list.append(error_message)

for source_label, _, _ in local_files:
source_nodes_required.add(source_label)
source_node = source_nodes.get(source_label, None)
error_message = check_node_type('local_files', source_label, source_node, orm.FolderData)
if error_message:
error_message_list.append(error_message)

for source_label, _, _ in remote_files:
source_nodes_required.add(source_label)
source_node = source_nodes.get(source_label, None)
error_message = check_node_type('remote_files', source_label, source_node, orm.RemoteData)
if error_message:
error_message_list.append(error_message)

for source_label, _, _ in symlink_files:
source_nodes_required.add(source_label)
source_node = source_nodes.get(source_label, None)
error_message = check_node_type('symlink_files', source_label, source_node, orm.RemoteData)
if error_message:
error_message_list.append(error_message)

unrequired_nodes = source_nodes_provided.difference(source_nodes_required)
for node_label in unrequired_nodes:
error_message = f' > node `{node_label}` provided as input is not being used'
error_message_list.append(error_message)

if len(error_message_list) > 0:
error_message = '\n\n'
for error_add in error_message_list:
error_message = error_message + error_add + '\n'
return error_message


def check_node_type(list_name, node_label, node_object, node_type):
"""Common utility function to check the type of a node"""

if node_object is None:
return f' > node `{node_label}` requested on list `{list_name}` not found among inputs'

if not isinstance(node_object, node_type):
target_class = node_type.class_node_type
return f' > node `{node_label}`, requested on list `{list_name}` should be of type `{target_class}`'

return None


class TransferCalculation(CalcJob):
"""Utility to copy files from different FolderData and RemoteData nodes into a single place.
The final destination for these files can be either the local repository (by creating a
new FolderData node to store them) or in the remote computer (by leaving the files in a
new remote folder saved in a RemoteData node).
Only files from the local computer and from remote folders in the same external computer
can be moved at the same time with a single instance of this CalcJob.
The user needs to provide three inputs:
* ``instructions``: a dict node specifying which files to copy from which nodes.
* ``source_nodes``: a dict of nodes, each with a unique identifier label as its key.
* ``metadata.computer``: the computer that contains the remote files and will contain
the final RemoteData node.
The ``instructions`` dict must have the ``retrieve_files`` flag. The CalcJob will create a
new folder in the remote machine (``RemoteData``) and put all the files there and will either:
(1) leave them there (``retrieve_files = False``) or ...
(2) retrieve all the files and store them locally in a ``FolderData`` (``retrieve_files = True``)
The `instructions` dict must also contain at least one list with specifications of which files
to copy and from where. All these lists take tuples of 3 that have the following format:
.. code-block:: python
( source_node_key, path_to_file_in_source, path_to_file_in_target)
where the ``source_node_key`` has to be the respective one used when providing the node in the
``source_nodes`` input nodes dictionary.
The two main lists to include are ``local_files`` (for files to be taken from FolderData nodes)
and ``remote_files`` (for files to be taken from RemoteData nodes). Alternatively, files inside
of RemoteData nodes can instead be put in the ``symlink_files`` list: the only difference is that
files from the first list will be fully copied in the target RemoteData folder, whereas for the
files in second list only a symlink to the original file will be created there. This will only
affect the content of the final RemoteData target folder, but in both cases the full file will
be copied back in the local target FolderData (if ``retrieve_files = True``).
"""

@classmethod
def define(cls, spec):
super().define(spec)

spec.input(
'instructions',
valid_type=orm.Dict,
help='A dictionary containing the `retrieve_files` flag and at least one of the file lists: '
'`local_files`, `remote_files` and/or `symlink_files`.',
validator=validate_instructions,
)
spec.input_namespace(
'source_nodes',
valid_type=(orm.FolderData, orm.RemoteData),
dynamic=True,
help='All the nodes that contain files referenced in the instructions.',
)

# The transfer only needs a computer; the code input is removed and default resources are set here
spec.inputs.pop('code', None)
spec.inputs['metadata']['computer'].required = True
spec.inputs['metadata']['options']['resources'].default = {
'num_machines': 1,
'num_mpiprocs_per_machine': 1,
}

spec.inputs.validator = validate_transfer_inputs

def prepare_for_submission(self, folder):
source_nodes = self.inputs.source_nodes
instructions = self.inputs.instructions.get_dict()

local_files = instructions.get('local_files', [])
remote_files = instructions.get('remote_files', [])
symlink_files = instructions.get('symlink_files', [])
retrieve_files = instructions.get('retrieve_files')

calc_info = CalcInfo()
calc_info.skip_submit = True
calc_info.codes_info = []
calc_info.local_copy_list = []
calc_info.remote_copy_list = []
calc_info.remote_symlink_list = []
retrieve_paths = []

for source_label, source_relpath, target_relpath in local_files:

source_node = source_nodes[source_label]
retrieve_paths.append(target_relpath)
calc_info.local_copy_list.append((
source_node.uuid,
source_relpath,
target_relpath,
))

for source_label, source_relpath, target_relpath in remote_files:

source_node = source_nodes[source_label]
retrieve_paths.append(target_relpath)
calc_info.remote_copy_list.append((
source_node.computer.uuid,
os.path.join(source_node.get_remote_path(), source_relpath),
target_relpath,
))

for source_label, source_relpath, target_relpath in symlink_files:

source_node = source_nodes[source_label]
retrieve_paths.append(target_relpath)
calc_info.remote_symlink_list.append((
source_node.computer.uuid,
os.path.join(source_node.get_remote_path(), source_relpath),
target_relpath,
))

if retrieve_files:
calc_info.retrieve_list = retrieve_paths
else:
calc_info.retrieve_list = []

return calc_info
5 changes: 4 additions & 1 deletion aiida/common/datastructures.py
@@ -74,6 +74,8 @@ class CalcInfo(DefaultFieldsAttributeDict):
already indirectly present in the repository through one of the data nodes passed as input to the calculation.
* codes_info: a list of dictionaries used to pass the info of the execution of a code
* codes_run_mode: a string used to specify the order in which multi codes can be executed
* skip_submit: a flag that, when set to True, instructs the engine to skip the submit/update steps (no code will
  be run; the engine will only upload the files and then retrieve/parse).
"""

_default_fields = (
@@ -98,7 +100,8 @@ class CalcInfo(DefaultFieldsAttributeDict):
'remote_symlink_list',
'provenance_exclude_list',
'codes_info',
'codes_run_mode'
'codes_run_mode',
'skip_submit'
)


16 changes: 10 additions & 6 deletions aiida/engine/processes/calcjobs/tasks.py
@@ -77,13 +77,14 @@ async def do_upload():
raise PreSubmitException('exception occurred in presubmit call') from exception
else:
execmanager.upload_calculation(node, transport, calc_info, folder)
skip_submit = calc_info.skip_submit or False

return
return skip_submit

try:
logger.info(f'scheduled request to upload CalcJob<{node.pk}>')
ignore_exceptions = (plumpy.CancelledError, PreSubmitException)
result = await exponential_backoff_retry(
skip_submit = await exponential_backoff_retry(
do_upload, initial_interval, max_attempts, logger=node.logger, ignore_exceptions=ignore_exceptions
)
except PreSubmitException:
@@ -96,7 +97,7 @@ async def do_upload():
else:
logger.info(f'uploading CalcJob<{node.pk}> successful')
node.set_state(CalcJobState.SUBMITTING)
return result
return skip_submit


async def task_submit_job(node, transport_queue, cancellable):
@@ -323,7 +324,7 @@ def load_instance_state(self, saved_state, load_context):

async def execute(self): # pylint: disable=invalid-overridden-method
"""Override the execute coroutine of the base `Waiting` state."""
# pylint: disable=too-many-branches
# pylint: disable=too-many-branches, too-many-statements
node = self.process.node
transport_queue = self.process.runner.transport
command = self.data
@@ -335,8 +336,11 @@ async def execute(self): # pylint: disable=invalid-overridden-method

if command == UPLOAD_COMMAND:
node.set_process_status(process_status)
await self._launch_task(task_upload_job, self.process, transport_queue)
result = self.submit()
skip_submit = await self._launch_task(task_upload_job, self.process, transport_queue)
if skip_submit:
result = self.retrieve()
else:
result = self.submit()

elif command == SUBMIT_COMMAND:
node.set_process_status(process_status)
73 changes: 73 additions & 0 deletions docs/source/howto/data.rst
@@ -805,3 +805,76 @@ This command will delete both the file repository and the database.
.. danger::

It is not possible to restore a deleted profile unless it was previously backed up!


Transferring data
=================

.. danger::

This feature is still in beta and its API might change in the near future.
It is therefore not recommended that you rely on it for your public/production workflows.

Moreover, feedback on its implementation is much appreciated.

When a calculation job is launched, AiiDA will create a :py:class:`~aiida.orm.nodes.data.remote.RemoteData` node, attached as an output node to the calculation node with the label ``remote_folder``.
The input files generated by the ``CalcJob`` plugin are copied to this remote folder and, since the job is executed there as well, the code will produce its output files in that same remote folder.
Since the :py:class:`~aiida.orm.nodes.data.remote.RemoteData` node only explicitly stores the filepath on the remote computer, and not its actual contents, it functions more or less like a symbolic link.
That means that if the remote folder gets deleted, there will be no way to retrieve its contents.
For that reason, the ``CalcJob`` plugin can specify some files that should be :ref:`retrieved<topics:calculations:usage:calcjobs:file_lists_retrieve>` and stored locally in a :py:class:`~aiida.orm.nodes.data.folder.FolderData` node for safekeeping, which is attached to the calculation node as an output with the label ``retrieved_folder``.

Although the :ref:`retrieve_list<topics:calculations:usage:calcjobs:file_lists_retrieve>` allows one to specify which output files are to be retrieved locally, this has to be done *before* the calculation is submitted.
In order to provide more flexibility in deciding which files of a completed calculation job are to be stored locally, even after it has terminated, AiiDA ships with the :py:class:`~aiida.calculations.transfer.TransferCalculation` plugin.
This calculation plugin makes it possible to retrieve files from a remote machine and save them in a local :py:class:`~aiida.orm.nodes.data.folder.FolderData`.
The specifications of what to copy are provided through an ``instructions`` input of type :py:class:`~aiida.orm.nodes.data.dict.Dict`:

.. code-block:: ipython

    In [1]: instructions_cont = {}
       ...: instructions_cont['retrieve_files'] = True
       ...: instructions_cont['symlink_files'] = [
       ...:     ('node_keyname', 'source/path/filename', 'target/path/filename'),
       ...: ]
       ...: instructions_node = orm.Dict(dict=instructions_cont)

The ``'source/path/filename'`` and ``'target/path/filename'`` are both relative paths (to their respective folders).
The ``node_keyname`` is a string that will be used when providing the source :py:class:`~aiida.orm.nodes.data.remote.RemoteData` node to the calculation.
You also need to provide the computer between which the transfer will occur:

.. code-block:: ipython

    In [2]: transfer_builder = CalculationFactory('core.transfer').get_builder()
       ...: transfer_builder.instructions = instructions_node
       ...: transfer_builder.source_nodes = {'node_keyname': source_node}
       ...: transfer_builder.metadata.computer = source_node.computer

The variable ``source_node`` here corresponds to the ``RemoteData`` node whose contents need to be retrieved.
Finally, you just run or submit the calculation as you would do with any other:

.. code-block:: ipython

    In [2]: from aiida.engine import submit
       ...: submit(transfer_builder)

You can also use this to copy local files into a new :py:class:`~aiida.orm.nodes.data.remote.RemoteData` folder.
For this you first have to adapt the instructions to set ``'retrieve_files'`` to ``False`` and use a ``'local_files'`` list instead of the ``'symlink_files'``:

.. code-block:: ipython

    In [1]: instructions_cont = {}
       ...: instructions_cont['retrieve_files'] = False
       ...: instructions_cont['local_files'] = [
       ...:     ('node_keyname', 'source/path/filename', 'target/path/filename'),
       ...: ]
       ...: instructions_node = orm.Dict(dict=instructions_cont)

It is also relevant to note that, in this case, the ``source_node`` will be of type :py:class:`~aiida.orm.nodes.data.folder.FolderData`, so you will have to manually select the computer to which you want to copy the files.
You can do this by looking at your available computers by running ``verdi computer list`` and using the label shown to load it with :py:func:`~aiida.orm.utils.load_computer`:

.. code-block:: ipython

    In [2]: transfer_builder.metadata.computer = load_computer('some-computer-label')

Whether uploading or retrieving, you can copy multiple files by appending them to the ``local_files`` or ``symlink_files`` lists in the instructions input, respectively.
It is also possible to copy files from any number of nodes by providing several source nodes, each with a different ``node_keyname``.
The target node will always be a single one (so you can *"gather"* files in a single call, but not *"distribute"* them).
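As a rough illustration of what happens under the hood, the following pure-Python sketch (no AiiDA required; all names and paths are illustrative) mirrors how ``prepare_for_submission`` maps the instruction lists onto the copy, symlink and retrieve lists of the ``CalcInfo``:

```python
import os


def build_transfer_lists(instructions, remote_base_paths):
    """Sketch of how TransferCalculation maps instructions onto CalcInfo lists.

    ``remote_base_paths`` maps a source-node key to its remote base path (it
    stands in for ``RemoteData.get_remote_path()`` of the real source nodes).
    """
    local_copy_list = []
    remote_copy_list = []
    remote_symlink_list = []
    retrieve_paths = []

    # FolderData sources: copied from the local repository.
    for key, source, target in instructions.get('local_files', []):
        retrieve_paths.append(target)
        local_copy_list.append((key, source, target))

    # RemoteData sources: fully copied in the target remote folder.
    for key, source, target in instructions.get('remote_files', []):
        retrieve_paths.append(target)
        remote_copy_list.append((key, os.path.join(remote_base_paths[key], source), target))

    # RemoteData sources: only symlinked in the target remote folder.
    for key, source, target in instructions.get('symlink_files', []):
        retrieve_paths.append(target)
        remote_symlink_list.append((key, os.path.join(remote_base_paths[key], source), target))

    # Every target path is retrieved locally only if the flag is set.
    retrieve_list = retrieve_paths if instructions.get('retrieve_files') else []
    return local_copy_list, remote_copy_list, remote_symlink_list, retrieve_list


instructions = {
    'retrieve_files': True,
    'symlink_files': [('node_keyname', 'source/path/filename', 'target/path/filename')],
}
_, _, symlinks, retrieve = build_transfer_lists(instructions, {'node_keyname': '/scratch/job'})
print(symlinks)  # [('node_keyname', '/scratch/job/source/path/filename', 'target/path/filename')]
print(retrieve)  # ['target/path/filename']
```

Note how a ``symlink_files`` entry still ends up in the retrieve list: the symlink-versus-copy distinction only affects the remote target folder, not what is retrieved locally.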
