Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dragon server enhancement #582

Merged
merged 7 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ To be released at some future point in time

Description

- Improve Dragon server shutdown
- Add dragon runtime installer
- Add launcher based on Dragon
- Reuse Orchestrators within the testing suite to improve performance.
Expand Down Expand Up @@ -62,6 +63,9 @@ Description
- Fix publishing of development docs

Detailed Notes

- The Dragon server will now terminate any process which is still running
when a request of an immediate shutdown is sent. ([SmartSim-PR582](https://github.com/CrayLabs/SmartSim/pull/582))
- Add `--dragon` option to `smart build`. Install appropriate Dragon
runtime from Dragon GitHub release assets.
([SmartSim-PR580](https://github.com/CrayLabs/SmartSim/pull/580))
Expand Down
18 changes: 16 additions & 2 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@
except Exception as e:
print(e)
finally:
io_conn.close()
try:
io_conn.close()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you explain why this is a universal catch and print? Is it that io_conn can fail out in a number of ways? Would re-reraising the error result in too much crashing down?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm a bit scared of raising an exception and taking down all the dragon infra. I'll put up a ticket to investigate better ways of handling this.

except Exception as e:
print(e)

Check warning on line 136 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L133-L136

Added lines #L133 - L136 were not covered by tests


class DragonBackend:
Expand Down Expand Up @@ -293,6 +296,9 @@
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(self._hosts)} nodes are available."
return False, message
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message

Check warning on line 301 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L299-L301

Added lines #L299 - L301 were not covered by tests
return True, None

def _allocate_step(
Expand Down Expand Up @@ -565,6 +571,12 @@
self._refresh_statuses()
self._update_shutdown_status()

def _kill_all_running_jobs(self) -> None:
with self._queue_lock:
for step_id, group_info in self._group_infos.items():
if group_info.status not in TERMINAL_STATUSES:
self._stop_requests.append(DragonStopRequest(step_id=step_id))

Check warning on line 578 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L574-L578

Added lines #L574 - L578 were not covered by tests

def update(self) -> None:
"""Update internal data structures, queues, and job statuses"""
logger.debug("Dragon Backend update thread started")
Expand All @@ -579,6 +591,7 @@
logger.debug(str(self))
except ValueError as e:
logger.error(e)

logger.debug("Dragon Backend update thread stopping")

@functools.singledispatchmethod
Expand Down Expand Up @@ -633,7 +646,8 @@
def _(self, request: DragonShutdownRequest) -> DragonShutdownResponse:
self._shutdown_requested = True
self._update_shutdown_status()
self._can_shutdown |= request.immediate
if request.immediate:
self._kill_all_running_jobs()

Check warning on line 650 in smartsim/_core/launcher/dragon/dragonBackend.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonBackend.py#L649-L650

Added lines #L649 - L650 were not covered by tests
self._frontend_shutdown = request.frontend_shutdown
return DragonShutdownResponse()

Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/launcher/dragon/dragonLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@
elif step_map.step_id.startswith("PBS-"):
sublauncher = self._pbs_launcher
else:
raise ValueError(f"Step id {step_map.step_id} is not valid.")
return

Check warning on line 113 in smartsim/_core/launcher/dragon/dragonLauncher.py

View check run for this annotation

Codecov / codecov/patch

smartsim/_core/launcher/dragon/dragonLauncher.py#L113

Added line #L113 was not covered by tests

sublauncher_step_map = StepMap(
step_id=DragonLauncher._unprefix_step_id(step_map.step_id),
Expand Down
File renamed without changes.
67 changes: 60 additions & 7 deletions tests/test_dragon_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from smartsim._core.schemas.dragonRequests import *
from smartsim._core.schemas.dragonResponses import *
from smartsim._core.utils.helpers import create_short_id_str
from smartsim.status import TERMINAL_STATUSES, SmartSimStatus

if t.TYPE_CHECKING:
from smartsim._core.launcher.dragon.dragonBackend import (
Expand Down Expand Up @@ -248,6 +249,31 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert not dragon_backend._running_steps


def test_deny_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
dragon_backend = get_mock_backend(monkeypatch)

dragon_backend._shutdown_requested = True

run_req = DragonRunRequest(
exe="sleep",
exe_args=["5"],
path="/a/fake/path",
nodes=2,
tasks=1,
tasks_per_node=1,
env={},
current_env={},
pmi_enabled=False,
)

run_resp = dragon_backend.process_request(run_req)
assert isinstance(run_resp, DragonRunResponse)
assert run_resp.error_message == "Cannot satisfy request, server is shutting down."
step_id = run_resp.step_id

assert dragon_backend.group_infos[step_id].status == SmartSimStatus.STATUS_FAILED


def test_udpate_status_request(monkeypatch: pytest.MonkeyPatch) -> None:
dragon_backend = get_mock_backend(monkeypatch)

Expand Down Expand Up @@ -296,34 +322,61 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None:


@pytest.mark.parametrize(
"immediate, frontend_shutdown",
[[True, True], [True, False], [False, True], [False, False]],
"immediate, kill_jobs, frontend_shutdown",
[
[True, True, True],
[True, True, False],
[True, False, True],
[True, False, False],
[False, True, True],
[False, True, False],
],
)
def test_shutdown_request(
monkeypatch: pytest.MonkeyPatch, immediate: bool, frontend_shutdown: bool
monkeypatch: pytest.MonkeyPatch,
immediate: bool,
kill_jobs: bool,
frontend_shutdown: bool,
) -> None:
monkeypatch.setenv("SMARTSIM_FLAG_TELEMETRY", "0")
dragon_backend = get_mock_backend(monkeypatch)
monkeypatch.setattr(dragon_backend, "_cooldown_period", 1)
set_mock_group_infos(monkeypatch, dragon_backend)

if kill_jobs:
for group_info in dragon_backend.group_infos.values():
if not group_info.status in TERMINAL_STATUSES:
group_info.status = SmartSimStatus.STATUS_FAILED
group_info.return_codes = [-9]
group_info.process_group = None
group_info.redir_workers = None
dragon_backend._running_steps.clear()

shutdown_req = DragonShutdownRequest(
immediate=immediate, frontend_shutdown=frontend_shutdown
)
shutdown_resp = dragon_backend.process_request(shutdown_req)

assert dragon_backend._shutdown_requested
if not kill_jobs:
stop_request_ids = (
stop_request.step_id for stop_request in dragon_backend._stop_requests
)
for step_id, group_info in dragon_backend.group_infos.items():
if not group_info.status in TERMINAL_STATUSES:
assert step_id in stop_request_ids

assert isinstance(shutdown_resp, DragonShutdownResponse)
assert dragon_backend._can_shutdown == immediate
assert dragon_backend._shutdown_requested
assert dragon_backend.frontend_shutdown == frontend_shutdown

dragon_backend._update()
assert not dragon_backend.should_shutdown
time.sleep(dragon_backend._cooldown_period + 0.1)
dragon_backend._update()

assert dragon_backend.should_shutdown == immediate
assert dragon_backend._has_cooled_down == immediate
assert dragon_backend._can_shutdown == kill_jobs
assert dragon_backend.should_shutdown == kill_jobs
assert dragon_backend._has_cooled_down == kill_jobs


@pytest.mark.parametrize("telemetry_flag", ["0", "1"])
Expand Down
Loading