Skip to content

Commit

Permalink
verdi process list: Simplify the daemon load implementation
Browse files Browse the repository at this point in the history
The calculation of the daemon load at the end of the command was using a
utility function that relied manually parsed the response of the daemon
client. However, the `DaemonClient.get_numprocesses` now does this auto-
matically and `aiida.cmdline.commands.cmd_daemon.execute_client_command`
converts this into the correct formatting for stdout.

The `CalculationQueryBuilder` is replaced with a query using the
`QueryBuilder` directly as we only need the count and with the former we
are forced to define a projection and retrieve the results from the
database to count them in Python, which is more expensive an operation.
  • Loading branch information
sphuber committed Mar 30, 2023
1 parent f74ad1c commit 9178370
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 117 deletions.
62 changes: 43 additions & 19 deletions aiida/cmdline/commands/cmd_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ def verdi_process():
@options.PAST_DAYS()
@options.LIMIT()
@options.RAW()
@click.pass_context
@decorators.with_dbenv()
def process_list(
all_entries, group, process_state, process_label, paused, exit_status, failed, past_days, limit, project, raw,
ctx, all_entries, group, process_state, process_label, paused, exit_status, failed, past_days, limit, project, raw,
order_by, order_dir
):
"""Show a list of running or terminated processes.
Expand All @@ -67,8 +68,10 @@ def process_list(
# pylint: disable=too-many-locals
from tabulate import tabulate

from aiida.cmdline.utils.common import check_worker_load, print_last_process_state_change
from aiida.cmdline.commands.cmd_daemon import execute_client_command
from aiida.cmdline.utils.common import print_last_process_state_change
from aiida.engine.daemon.client import get_daemon_client
from aiida.orm import ProcessNode, QueryBuilder
from aiida.tools.query.calculation import CalculationQueryBuilder

relationships = {}
Expand All @@ -82,30 +85,51 @@ def process_list(
relationships=relationships, filters=filters, order_by={order_by: order_dir}, past_days=past_days, limit=limit
)
projected = builder.get_projected(query_set, projections=project)

headers = projected.pop(0)

if raw:
tabulated = tabulate(projected, tablefmt='plain')
echo.echo(tabulated)
else:
tabulated = tabulate(projected, headers=headers)
echo.echo(tabulated)
echo.echo(f'\nTotal results: {len(projected)}\n')
print_last_process_state_change()
return

tabulated = tabulate(projected, headers=headers)
echo.echo(tabulated)
echo.echo(f'\nTotal results: {len(projected)}\n')
print_last_process_state_change()

if not get_daemon_client().is_daemon_running:
echo.echo_warning('The daemon is not running', bold=True)
return

if not get_daemon_client().is_daemon_running:
echo.echo_warning('the daemon is not running', bold=True)
echo.echo_report('Checking daemon load... ', nl=False)
response = execute_client_command('get_numprocesses')

if not response:
# Daemon could not be reached
return

try:
active_workers = response['numprocesses']
except KeyError:
echo.echo_report('No active daemon workers.')
else:
# Second query to get active process count. Currently this is slow but will be fixed with issue #2770. It is
# placed at the end of the command so that the user can Ctrl+C after getting the process table.
slots_per_worker = ctx.obj.config.get_option('daemon.worker_process_slots', scope=ctx.obj.profile.name)
active_processes = QueryBuilder().append(
ProcessNode, filters={
'attributes.process_state': {
'in': ('created', 'waiting', 'running')
}
}
).count()
available_slots = active_workers * slots_per_worker
percent_load = active_processes / available_slots
if percent_load > 0.9: # 90%
echo.echo_warning(f'{percent_load * 100:.0f}%% of the available daemon worker slots have been used!')
echo.echo_warning('Increase the number of workers with `verdi daemon incr`.')
else:
# Second query to get active process count
# Currently this is slow but will be fixed with issue #2770
# We place it at the end so that the user can Ctrl+C after getting the process table.
builder = CalculationQueryBuilder()
filters = builder.get_filters(process_state=('created', 'waiting', 'running'))
query_set = builder.get_query_set(filters=filters)
projected = builder.get_projected(query_set, projections=['pk'])
worker_slot_use = len(projected) - 1
check_worker_load(worker_slot_use)
echo.echo_report(f'Using {percent_load * 100:.0f}%% of the available daemon worker slots.')


@verdi_process.command('show')
Expand Down
61 changes: 0 additions & 61 deletions aiida/cmdline/utils/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,64 +484,3 @@ def build_entries(ports):

echo.echo(tabulate(table, tablefmt='plain'))
echo.echo(style('\nExit codes that invalidate the cache are marked in bold red.\n', italic=True))


def get_num_workers():
"""
Get the number of active daemon workers from the circus client
"""
from aiida.common.exceptions import CircusCallError
from aiida.manage import get_manager

manager = get_manager()
client = manager.get_daemon_client()

if client.is_daemon_running:
response = client.get_numprocesses()
if response['status'] != 'ok':
if response['status'] == client.DAEMON_ERROR_TIMEOUT:
raise CircusCallError('verdi thought the daemon was alive, but the call to the daemon timed-out')
elif response['status'] == client.DAEMON_ERROR_NOT_RUNNING:
raise CircusCallError('verdi thought the daemon was running, but really it is not')
else:
raise CircusCallError
try:
return response['numprocesses']
except KeyError as exc:
raise CircusCallError('Circus did not return the number of daemon processes') from exc


def check_worker_load(active_slots):
"""Log a message with information on the current daemon worker load.
If there are daemon workers active, it logs the current load. If that exceeds 90%, a warning is included with the
suggestion to run ``verdi daemon incr``.
The purpose of this check is to warn the user if they are close to running out of worker slots which could lead to
their processes becoming stuck indefinitely.
:param active_slots: the number of currently active worker slots
"""
from aiida.common.exceptions import CircusCallError
from aiida.manage import get_config_option

warning_threshold = 0.9 # 90%

slots_per_worker = get_config_option('daemon.worker_process_slots')

try:
active_workers = get_num_workers()
except CircusCallError:
echo.echo_critical('Could not contact Circus to get the number of active workers.')

if active_workers is not None:
available_slots = active_workers * slots_per_worker
percent_load = 1.0 if not available_slots else (active_slots / available_slots)
if percent_load > warning_threshold:
echo.echo('') # New line
echo.echo_warning(f'{percent_load * 100:.0f}%% of the available daemon worker slots have been used!')
echo.echo_warning('Increase the number of workers with `verdi daemon incr`.')
else:
echo.echo_report(f'Using {percent_load * 100:.0f}%% of the available daemon worker slots.')
else:
echo.echo_report('No active daemon workers.')
3 changes: 1 addition & 2 deletions tests/cmdline/commands/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,10 @@ def test_list_worker_slot_warning(run_cli_command, monkeypatch):
Test that the if the number of used worker process slots exceeds a threshold,
that the warning message is displayed to the user when running `verdi process list`
"""
from aiida.cmdline.utils import common
from aiida.engine import DaemonClient
from aiida.manage.configuration import get_config

monkeypatch.setattr(common, 'get_num_workers', lambda: 1)
monkeypatch.setattr(DaemonClient, 'get_numprocesses', lambda _: {'numprocesses': 1})
monkeypatch.setattr(DaemonClient, 'is_daemon_running', lambda: True)

# Get the number of allowed processes per worker:
Expand Down
35 changes: 0 additions & 35 deletions tests/cmdline/utils/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
# For further information please visit http://www.aiida.net #
###########################################################################
"""Tests for the :mod:`aiida.cmdline.utils.common` module."""
import pytest

from aiida.cmdline.utils import common
from aiida.common import LinkType
from aiida.engine import Process, calcfunction
Expand Down Expand Up @@ -92,36 +90,3 @@ def test_with_docstring():
common.print_process_info(TestProcessWithDocstring)
common.print_process_info(test_without_docstring)
common.print_process_info(test_with_docstring)


@pytest.mark.parametrize(
'active_workers, active_slots, expected', (
(None, None, 'No active daemon workers.'),
(1, 0, 'Report: Using 0% of the available daemon worker slots.'),
(1, 200, 'Warning: 100% of the available daemon worker slots have been used!'),
)
)
def test_check_worker_load(monkeypatch, capsys, active_workers, active_slots, expected):
"""Test the ``check_worker_load`` function.
We monkeypatch the ``get_num_workers`` method which is called by ``check_worker_load`` to return the number of
active workers that we parametrize.
"""
monkeypatch.setattr(common, 'get_num_workers', lambda: active_workers)
common.check_worker_load(active_slots)
assert expected in capsys.readouterr().out


def test_check_worker_load_fail(monkeypatch, capsys):
"""Test the ``check_worker_load`` function when ``get_num_workers`` will except with ``CircusCallError``."""

def get_num_workers():
from aiida.common.exceptions import CircusCallError
raise CircusCallError

monkeypatch.setattr(common, 'get_num_workers', get_num_workers)

with pytest.raises(SystemExit):
common.check_worker_load(None)

assert 'Could not contact Circus to get the number of active workers.' in capsys.readouterr().err

0 comments on commit 9178370

Please sign in to comment.