Skip to content

Commit

Permalink
Dragon server enhancement (#582)
Browse files Browse the repository at this point in the history
The Dragon server could fail, dumping a core file, if it was shut down
before all spawned Process Groups completed. This PR fixes such
behavior: the immediate flag on the `DragonShutdownRequest` now requests
every non-terminated job to be stopped.

[ committed by @al-rigazzi ]
[ reviewed by @ashao ]
  • Loading branch information
al-rigazzi authored May 14, 2024
1 parent 781d4b6 commit 4e7302e
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 10 deletions.
4 changes: 4 additions & 0 deletions doc/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ To be released at some future point in time

Description

- Improve Dragon server shutdown
- Add dragon runtime installer
- Add launcher based on Dragon
- Reuse Orchestrators within the testing suite to improve performance.
Expand Down Expand Up @@ -62,6 +63,9 @@ Description
- Fix publishing of development docs

Detailed Notes

- The Dragon server will now terminate any process which is still running
when a request of an immediate shutdown is sent. ([SmartSim-PR582](https://github.com/CrayLabs/SmartSim/pull/582))
- Add `--dragon` option to `smart build`. Install appropriate Dragon
runtime from Dragon GitHub release assets.
([SmartSim-PR580](https://github.com/CrayLabs/SmartSim/pull/580))
Expand Down
18 changes: 16 additions & 2 deletions smartsim/_core/launcher/dragon/dragonBackend.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,10 @@ def redir_worker(io_conn: dragon_connection.Connection, file_path: str) -> None:
except Exception as e:
print(e)
finally:
io_conn.close()
try:
io_conn.close()
except Exception as e:
print(e)


class DragonBackend:
Expand Down Expand Up @@ -293,6 +296,9 @@ def _can_honor(self, request: DragonRunRequest) -> t.Tuple[bool, t.Optional[str]
message = f"Cannot satisfy request. Requested {request.nodes} nodes, "
message += f"but only {len(self._hosts)} nodes are available."
return False, message
if self._shutdown_requested:
message = "Cannot satisfy request, server is shutting down."
return False, message
return True, None

def _allocate_step(
Expand Down Expand Up @@ -565,6 +571,12 @@ def _update(self) -> None:
self._refresh_statuses()
self._update_shutdown_status()

def _kill_all_running_jobs(self) -> None:
with self._queue_lock:
for step_id, group_info in self._group_infos.items():
if group_info.status not in TERMINAL_STATUSES:
self._stop_requests.append(DragonStopRequest(step_id=step_id))

def update(self) -> None:
"""Update internal data structures, queues, and job statuses"""
logger.debug("Dragon Backend update thread started")
Expand All @@ -579,6 +591,7 @@ def update(self) -> None:
logger.debug(str(self))
except ValueError as e:
logger.error(e)

logger.debug("Dragon Backend update thread stopping")

@functools.singledispatchmethod
Expand Down Expand Up @@ -633,7 +646,8 @@ def _(self, request: DragonHandshakeRequest) -> DragonHandshakeResponse:
def _(self, request: DragonShutdownRequest) -> DragonShutdownResponse:
self._shutdown_requested = True
self._update_shutdown_status()
self._can_shutdown |= request.immediate
if request.immediate:
self._kill_all_running_jobs()
self._frontend_shutdown = request.frontend_shutdown
return DragonShutdownResponse()

Expand Down
2 changes: 1 addition & 1 deletion smartsim/_core/launcher/dragon/dragonLauncher.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ def add_step_to_mapping_table(self, name: str, step_map: StepMap) -> None:
elif step_map.step_id.startswith("PBS-"):
sublauncher = self._pbs_launcher
else:
raise ValueError(f"Step id {step_map.step_id} is not valid.")
return

sublauncher_step_map = StepMap(
step_id=DragonLauncher._unprefix_step_id(step_map.step_id),
Expand Down
File renamed without changes.
67 changes: 60 additions & 7 deletions tests/test_dragon_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from smartsim._core.schemas.dragonRequests import *
from smartsim._core.schemas.dragonResponses import *
from smartsim._core.utils.helpers import create_short_id_str
from smartsim.status import TERMINAL_STATUSES, SmartSimStatus

if t.TYPE_CHECKING:
from smartsim._core.launcher.dragon.dragonBackend import (
Expand Down Expand Up @@ -248,6 +249,31 @@ def test_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
assert not dragon_backend._running_steps


def test_deny_run_request(monkeypatch: pytest.MonkeyPatch) -> None:
dragon_backend = get_mock_backend(monkeypatch)

dragon_backend._shutdown_requested = True

run_req = DragonRunRequest(
exe="sleep",
exe_args=["5"],
path="/a/fake/path",
nodes=2,
tasks=1,
tasks_per_node=1,
env={},
current_env={},
pmi_enabled=False,
)

run_resp = dragon_backend.process_request(run_req)
assert isinstance(run_resp, DragonRunResponse)
assert run_resp.error_message == "Cannot satisfy request, server is shutting down."
step_id = run_resp.step_id

assert dragon_backend.group_infos[step_id].status == SmartSimStatus.STATUS_FAILED


def test_udpate_status_request(monkeypatch: pytest.MonkeyPatch) -> None:
dragon_backend = get_mock_backend(monkeypatch)

Expand Down Expand Up @@ -296,34 +322,61 @@ def test_stop_request(monkeypatch: pytest.MonkeyPatch) -> None:


@pytest.mark.parametrize(
"immediate, frontend_shutdown",
[[True, True], [True, False], [False, True], [False, False]],
"immediate, kill_jobs, frontend_shutdown",
[
[True, True, True],
[True, True, False],
[True, False, True],
[True, False, False],
[False, True, True],
[False, True, False],
],
)
def test_shutdown_request(
monkeypatch: pytest.MonkeyPatch, immediate: bool, frontend_shutdown: bool
monkeypatch: pytest.MonkeyPatch,
immediate: bool,
kill_jobs: bool,
frontend_shutdown: bool,
) -> None:
monkeypatch.setenv("SMARTSIM_FLAG_TELEMETRY", "0")
dragon_backend = get_mock_backend(monkeypatch)
monkeypatch.setattr(dragon_backend, "_cooldown_period", 1)
set_mock_group_infos(monkeypatch, dragon_backend)

if kill_jobs:
for group_info in dragon_backend.group_infos.values():
if not group_info.status in TERMINAL_STATUSES:
group_info.status = SmartSimStatus.STATUS_FAILED
group_info.return_codes = [-9]
group_info.process_group = None
group_info.redir_workers = None
dragon_backend._running_steps.clear()

shutdown_req = DragonShutdownRequest(
immediate=immediate, frontend_shutdown=frontend_shutdown
)
shutdown_resp = dragon_backend.process_request(shutdown_req)

assert dragon_backend._shutdown_requested
if not kill_jobs:
stop_request_ids = (
stop_request.step_id for stop_request in dragon_backend._stop_requests
)
for step_id, group_info in dragon_backend.group_infos.items():
if not group_info.status in TERMINAL_STATUSES:
assert step_id in stop_request_ids

assert isinstance(shutdown_resp, DragonShutdownResponse)
assert dragon_backend._can_shutdown == immediate
assert dragon_backend._shutdown_requested
assert dragon_backend.frontend_shutdown == frontend_shutdown

dragon_backend._update()
assert not dragon_backend.should_shutdown
time.sleep(dragon_backend._cooldown_period + 0.1)
dragon_backend._update()

assert dragon_backend.should_shutdown == immediate
assert dragon_backend._has_cooled_down == immediate
assert dragon_backend._can_shutdown == kill_jobs
assert dragon_backend.should_shutdown == kill_jobs
assert dragon_backend._has_cooled_down == kill_jobs


@pytest.mark.parametrize("telemetry_flag", ["0", "1"])
Expand Down

0 comments on commit 4e7302e

Please sign in to comment.