Return status and actual timestamp from all environments #671

Merged (9 commits) on Feb 13, 2024

mlos_bench/mlos_bench/environments/base_environment.py (22 changes: 12 additions & 10 deletions)
@@ -378,7 +378,7 @@ def teardown(self) -> None:
assert self._in_context
self._is_ready = False

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Execute the run script for this environment.

@@ -387,30 +387,32 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:

Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
# Make sure we create a context before invoking setup/run/status/teardown
assert self._in_context
(status, _) = self.status()
return (status, None)
(status, timestamp, _) = self.status()
return (status, timestamp, None)

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
"""
Check the status of the benchmark environment.

Returns
-------
(benchmark_status, telemetry) : (Status, list)
A pair of (benchmark status, telemetry) values.
(benchmark_status, timestamp, telemetry) : (Status, datetime, list)
3-tuple of (benchmark status, timestamp, telemetry) values.
`timestamp` is UTC time stamp of the status; it's current time by default.
`telemetry` is a list (maybe empty) of (timestamp, metric, value) triplets.
"""
# Make sure we create a context before invoking setup/run/status/teardown
assert self._in_context
timestamp = datetime.utcnow()
if self._is_ready:
return (Status.READY, [])
return (Status.READY, timestamp, [])
_LOG.warning("Environment not ready: %s", self)
return (Status.PENDING, [])
return (Status.PENDING, timestamp, [])
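
To make the new contract concrete, here is a minimal caller-side sketch; the `FakeEnv` class and its values are hypothetical stand-ins, and only the 3-tuple shapes `(status, timestamp, output)` and `(status, timestamp, telemetry)` come from the signatures above:

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

# Tiny stand-in illustrating the new return shapes only; the class and values
# here are hypothetical and not part of mlos_bench.
class FakeEnv:

    def run(self) -> Tuple[str, datetime, Optional[Dict[str, Any]]]:
        return ("SUCCEEDED", datetime.utcnow(), {"score": 0.42})

    def status(self) -> Tuple[str, datetime, List[Tuple[datetime, str, Any]]]:
        now = datetime.utcnow()
        return ("READY", now, [(now, "cpu_load", 0.17)])

env = FakeEnv()
(status, timestamp, output) = env.run()        # output is a dict or None
print(status, timestamp.isoformat(), output)

(status, timestamp, telemetry) = env.status()  # telemetry is a list of triplets
for (ts, metric, value) in telemetry:
    print(ts.isoformat(), metric, value)
```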
mlos_bench/mlos_bench/environments/composite_env.py (33 changes: 18 additions & 15 deletions)
@@ -179,61 +179,64 @@ def teardown(self) -> None:
env_context.teardown()
super().teardown()

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Submit a new experiment to the environment.
Return the result of the *last* child environment if successful,
or the status of the last failed environment otherwise.

Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
_LOG.info("Run: %s", self._children)
(status, metrics) = super().run()
(status, timestamp, metrics) = super().run()
if not status.is_ready():
return (status, metrics)
return (status, timestamp, metrics)

joint_metrics = {}
for env_context in self._child_contexts:
_LOG.debug("Child env. run: %s", env_context)
(status, metrics) = env_context.run()
(status, timestamp, metrics) = env_context.run()
_LOG.debug("Child env. run results: %s :: %s %s", env_context, status, metrics)
if not status.is_good():
_LOG.info("Run failed: %s :: %s", self, status)
return (status, None)
return (status, timestamp, None)
joint_metrics.update(metrics or {})

_LOG.info("Run completed: %s :: %s %s", self, status, joint_metrics)
return (status, joint_metrics)
# Return the status and the timestamp of the last child environment.
return (status, timestamp, joint_metrics)

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
"""
Check the status of the benchmark environment.

Returns
-------
(benchmark_status, telemetry) : (Status, list)
A pair of (benchmark status, telemetry) values.
(benchmark_status, timestamp, telemetry) : (Status, datetime, list)
3-tuple of (benchmark status, timestamp, telemetry) values.
`timestamp` is UTC time stamp of the status; it's current time by default.
`telemetry` is a list (maybe empty) of (timestamp, metric, value) triplets.
"""
(status, telemetry) = super().status()
(status, timestamp, telemetry) = super().status()
if not status.is_ready():
return (status, telemetry)
return (status, timestamp, telemetry)

joint_telemetry = []
final_status = None
for env_context in self._child_contexts:
(status, telemetry) = env_context.status()
(status, timestamp, telemetry) = env_context.status()
_LOG.debug("Child env. status: %s :: %s", env_context, status)
joint_telemetry.extend(telemetry)
if not status.is_good() and final_status is None:
final_status = status

final_status = final_status or status
_LOG.info("Final status: %s :: %s", self, final_status)
return (final_status, joint_telemetry)
# Return the status and the timestamp of the last child environment.
return (final_status, timestamp, joint_telemetry)
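
The merge-and-propagate behaviour of `CompositeEnv.run()` above can be summarized in a standalone sketch (hypothetical, simplified types: a boolean stands in for `Status`): metrics from the children are merged, the first failing child short-circuits the loop, and otherwise the last child's status and timestamp are what the composite reports.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

def aggregate_children(child_results: List[Tuple[bool, datetime, Optional[Dict[str, Any]]]]
                       ) -> Tuple[bool, datetime, Optional[Dict[str, Any]]]:
    joint_metrics: Dict[str, Any] = {}
    (is_good, timestamp) = (True, datetime.utcnow())
    for (is_good, timestamp, metrics) in child_results:
        if not is_good:
            return (is_good, timestamp, None)   # first failure wins
        joint_metrics.update(metrics or {})
    return (is_good, timestamp, joint_metrics)  # last child's status and timestamp

# e.g., with two successful children the merged dict and the second timestamp come back:
# aggregate_children([(True, t1, {"a": 1}), (True, t2, {"b": 2})]) == (True, t2, {"a": 1, "b": 2})
```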
mlos_bench/mlos_bench/environments/local/local_env.py (26 changes: 14 additions & 12 deletions)
@@ -152,19 +152,19 @@ def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -

return self._is_ready

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Run a script in the local scheduler environment.

Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
(status, _) = result = super().run()
(status, timestamp, _) = result = super().run()
if not status.is_ready():
return result

@@ -174,13 +174,13 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
if self._script_run:
(return_code, output) = self._local_exec(self._script_run, self._temp_dir)
if return_code != 0:
return (Status.FAILED, None)
return (Status.FAILED, timestamp, None)
stdout_data = self._extract_stdout_results(output.get("stdout", ""))

# FIXME: We should not be assuming that the only output file type is a CSV.
if not self._read_results_file:
_LOG.debug("Not reading the data at: %s", self)
return (Status.SUCCEEDED, stdout_data)
return (Status.SUCCEEDED, timestamp, stdout_data)

data = self._normalize_columns(pandas.read_csv(
self._config_loader_service.resolve_path(
@@ -201,7 +201,7 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:

stdout_data.update(data.iloc[-1].to_dict())
_LOG.info("Local run complete: %s ::\n%s", self, stdout_data)
return (Status.SUCCEEDED, stdout_data)
return (Status.SUCCEEDED, timestamp, stdout_data)

@staticmethod
def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
@@ -215,17 +215,19 @@ def _normalize_columns(data: pandas.DataFrame) -> pandas.DataFrame:
data.rename(str.rstrip, axis='columns', inplace=True)
return data

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:

(status, _) = super().status()
(status, timestamp, _) = super().status()
if not (self._is_ready and self._read_telemetry_file):
return (status, [])
return (status, timestamp, [])

assert self._temp_dir is not None
try:
fname = self._config_loader_service.resolve_path(
self._read_telemetry_file, extra_paths=[self._temp_dir])

# TODO: Use the timestamp of the CSV file as our status timestamp?

# FIXME: We should not be assuming that the only output file type is a CSV.
data = self._normalize_columns(
pandas.read_csv(fname, index_col=False, parse_dates=[0]))
@@ -241,11 +243,11 @@ def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:

except FileNotFoundError as ex:
_LOG.warning("Telemetry CSV file not found: %s :: %s", self._read_telemetry_file, ex)
return (status, [])
return (status, timestamp, [])

_LOG.debug("Read telemetry data:\n%s", data)
col_dtypes: Mapping[int, Type] = {0: datetime}
return (status, [
return (status, timestamp, [
(pandas.Timestamp(ts).to_pydatetime(), metric, value)
for (ts, metric, value) in data.to_records(index=False, column_dtypes=col_dtypes)
])
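For reference, a simplified, self-contained sketch of the telemetry parsing done in `LocalEnv.status()` above; the file layout and helper name are made up, and the real code uses `to_records` with an explicit `column_dtypes` mapping instead of `itertuples`:

```python
from datetime import datetime
from typing import Any, List, Tuple

import pandas

def read_telemetry_csv(fname: str) -> List[Tuple[datetime, str, Any]]:
    # parse_dates=[0] parses the first column as timestamps, as in LocalEnv.status().
    data = pandas.read_csv(fname, index_col=False, parse_dates=[0])
    return [
        (pandas.Timestamp(ts).to_pydatetime(), metric, value)
        for (ts, metric, value) in data.itertuples(index=False, name=None)
    ]

# Expected CSV layout (header row, then timestamp,metric,value rows):
#   timestamp,metric,value
#   2024-02-13T00:00:00,cpu_load,0.65
```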
@@ -151,22 +151,22 @@ def _download_files(self, ignore_missing: bool = False) -> None:
_LOG.exception("Cannot download %s to %s", path_from, path_to)
raise ex

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Download benchmark results from the shared storage
and run post-processing scripts locally.

Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
self._download_files()
return super().run()

def status(self) -> Tuple[Status, List[Tuple[datetime, str, Any]]]:
def status(self) -> Tuple[Status, datetime, List[Tuple[datetime, str, Any]]]:
self._download_files(ignore_missing=True)
return super().status()
mlos_bench/mlos_bench/environments/mock_env.py (11 changes: 6 additions & 5 deletions)
@@ -8,6 +8,7 @@

import random
import logging
from datetime import datetime
from typing import Dict, Optional, Tuple

import numpy
@@ -61,20 +62,20 @@ def __init__(self,
self._metrics = self.config.get("metrics", ["score"])
self._is_ready = True

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Produce mock benchmark data for one experiment.

Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
The keys of the `output` dict are the names of the metrics
specified in the config; by default it's just one metric
named "score". All output metrics have the same value.
"""
(status, _) = result = super().run()
(status, timestamp, _) = result = super().run()
if not status.is_ready():
return result

@@ -89,7 +90,7 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
if self._range:
score = self._range[0] + score * (self._range[1] - self._range[0])

return (Status.SUCCEEDED, {metric: score for metric in self._metrics})
return (Status.SUCCEEDED, timestamp, {metric: score for metric in self._metrics})

@staticmethod
def _normalized(tunable: Tunable) -> float:
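The scaling step near the end of `MockEnv.run()` above maps a raw score in [0, 1] into the configured `range` and reports the same value under every configured metric name; a small standalone illustration (the helper is hypothetical, not part of mlos_bench):

```python
from typing import Dict, List, Tuple

def scale_mock_score(raw: float, value_range: Tuple[float, float],
                     metrics: List[str]) -> Dict[str, float]:
    (low, high) = value_range
    score = low + raw * (high - low)
    # All output metrics get the same value, as in MockEnv.
    return {metric: score for metric in metrics}

# scale_mock_score(0.25, (60.0, 120.0), ["score", "total_time"])
#   -> {"score": 75.0, "total_time": 75.0}
```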
mlos_bench/mlos_bench/environments/remote/remote_env.py (27 changes: 15 additions & 12 deletions)
@@ -9,6 +9,7 @@
"""

import logging
from datetime import datetime
from typing import Dict, Iterable, Optional, Tuple

from mlos_bench.environments.status import Status
@@ -104,15 +105,15 @@ def setup(self, tunables: TunableGroups, global_config: Optional[dict] = None) -

if self._script_setup:
_LOG.info("Set up the remote environment: %s", self)
(status, _) = self._remote_exec(self._script_setup)
(status, _timestamp, _output) = self._remote_exec(self._script_setup)
_LOG.info("Remote set up complete: %s :: %s", self, status)
self._is_ready = status.is_succeeded()
else:
self._is_ready = True

return self._is_ready

def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:
def run(self) -> Tuple[Status, datetime, Optional[Dict[str, TunableValue]]]:
"""
Runs the run script on the remote environment.

@@ -122,34 +123,34 @@ def run(self) -> Tuple[Status, Optional[Dict[str, TunableValue]]]:

Returns
-------
(status, output) : (Status, dict)
A pair of (Status, output) values, where `output` is a dict
(status, timestamp, output) : (Status, datetime, dict)
3-tuple of (Status, timestamp, output) values, where `output` is a dict
with the results or None if the status is not COMPLETED.
If run script is a benchmark, then the score is usually expected to
be in the `score` field.
"""
_LOG.info("Run script remotely on: %s", self)
(status, _) = result = super().run()
(status, timestamp, _) = result = super().run()
if not (status.is_ready() and self._script_run):
return result

(status, output) = self._remote_exec(self._script_run)
(status, timestamp, output) = self._remote_exec(self._script_run)
if status.is_succeeded() and output is not None:
output = self._extract_stdout_results(output.get("stdout", ""))
_LOG.info("Remote run complete: %s :: %s = %s", self, status, output)
return (status, output)
return (status, timestamp, output)

def teardown(self) -> None:
"""
Clean up and shut down the remote environment.
"""
if self._script_teardown:
_LOG.info("Remote teardown: %s", self)
(status, _) = self._remote_exec(self._script_teardown)
(status, _timestamp, _output) = self._remote_exec(self._script_teardown)
_LOG.info("Remote teardown complete: %s :: %s", self, status)
super().teardown()

def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]:
def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, datetime, Optional[dict]]:
"""
Run a script on the remote host.

@@ -160,8 +161,8 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]:

Returns
-------
result : (Status, dict)
A pair of Status and dict with the benchmark/script results.
result : (Status, datetime, dict)
3-tuple of Status, timestamp, and dict with the benchmark/script results.
Status is one of {PENDING, SUCCEEDED, FAILED, TIMED_OUT}
"""
env_params = self._get_env_params()
@@ -172,4 +173,6 @@ def _remote_exec(self, script: Iterable[str]) -> Tuple[Status, Optional[dict]]:
if status in {Status.PENDING, Status.SUCCEEDED}:
(status, output) = self._remote_exec_service.get_remote_exec_results(output)
_LOG.debug("Status: %s :: %s", status, output)
return (status, output)
# FIXME: get the timestamp from the remote environment!
timestamp = datetime.utcnow()
return (status, timestamp, output)
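
One possible direction for the FIXME above, sketched purely as a hypothesis (neither the `TIMESTAMP=` convention nor the helper exists in mlos_bench): have the remote script print its completion time on stdout and fall back to the local UTC clock when it is missing.

```python
from datetime import datetime, timezone
from typing import Optional

def parse_remote_timestamp(stdout: str) -> Optional[datetime]:
    # Hypothetical convention: the remote script prints a line like
    #   TIMESTAMP=2024-02-13T01:23:45+00:00
    for line in stdout.splitlines():
        if line.startswith("TIMESTAMP="):
            try:
                return datetime.fromisoformat(line.split("=", 1)[1].strip())
            except ValueError:
                return None
    return None

# Fall back to the local clock, as the current code does:
stdout = "TIMESTAMP=2024-02-13T01:23:45+00:00"
timestamp = parse_remote_timestamp(stdout) or datetime.now(timezone.utc)
```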
mlos_bench/mlos_bench/run.py (12 changes: 6 additions & 6 deletions)
@@ -178,18 +178,18 @@ def _run(env: Environment, opt: Optimizer, trial: Storage.Trial, global_config:
opt.register(trial.tunables, Status.FAILED)
return

(status, results) = env.run() # Block and wait for the final result.
(status, timestamp, results) = env.run() # Block and wait for the final result.
_LOG.info("Results: %s :: %s\n%s", trial.tunables, status, results)

# In async mode (TODO), poll the environment for status and telemetry
# and update the storage with the intermediate results.
(_, telemetry) = env.status()
# Use the status from `.run()` as it is the final status of the experiment.
(_status, _timestamp, telemetry) = env.status()

# Use the status and timestamp from `.run()` as it is the final status of the experiment.
# TODO: Use the `.status()` output in async mode.
trial.update_telemetry(status, telemetry)
trial.update_telemetry(status, timestamp, telemetry)

# FIXME: Use the actual timestamp from the benchmark.
trial.update(status, datetime.utcnow(), results)
trial.update(status, timestamp, results)
# Filter out non-numeric scores from the optimizer.
scores = results if not isinstance(results, dict) \
else {k: float(v) for (k, v) in results.items() if isinstance(v, (int, float))}
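
The filter at the end of `_run()` above drops non-numeric entries before the results are registered with the optimizer; a quick illustration with a made-up results dict:

```python
# Hypothetical results; only the int/float entries survive the filter.
results = {"score": 0.93, "total_time": 12, "log_path": "/tmp/out.log"}
scores = {k: float(v) for (k, v) in results.items() if isinstance(v, (int, float))}
assert scores == {"score": 0.93, "total_time": 12.0}
```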