Improve testing robustness (#381)
A number of defects were found in the test suite, primarily affecting
tests run on HPC platforms. Many of these were uncovered during the
recent introduction of major features that touched various aspects of
the testing suite. Most fixes target the tests themselves rather than
making fundamental changes to the underlying code base. Some of the
major changes are:
- Most tests now run in their own experiment directory, which prevents
  tests from overwriting each other's directories
- While running the multi-GPU tests, a bug (presumably in RedisAI) was
  found that prevents multiple GPUs from being set when using the
  TensorFlow backend. These tests now use a single GPU regardless of
  the value of `SMARTSIM_TEST_NUM_GPUS`
- Tests that spin up an `Orchestrator` now always stop it before
  exiting, whether the test itself succeeds or fails

Lastly, `QsubBatchSettings` was extended to support PBS-like platforms
that use the `resources` tag to define additional resources and/or
otherwise customize PBS batch jobs.
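
For illustration, a minimal sketch of how the new `resources` support might be exercised from user code; the keyword arguments, resource names, and the `set_resource` helper are assumptions for this example rather than details taken from this diff:

```python
# Hedged sketch: exercising QsubBatchSettings resources (names are assumed).
from smartsim.settings import QsubBatchSettings

# Resources can be supplied up front as a dictionary ...
batch = QsubBatchSettings(
    nodes=2,
    time="01:00:00",
    queue="debug",
    resources={"place": "scatter"},  # hypothetical PBS resource
)

# ... or added afterwards, one resource at a time.
batch.set_resource("walltime", "01:00:00")
```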

[ committed by @al-rigazzi and @ashao ]
[ reviewed by: @MattToast ]

Co-authored-by: Alessandro Rigazzi <al.rigazzi@hpe.com>
Co-authored-by: Andrew Shao <andrew.shao@hpe.com>
ashao and al-rigazzi authored Dec 11, 2023
1 parent d8fba1b commit 1b92adf
Showing 62 changed files with 1,033 additions and 932 deletions.
224 changes: 103 additions & 121 deletions conftest.py

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions smartsim/_core/config/config.py
@@ -24,6 +24,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import json
import os
import psutil
import typing as t
@@ -179,6 +180,20 @@ def test_num_gpus(self) -> int: # pragma: no cover
def test_port(self) -> int: # pragma: no cover
return int(os.environ.get("SMARTSIM_TEST_PORT", 6780))

@property
def test_batch_resources(self) -> t.Dict[t.Any, t.Any]: # pragma: no cover
resource_str = os.environ.get("SMARTSIM_TEST_BATCH_RESOURCES", "{}")
resources = json.loads(resource_str)
if not isinstance(resources, dict):
raise TypeError(
(
"SMARTSIM_TEST_BATCH_RESOURCES was not interpreted as a "
"dictionary, check to make sure that it is a valid "
f"JSON string: {resource_str}"
)
)
return resources

@property
def test_interface(self) -> t.List[str]: # pragma: no cover
if interfaces_cfg := os.environ.get("SMARTSIM_TEST_INTERFACE", None):
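
For context, a minimal sketch of how a test harness might populate the new `SMARTSIM_TEST_BATCH_RESOURCES` variable consumed by `test_batch_resources`; the resource keys shown are illustrative only:

```python
import json
import os

# Illustrative only: batch resources are supplied as a JSON dictionary.
os.environ["SMARTSIM_TEST_BATCH_RESOURCES"] = json.dumps(
    {"queue": "debug", "place": "scatter"}
)

# Mirrors the validation in Config.test_batch_resources: anything that does
# not decode to a dictionary is rejected.
resources = json.loads(os.environ.get("SMARTSIM_TEST_BATCH_RESOURCES", "{}"))
if not isinstance(resources, dict):
    raise TypeError("SMARTSIM_TEST_BATCH_RESOURCES must be a JSON object")
```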
63 changes: 39 additions & 24 deletions smartsim/_core/control/controller.py
@@ -26,6 +26,7 @@

from __future__ import annotations

import itertools
import os.path as osp
import pathlib
import pickle
@@ -39,9 +40,16 @@

from smartredis import Client, ConfigOptions

from smartsim._core.utils.network import get_ip_from_host

from ..._core.launcher.step import Step
from ..._core.utils.helpers import unpack_colo_db_identifier, unpack_db_identifier
from ..._core.utils.redis import db_is_active, set_ml_model, set_script, shutdown_db
from ..._core.utils.redis import (
db_is_active,
set_ml_model,
set_script,
shutdown_db_node,
)
from ...database import Orchestrator
from ...entity import (
Ensemble,
@@ -235,12 +243,22 @@ def stop_db(self, db: Orchestrator) -> None:
if db.batch:
self.stop_entity(db)
else:
shutdown_db(db.hosts, db.ports)
with JM_LOCK:
for entity in db:
job = self._jobs[entity.name]
job.set_status(STATUS_CANCELLED, "", 0, output=None, error=None)
self._jobs.move_to_completed(job)
for node in db.entities:
for host_ip, port in itertools.product(
(get_ip_from_host(host) for host in node.hosts), db.ports
):
retcode, _, _ = shutdown_db_node(host_ip, port)
# Sometimes the DB will not shutdown (unless we force NOSAVE)
if retcode != 0:
self.stop_entity(node)
continue

job = self._jobs[node.name]
job.set_status(STATUS_CANCELLED, "", 0, output=None, error=None)
self._jobs.move_to_completed(job)

db.reset_hosts()

def stop_entity_list(self, entity_list: EntitySequence[SmartSimEntity]) -> None:
"""Stop an instance of an entity list
@@ -358,9 +376,9 @@ def _launch(
for orchestrator in manifest.dbs:
for key in self._jobs.get_db_host_addresses():
_, db_id = unpack_db_identifier(key, "_")
if orchestrator.name == db_id:
if orchestrator.db_identifier == db_id:
raise SSDBIDConflictError(
f"Database identifier {orchestrator.name}"
f"Database identifier {orchestrator.db_identifier}"
" has already been used. Pass in a unique"
" name for db_identifier"
)
@@ -600,30 +618,27 @@ def _prep_entity_client_env(self, entity: Model) -> None:

for db_id, addresses in address_dict.items():
db_name, _ = unpack_db_identifier(db_id, "_")

if addresses:
if len(addresses) <= 128:
client_env[f"SSDB{db_name}"] = ",".join(addresses)
else:
# Cap max length of SSDB
client_env[f"SSDB{db_name}"] = ",".join(addresses[:128])
if entity.incoming_entities:
client_env[f"SSKEYIN{db_name}"] = ",".join(
[in_entity.name for in_entity in entity.incoming_entities]
)
if entity.query_key_prefixing():
client_env[f"SSKEYOUT{db_name}"] = entity.name
# Cap max length of SSDB
client_env[f"SSDB{db_name}"] = ",".join(addresses[:128])

# Retrieve num_shards to append to client env
client_env[f"SR_DB_TYPE{db_name}"] = (
CLUSTERED if len(addresses) > 1 else STANDALONE
)

# Retrieve num_shards to append to client env
client_env[f"SR_DB_TYPE{db_name}"] = (
CLUSTERED if len(addresses) > 1 else STANDALONE
if entity.incoming_entities:
client_env["SSKEYIN"] = ",".join(
[in_entity.name for in_entity in entity.incoming_entities]
)
if entity.query_key_prefixing():
client_env["SSKEYOUT"] = entity.name

# Set address to local if it's a colocated model
if entity.colocated and entity.run_settings.colocated_db_settings is not None:
db_name_colo = entity.run_settings.colocated_db_settings["db_identifier"]

for key in self._jobs.get_db_host_addresses():
for key in address_dict:
_, db_id = unpack_db_identifier(key, "_")
if db_name_colo == db_id:
raise SSDBIDConflictError(
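
To make the reorganized `_prep_entity_client_env` logic easier to follow, here is a standalone sketch of the per-database environment construction it performs; the constant values and the function name are illustrative, not part of the diff:

```python
import typing as t

CLUSTERED, STANDALONE = "Clustered", "Standalone"  # illustrative values


def build_client_env(db_name: str, addresses: t.List[str]) -> t.Dict[str, str]:
    """Build the per-database client environment variables."""
    env: t.Dict[str, str] = {}
    if addresses:
        # Cap max length of SSDB at 128 addresses
        env[f"SSDB{db_name}"] = ",".join(addresses[:128])
        # A single shard is standalone; multiple shards form a cluster
        env[f"SR_DB_TYPE{db_name}"] = CLUSTERED if len(addresses) > 1 else STANDALONE
    return env


# build_client_env("_identifier_1", ["10.0.0.1:6780"])
# -> {"SSDB_identifier_1": "10.0.0.1:6780", "SR_DB_TYPE_identifier_1": "Standalone"}
```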
29 changes: 17 additions & 12 deletions smartsim/_core/control/jobmanager.py
@@ -34,9 +34,8 @@

from ...database import Orchestrator
from ...entity import DBNode, SmartSimEntity, EntitySequence
from ...error import SmartSimError
from ...log import get_logger
from ...status import TERMINAL_STATUSES
from ...status import TERMINAL_STATUSES, STATUS_NEVER_STARTED
from ..config import CONFIG
from ..launcher import LocalLauncher, Launcher
from ..utils.network import get_ip_from_host
@@ -160,6 +159,13 @@ def __call__(self) -> t.Dict[str, Job]:
all_jobs = {**self.jobs, **self.db_jobs}
return all_jobs

def __contains__(self, key: str) -> bool:
try:
self[key] # pylint: disable=pointless-statement
return True
except KeyError:
return False

def add_job(
self,
job_name: str,
@@ -242,17 +248,14 @@ def get_status(
:returns: tuple of status
"""
with self._lock:
try:
if entity.name in self.completed:
return self.completed[entity.name].status
if entity.name in self.completed:
return self.completed[entity.name].status

if entity.name in self:
job: Job = self[entity.name] # locked
except KeyError:
raise SmartSimError(
f"Entity {entity.name} has not been launched in this Experiment"
) from None
return job.status

return job.status
return STATUS_NEVER_STARTED

def set_launcher(self, launcher: Launcher) -> None:
"""Set the launcher of the job manager to a specific launcher instance
@@ -312,7 +315,7 @@ def get_db_host_addresses(self) -> t.Dict[str, t.List[str]]:
:rtype: Dict[str, list]
"""

address_dict = {}
address_dict: t.Dict[str, t.List[str]] = {}
for db_job in self.db_jobs.values():
addresses = []
if isinstance(db_job.entity, (DBNode, Orchestrator)):
@@ -321,7 +324,9 @@
ip_addr = get_ip_from_host(combine[0])
addresses.append(":".join((ip_addr, str(combine[1]))))

address_dict.update({db_entity.name: addresses})
dict_entry: t.List[str] = address_dict.get(db_entity.db_identifier, [])
dict_entry.extend(addresses)
address_dict[db_entity.db_identifier] = dict_entry

return address_dict

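
A small sketch of the aggregation pattern now used in `get_db_host_addresses`, grouping addresses by `db_identifier` so that the shards of a multi-node database end up in a single list; the data and function name are illustrative:

```python
import typing as t


def group_addresses(
    pairs: t.Iterable[t.Tuple[str, t.List[str]]]
) -> t.Dict[str, t.List[str]]:
    """Group (db_identifier, addresses) pairs into one dict keyed by identifier."""
    address_dict: t.Dict[str, t.List[str]] = {}
    for db_identifier, addresses in pairs:
        entry = address_dict.get(db_identifier, [])
        entry.extend(addresses)
        address_dict[db_identifier] = entry
    return address_dict


# group_addresses([("db_1", ["10.0.0.1:6780"]), ("db_1", ["10.0.0.2:6780"])])
# -> {"db_1": ["10.0.0.1:6780", "10.0.0.2:6780"]}
```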
24 changes: 12 additions & 12 deletions smartsim/_core/utils/helpers.py
@@ -40,28 +40,28 @@


def unpack_db_identifier(db_id: str, token: str) -> t.Tuple[str, str]:
"""Unpack the unformatted database identifier using the token,
and format for env variable suffix
:db_id: the unformatted database identifier eg. identifier_1_0
:token: character '_' or '-' to use to unpack the database identifier
:return: db suffix, and formatted db_id eg. _identifier_1, identifier_1
"""Unpack the unformatted database identifier
and format for env variable suffix using the token
:param db_id: the unformatted database identifier eg. identifier_1_0
:type db_id: str
:param token: character to use to construct the db suffix
:type token: str
:return: db id suffix and formatted db_id e.g. ("_identifier_1", "identifier_1")
:rtype: (str, str)
"""

if db_id == "orchestrator":
return "", ""
db_id = "_".join(db_id.split(token)[:-1])
# if unpacked db_id is default, return empty
if db_id == "orchestrator":
# if db_id is default after unpack, return empty
return "", ""
db_name_suffix = "_" + db_id
db_name_suffix = token + db_id
return db_name_suffix, db_id


def unpack_colo_db_identifier(db_id: str) -> str:
"""Create database identifier suffix for colocated database
:db_id: the unformatted database identifier
:param db_id: the unformatted database identifier
:type db_id: str
:return: db suffix
:rtype: str
"""
return "_" + db_id if db_id else ""

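
A usage sketch tracing the behavior of the identifier helpers shown above; the example identifiers are illustrative:

```python
from smartsim._core.utils.helpers import (
    unpack_colo_db_identifier,
    unpack_db_identifier,
)

# The trailing "_<index>" is stripped and the suffix is built with the token.
assert unpack_db_identifier("identifier_1_0", "_") == ("_identifier_1", "identifier_1")

# The default database collapses to empty strings.
assert unpack_db_identifier("orchestrator", "_") == ("", "")

# Colocated databases only need the suffix.
assert unpack_colo_db_identifier("identifier_1") == "_identifier_1"
```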
44 changes: 21 additions & 23 deletions smartsim/_core/utils/redis.py
@@ -24,7 +24,6 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import itertools
import logging
import redis
import time
@@ -219,31 +218,30 @@ def set_script(db_script: DBScript, client: Client) -> None:
raise error


def shutdown_db(hosts: t.List[str], ports: t.List[int]) -> None: # cov-wlm
"""Send shutdown signal to cluster instances.
def shutdown_db_node(host_ip: str, port: int) -> t.Tuple[int, str, str]: # cov-wlm
"""Send shutdown signal to DB node.
Should only be used in the case where cluster deallocation
needs to occur manually. Usually, the SmartSim task manager
needs to occur manually. Usually, the SmartSim job manager
will take care of this automatically.
:param hosts: List of hostnames to connect to
:type hosts: List[str]
:param ports: List of ports for each hostname
:type ports: List[int]
:raises SmartSimError: if cluster creation fails
:param host_ip: IP of the host to connect to
:type host_ip: str
:param port: port to which the node is listening
:type port: int
:return: returncode, output, and error of the process
:rtype: tuple of (int, str, str)
"""
for host_ip, port in itertools.product(
(get_ip_from_host(host) for host in hosts), ports
):
# call cluster command
redis_cli = CONFIG.database_cli
cmd = [redis_cli, "-h", host_ip, "-p", str(port), "shutdown"]
returncode, out, err = execute_cmd(
cmd, proc_input="yes", shell=False, timeout=10
)
redis_cli = CONFIG.database_cli
cmd = [redis_cli, "-h", host_ip, "-p", str(port), "shutdown"]
returncode, out, err = execute_cmd(
cmd, proc_input="yes", shell=False, timeout=10
)

if returncode != 0:
logger.error(out)
logger.error(err)
elif out:
logger.debug(out)

if returncode != 0:
logger.error(out)
logger.error(err)
else:
logger.debug(out)
return returncode, out, err
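
The `stop_db` change in `controller.py` pairs with this new per-node function; a hedged sketch of how a caller might fan out over every host and port (the host names and ports below are illustrative):

```python
import itertools

from smartsim._core.utils.network import get_ip_from_host
from smartsim._core.utils.redis import shutdown_db_node

hosts, ports = ["node001", "node002"], [6780]  # illustrative values

# Resolve each host to an IP and shut down every (host, port) combination,
# noting any node that did not exit cleanly.
failed = []
for host_ip, port in itertools.product(
    (get_ip_from_host(host) for host in hosts), ports
):
    returncode, _, _ = shutdown_db_node(host_ip, port)
    if returncode != 0:
        failed.append((host_ip, port))
```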
