verdi process play: only query for active processes with --all flag (#4671)

The query used to target all process nodes with the `paused` attribute, including those already in a terminal state. An additional filter is added here so that only nodes in an active process state are queried, since terminal nodes should not be affected; in principle this should also speed up the query.
ramirezfranciscof authored Mar 10, 2021
1 parent 9359b07 commit d762522
Showing 3 changed files with 105 additions and 98 deletions.
3 changes: 2 additions & 1 deletion aiida/cmdline/commands/cmd_process.py
@@ -247,7 +247,8 @@ def process_play(processes, all_entries, timeout, wait):
raise click.BadOptionUsage('all', 'cannot specify individual processes and the `--all` flag at the same time.')

if not processes and all_entries:
builder = QueryBuilder().append(ProcessNode, filters={'attributes.paused': True})
filters = CalculationQueryBuilder().get_filters(process_state=('created', 'waiting', 'running'), paused=True)
builder = QueryBuilder().append(ProcessNode, filters=filters)
processes = builder.all(flat=True)

futures = {}
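For context, the sketch below spells out what the modified `--all` branch of `verdi process play` now queries for. It is a minimal sketch, not part of the diff: the import paths assume aiida-core 1.x, and the expanded dictionary in the comment is only an approximation of what `CalculationQueryBuilder.get_filters` returns.

from aiida.cmdline.utils.query.calculation import CalculationQueryBuilder
from aiida.orm import ProcessNode, QueryBuilder

# Restrict the query to paused nodes that are still in an active process state,
# so nodes in a terminal state (finished, excepted, killed) are never touched.
filters = CalculationQueryBuilder().get_filters(
    process_state=('created', 'waiting', 'running'),
    paused=True,
)
# Roughly equivalent to a filter dictionary of the form:
#     {'attributes.process_state': {'in': ['created', 'waiting', 'running']},
#      'attributes.paused': True}
paused_active_processes = QueryBuilder().append(ProcessNode, filters=filters).all(flat=True)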
170 changes: 73 additions & 97 deletions tests/cmdline/commands/test_process.py
@@ -8,8 +8,6 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for `verdi process`."""
import subprocess
import sys
import time
import asyncio
from concurrent.futures import Future
@@ -23,7 +21,6 @@
from aiida.cmdline.commands import cmd_process
from aiida.common.links import LinkType
from aiida.common.log import LOG_LEVEL_REPORT
from aiida.manage.manager import get_manager
from aiida.orm import CalcJobNode, WorkflowNode, WorkFunctionNode, WorkChainNode

from tests.utils import processes as test_processes
@@ -33,100 +30,6 @@ def get_result_lines(result):
return [e for e in result.output.split('\n') if e]


class TestVerdiProcessDaemon(AiidaTestCase):
"""Tests for `verdi process` that require a running daemon."""

TEST_TIMEOUT = 5.

def setUp(self):
super().setUp()
from aiida.cmdline.utils.common import get_env_with_venv_bin
from aiida.engine.daemon.client import DaemonClient
from aiida.manage.configuration import get_config

# Add the current python path to the environment that will be used for the daemon sub process. This is necessary
# to guarantee the daemon can also import all the classes that are defined in this `tests` module.
env = get_env_with_venv_bin()
env['PYTHONPATH'] = ':'.join(sys.path)

profile = get_config().current_profile
self.daemon_client = DaemonClient(profile)
self.daemon = subprocess.Popen(
self.daemon_client.cmd_string.split(), stderr=sys.stderr, stdout=sys.stdout, env=env
)
self.runner = get_manager().create_runner(rmq_submit=True)
self.cli_runner = CliRunner()

def tearDown(self):
import os
import signal

os.kill(self.daemon.pid, signal.SIGTERM)
super().tearDown()

@pytest.mark.skip(reason='fails to complete randomly (see issue #4731)')
@pytest.mark.requires_rmq
def test_pause_play_kill(self):
"""
Test the pause/play/kill commands
"""
# pylint: disable=no-member
from aiida.orm import load_node

calc = self.runner.submit(test_processes.WaitProcess)
start_time = time.time()
while calc.process_state is not plumpy.ProcessState.WAITING:
if time.time() - start_time >= self.TEST_TIMEOUT:
self.fail('Timed out waiting for process to enter waiting state')

# Make sure that calling any command on a non-existing process id will not except but print an error
# To simulate a process without a corresponding task, we simply create a node and store it. This node will not
# have an associated task at RabbitMQ, but it will be a valid `ProcessNode` so it will pass the initial
# filtering of the `verdi process` commands
orphaned_node = WorkFunctionNode().store()
non_existing_process_id = str(orphaned_node.pk)
for command in [cmd_process.process_pause, cmd_process.process_play, cmd_process.process_kill]:
result = self.cli_runner.invoke(command, [non_existing_process_id])
self.assertClickResultNoException(result)
self.assertIn('Error:', result.output)

self.assertFalse(calc.paused)
result = self.cli_runner.invoke(cmd_process.process_pause, [str(calc.pk)])
self.assertIsNone(result.exception, result.output)

# We need to make sure that the process is picked up by the daemon and put in the Waiting state before we start
# running the CLI commands, so we add a broadcast subscriber for the state change, which when hit will set the
# future to True. This will be our signal that we can start testing
waiting_future = Future()
filters = kiwipy.BroadcastFilter(
lambda *args, **kwargs: waiting_future.set_result(True), sender=calc.pk, subject='state_changed.*.waiting'
)
self.runner.communicator.add_broadcast_subscriber(filters)

# The process may already have been picked up by the daemon and put in the waiting state, before the subscriber
# got the chance to attach itself, making it miss the broadcast. That is why we check if the state is already
# waiting, and if not, we run the loop of the runner to start waiting for the broadcast message. To make sure
# that we have the latest state of the node as it is in the database, we force refresh it by reloading it.
calc = load_node(calc.pk)
if calc.process_state != plumpy.ProcessState.WAITING:
self.runner.loop.run_until_complete(asyncio.wait_for(waiting_future, timeout=5.0))

# Here we know that the process is with the daemon runner and in the waiting state, so we can start running
# the `verdi process` commands that we want to test
result = self.cli_runner.invoke(cmd_process.process_pause, ['--wait', str(calc.pk)])
self.assertIsNone(result.exception, result.output)
self.assertTrue(calc.paused)

result = self.cli_runner.invoke(cmd_process.process_play, ['--wait', str(calc.pk)])
self.assertIsNone(result.exception, result.output)
self.assertFalse(calc.paused)

result = self.cli_runner.invoke(cmd_process.process_kill, ['--wait', str(calc.pk)])
self.assertIsNone(result.exception, result.output)
self.assertTrue(calc.is_terminated)
self.assertTrue(calc.is_killed)


class TestVerdiProcess(AiidaTestCase):
"""Tests for `verdi process`."""

@@ -490,3 +393,76 @@ def test_multiple_processes(self):
self.assertIn('No callers found', get_result_lines(result)[0])
self.assertIn(str(self.node_root.pk), get_result_lines(result)[1])
self.assertIn(str(self.node_root.pk), get_result_lines(result)[2])


@pytest.mark.skip(reason='fails to complete randomly (see issue #4731)')
@pytest.mark.requires_rmq
@pytest.mark.usefixtures('with_daemon', 'clear_database_before_test')
@pytest.mark.parametrize('cmd_try_all', (True, False))
def test_pause_play_kill(cmd_try_all, run_cli_command):
"""
Test the pause/play/kill commands
"""
# pylint: disable=no-member, too-many-locals
from aiida.cmdline.commands.cmd_process import process_pause, process_play, process_kill
from aiida.manage.manager import get_manager
from aiida.engine import ProcessState
from aiida.orm import load_node

runner = get_manager().create_runner(rmq_submit=True)
calc = runner.submit(test_processes.WaitProcess)

test_daemon_timeout = 5.
start_time = time.time()
while calc.process_state is not plumpy.ProcessState.WAITING:
if time.time() - start_time >= test_daemon_timeout:
raise RuntimeError('Timed out waiting for process to enter waiting state')

# Make sure that calling any command on a non-existing process id will not except but print an error
# To simulate a process without a corresponding task, we simply create a node and store it. This node will not
# have an associated task at RabbitMQ, but it will be a valid `ProcessNode` with an active state, so it will
# pass the initial filtering of the `verdi process` commands
orphaned_node = WorkFunctionNode()
orphaned_node.set_process_state(ProcessState.RUNNING)
orphaned_node.store()
non_existing_process_id = str(orphaned_node.pk)
for command in [process_pause, process_play, process_kill]:
result = run_cli_command(command, [non_existing_process_id])
assert 'Error:' in result.output

assert not calc.paused
result = run_cli_command(process_pause, [str(calc.pk)])

# We need to make sure that the process is picked up by the daemon and put in the Waiting state before we start
# running the CLI commands, so we add a broadcast subscriber for the state change, which when hit will set the
# future to True. This will be our signal that we can start testing
waiting_future = Future()
filters = kiwipy.BroadcastFilter(
lambda *args, **kwargs: waiting_future.set_result(True), sender=calc.pk, subject='state_changed.*.waiting'
)
runner.communicator.add_broadcast_subscriber(filters)

# The process may already have been picked up by the daemon and put in the waiting state, before the subscriber
# got the chance to attach itself, making it miss the broadcast. That is why we check if the state is already
# waiting, and if not, we run the loop of the runner to start waiting for the broadcast message. To make sure
# that we have the latest state of the node as it is in the database, we force refresh it by reloading it.
calc = load_node(calc.pk)
if calc.process_state != plumpy.ProcessState.WAITING:
runner.loop.run_until_complete(asyncio.wait_for(waiting_future, timeout=5.0))

# Here we know that the process is with the daemon runner and in the waiting state, so we can start running
# the `verdi process` commands that we want to test
result = run_cli_command(process_pause, ['--wait', str(calc.pk)])
assert calc.paused

if cmd_try_all:
cmd_option = '--all'
else:
cmd_option = str(calc.pk)

result = run_cli_command(process_play, ['--wait', cmd_option])
assert not calc.paused

result = run_cli_command(process_kill, ['--wait', str(calc.pk)])
assert calc.is_terminated
assert calc.is_killed
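The waiting logic above follows a general pattern for synchronising a test with a daemon-side state change: subscribe to the broadcast first, then re-check the stored state to cover the case where the transition already happened before the subscriber was attached. Below is a condensed sketch of just that pattern; the helper name is hypothetical, and the concurrent future is wrapped with `asyncio.wrap_future` so the runner's event loop can await it.

import asyncio
from concurrent.futures import Future

import kiwipy
import plumpy

from aiida.orm import load_node


def wait_until_waiting(runner, calc, timeout=5.0):
    """Block until ``calc`` reaches the WAITING state, tolerating the race where
    the daemon already moved it there before the subscriber was attached."""
    waiting_future = Future()
    runner.communicator.add_broadcast_subscriber(
        kiwipy.BroadcastFilter(
            lambda *args, **kwargs: waiting_future.set_result(True),
            sender=calc.pk,
            subject='state_changed.*.waiting',
        )
    )
    # Reload the node to get the state as currently stored in the database and only
    # wait for the broadcast if the process is not already waiting.
    calc = load_node(calc.pk)
    if calc.process_state != plumpy.ProcessState.WAITING:
        awaitable = asyncio.wrap_future(waiting_future, loop=runner.loop)
        runner.loop.run_until_complete(asyncio.wait_for(awaitable, timeout=timeout))
    return calc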
30 changes: 30 additions & 0 deletions tests/conftest.py
@@ -334,3 +334,33 @@ def override_logging():
config.unset_option('logging.aiida_loglevel')
config.unset_option('logging.db_loglevel')
configure_logging(with_orm=True)


@pytest.fixture
def with_daemon():
"""Starts the daemon process and then makes sure to kill it once the test is done."""
import sys
import signal
import subprocess

from aiida.engine.daemon.client import DaemonClient
from aiida.cmdline.utils.common import get_env_with_venv_bin

# Add the current python path to the environment that will be used for the daemon sub process.
# This is necessary to guarantee the daemon can also import all the classes that are defined
# in this `tests` module.
env = get_env_with_venv_bin()
env['PYTHONPATH'] = ':'.join(sys.path)

profile = get_config().current_profile
daemon = subprocess.Popen(
DaemonClient(profile).cmd_string.split(),
stderr=sys.stderr,
stdout=sys.stdout,
env=env,
)

yield

# Note this will always be executed after the yield no matter what happened in the test that used this fixture.
os.kill(daemon.pid, signal.SIGTERM)
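A usage sketch for the new fixture: pytest runs the code after the fixture's `yield` as teardown, so the daemon receives the SIGTERM even when the test body fails. The test name below is hypothetical; `requires_rmq`, `with_daemon`, `clear_database_before_test` and `run_cli_command` are the marker and fixtures already used in the test file above.

import pytest


@pytest.mark.requires_rmq
@pytest.mark.usefixtures('with_daemon', 'clear_database_before_test')
def test_something_with_daemon(run_cli_command):
    # The daemon subprocess started by `with_daemon` is alive for the duration of
    # this test and is terminated during fixture teardown, whatever the outcome.
    ...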
