diff --git a/.vscode/settings.json b/.vscode/settings.json index a01d1d6418f..f29dec766e2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,6 +8,7 @@ "**/__pycache__/": true, "**/node_modules/": true, "**/*.egg-info": true, + "mlos_*/build/": true, "doc/source/autoapi/": true, "doc/build/doctrees/": true, "doc/build/html/": true, diff --git a/mlos_bench/mlos_bench/config/schemas/cli/globals-schema.json b/mlos_bench/mlos_bench/config/schemas/cli/globals-schema.json index 015b4a6e62c..39e60e3249b 100644 --- a/mlos_bench/mlos_bench/config/schemas/cli/globals-schema.json +++ b/mlos_bench/mlos_bench/config/schemas/cli/globals-schema.json @@ -24,6 +24,9 @@ }, "optimization_targets": { "$ref": "./common-defs-subschemas.json#/$defs/optimization_targets" + }, + "mock_trial_data": { + "$ref": "../environments/mock-env-subschema.json#/$defs/mock_trial_data" } }, "additionalProperties": { diff --git a/mlos_bench/mlos_bench/config/schemas/environments/mock-env-subschema.json b/mlos_bench/mlos_bench/config/schemas/environments/mock-env-subschema.json index cb2de6c719f..b453c8573aa 100644 --- a/mlos_bench/mlos_bench/config/schemas/environments/mock-env-subschema.json +++ b/mlos_bench/mlos_bench/config/schemas/environments/mock-env-subschema.json @@ -3,6 +3,136 @@ "$id": "https://raw.githubusercontent.com/microsoft/MLOS/main/mlos_bench/mlos_bench/config/schemas/environments/mock-env-subschema.json", "title": "mlos_bench MockEnv config", "description": "Config instance for a mlos_bench MockEnv", + + "$defs": { + "mock_trial_common_phase_data": { + "type": "object", + "properties": { + "sleep": { + "type": "number", + "description": "Optional time to sleep (in seconds) before returning from this phase of the trial.", + "examples": [0, 0.1, 0.5, 1, 2], + "minimum": 0, + "maximum": 60 + }, + "exception": { + "type": "string", + "description": "Optional exception message to raise during phase." + } + } + }, + "mock_trial_status_run_phase_data": { + "type": "object", + "properties": { + "status": { + "description": "The status to report for this phase of the trial. Default is phase dependent.", + "enum": [ + "UNKNOWN", + "PENDING", + "READY", + "RUNNING", + "SUCCEEDED", + "CANCELED", + "FAILED", + "TIMED_OUT" + ] + }, + "metrics": { + "type": "object", + "description": "A dictionary of metrics for this phase of the trial.", + "additionalProperties": { + "type": [ + "number", + "string", + "boolean" + ], + "description": "The value of the metric." + }, + "examples": [ + { + "score": 0.95, + "color": "green" + }, + { + "accuracy": 0.85, + "loss": 0.15 + } + ], + "minProperties": 0 + } + } + }, + "mock_trial_data_item": { + "description": "Mock data for a single trial, split by phase", + "type": "object", + "properties": { + "run": { + "description": "A dictionary of trial data for the run phase.", + "type": "object", + "allOf": [ + { + "$ref": "#/$defs/mock_trial_common_phase_data" + }, + { + "$ref": "#/$defs/mock_trial_status_run_phase_data" + } + ], + "minProperties": 1, + "unevaluatedProperties": false + }, + "status": { + "description": "A dictionary of trial data for the status phase.", + "type": "object", + "allOf": [ + { + "$ref": "#/$defs/mock_trial_common_phase_data" + }, + { + "$ref": "#/$defs/mock_trial_status_run_phase_data" + } + ], + "minProperties": 1, + "unevaluatedProperties": false + }, + "setup": { + "description": "A dictionary of trial data for the setup phase.", + "type": "object", + "allOf": [ + { + "$ref": "#/$defs/mock_trial_common_phase_data" + } + ], + "minProperties": 1, + "unevaluatedProperties": false + }, + "teardown": { + "description": "A dictionary of trial data for the teardown phase.", + "type": "object", + "allOf": [ + { + "$ref": "#/$defs/mock_trial_common_phase_data" + } + ], + "minProperties": 1, + "unevaluatedProperties": false + } + }, + "unevaluatedProperties": false, + "minProperties": 1 + }, + "mock_trial_data": { + "description": "A set of mock trial data to use for testing, keyed by trial id. Used by MockEnv.", + "type": "object", + "patternProperties": { + "^[1-9][0-9]*$": { + "$ref": "#/$defs/mock_trial_data_item" + } + }, + "unevaluatedProperties": false, + "minProperties": 1 + } + }, + "type": "object", "properties": { "class": { @@ -42,6 +172,9 @@ }, "minItems": 1, "uniqueItems": true + }, + "mock_trial_data": { + "$ref": "#/$defs/mock_trial_data" } } } diff --git a/mlos_bench/mlos_bench/environments/base_environment.py b/mlos_bench/mlos_bench/environments/base_environment.py index 094085c78b5..fe40025f95d 100644 --- a/mlos_bench/mlos_bench/environments/base_environment.py +++ b/mlos_bench/mlos_bench/environments/base_environment.py @@ -363,6 +363,92 @@ def parameters(self) -> dict[str, TunableValue]: """ return self._params.copy() + @property + def current_trial_id(self) -> int: + """ + Get the current trial ID. + + This value can be used in scripts or environment variables to help + identify the Trial this Environment is currently running. + + Returns + ------- + trial_id : int + The current trial ID. + + Notes + ----- + This method is used to identify the current trial ID for the environment. + It is expected to be called *after* the base + :py:meth:`Environment.setup` method has been called and parameters have + been assigned. + """ + val = self._params["trial_id"] + assert isinstance(val, int), ( + "Expected trial_id to be an int, but got %s (type %s): %s", + val, + type(val), + self._params, + ) + return val + + @property + def trial_runner_id(self) -> int: + """ + Get the ID of the :py:class:`~.mlos_bench.schedulers.trial_runner.TrialRunner` + for this Environment. + + This value can be used in scripts or environment variables to help + identify the TrialRunner for this Environment. + + Returns + ------- + trial_runner_id : int + The trial runner ID. + + Notes + ----- + This shouldn't change during the lifetime of the Environment since each + Environment is assigned to a single TrialRunner. + """ + val = self._params["trial_runner_id"] + assert isinstance(val, int), ( + "Expected trial_runner_id to be an int, but got %s (type %s)", + val, + type(val), + ) + return val + + @property + def experiment_id(self) -> str: + """ + Get the ID of the experiment. + + This value can be used in scripts or environment variables to help + identify the TrialRunner for this Environment. + + Returns + ------- + experiment_id : str + The ID of the experiment. + + Notes + ----- + This value comes from the globals config or ``mlos_bench`` CLI arguments + in the experiment setup. + + See Also + -------- + mlos_bench.config : documentation on the configuration system + """ + val = self._params["experiment_id"] + assert isinstance(val, str), ( + "Expected experiment_id to be an int, but got %s (type %s)", + val, + type(val), + ) + return val + def setup(self, tunables: TunableGroups, global_config: dict | None = None) -> bool: """ Set up a new benchmark environment, if necessary. This method must be diff --git a/mlos_bench/mlos_bench/environments/mock_env.py b/mlos_bench/mlos_bench/environments/mock_env.py index ac6d9b7f001..a2b7f411a7d 100644 --- a/mlos_bench/mlos_bench/environments/mock_env.py +++ b/mlos_bench/mlos_bench/environments/mock_env.py @@ -6,10 +6,14 @@ import logging import random +import time +from copy import deepcopy +from dataclasses import dataclass from datetime import datetime from typing import Any import numpy +from pytz import UTC from mlos_bench.environments.base_environment import Environment from mlos_bench.environments.status import Status @@ -21,6 +25,131 @@ _LOG = logging.getLogger(__name__) +@dataclass +class MockTrialPhaseData: + """Mock trial data for a specific phase of a trial.""" + + phase: str + """Phase of the trial data (e.g., setup, run, status, teardown).""" + + status: Status + """Status response for the phase.""" + + metrics: dict[str, TunableValue] | None = None + """Metrics response for the phase.""" + + sleep: float | None = 0.0 + """Optional sleep time in seconds to simulate phase execution time.""" + + exception: str | None = None + """Message of an exception to raise for the phase.""" + + @staticmethod + def from_dict(phase: str, data: dict | None) -> "MockTrialPhaseData": + """ + Create a MockTrialPhaseData instance from a dictionary. + + Parameters + ---------- + phase : str + Phase of the trial data. + data : dict | None + Dictionary containing the phase data. + + Returns + ------- + MockTrialPhaseData + Instance of MockTrialPhaseData. + """ + data = data or {} + assert phase in {"setup", "run", "status", "teardown"}, f"Invalid phase: {phase}" + if phase in {"setup", "teardown"}: + # setup/teardown phase is not expected to have metrics or status. + assert "metrics" not in data, f"Unexpected metrics data in {phase} phase: {data}" + assert "status" not in data, f"Unexpected status data in {phase} phase: {data}" + if "sleep" in data: + assert isinstance( + data["sleep"], (int, float) + ), f"Invalid sleep in {phase} phase: {data}" + assert 60 >= data["sleep"] >= 0, f"Invalid sleep time in {phase} phase: {data}" + if "metrics" in data: + assert isinstance(data["metrics"], dict), f"Invalid metrics in {phase} phase: {data}" + default_phases = { + "run": Status.SUCCEEDED, + # FIXME: this causes issues if we report RUNNING instead of READY + "status": Status.READY, + } + status = Status.parse(data.get("status", default_phases.get(phase, Status.UNKNOWN))) + return MockTrialPhaseData( + phase=phase, + status=status, + metrics=data.get("metrics"), + sleep=data.get("sleep"), + exception=data.get("exception"), + ) + + +@dataclass +class MockTrialData: + """Mock trial data for a specific trial ID.""" + + trial_id: int + """Trial ID for the mock trial data.""" + setup: MockTrialPhaseData + """Setup phase data for the trial.""" + run: MockTrialPhaseData + """Run phase data for the trial.""" + status: MockTrialPhaseData + """Status phase data for the trial.""" + teardown: MockTrialPhaseData + """Teardown phase data for the trial.""" + + @staticmethod + def from_dict(trial_id: int, data: dict) -> "MockTrialData": + """ + Create a MockTrialData instance from a dictionary. + + Parameters + ---------- + trial_id : int + Trial ID for the mock trial data. + data : dict + Dictionary containing the trial data. + + Returns + ------- + MockTrialData + Instance of MockTrialData. + """ + return MockTrialData( + trial_id=trial_id, + setup=MockTrialPhaseData.from_dict("setup", data.get("setup")), + run=MockTrialPhaseData.from_dict("run", data.get("run")), + status=MockTrialPhaseData.from_dict("status", data.get("status")), + teardown=MockTrialPhaseData.from_dict("teardown", data.get("teardown")), + ) + + @staticmethod + def load_mock_trial_data(mock_trial_data: dict) -> dict[int, "MockTrialData"]: + """ + Load mock trial data from a dictionary. + + Parameters + ---------- + mock_trial_data : dict + Dictionary containing mock trial data. + + Returns + ------- + dict[int, MockTrialData] + Dictionary of mock trial data keyed by trial ID. + """ + return { + int(trial_id): MockTrialData.from_dict(trial_id=int(trial_id), data=trial_data) + for trial_id, trial_data in mock_trial_data.items() + } + + class MockEnv(Environment): """Scheduler-side environment to mock the benchmark results.""" @@ -55,6 +184,19 @@ def __init__( # pylint: disable=too-many-arguments service: Service An optional service object. Not used by this class. """ + # First allow merging mock_trial_data from the global_config into the + # config so we can check it against the JSON schema for expected data + # types. + if global_config and "mock_trial_data" in global_config: + mock_trial_data = global_config["mock_trial_data"] + if not isinstance(mock_trial_data, dict): + raise ValueError(f"Invalid mock_trial_data in global_config: {mock_trial_data}") + # Merge the mock trial data into the config. + config["mock_trial_data"] = { + **config.get("mock_trial_data", {}), + **mock_trial_data, + } + super().__init__( name=name, config=config, @@ -62,6 +204,9 @@ def __init__( # pylint: disable=too-many-arguments tunables=tunables, service=service, ) + self._mock_trial_data = MockTrialData.load_mock_trial_data( + self.config.get("mock_trial_data", {}) + ) seed = int(self.config.get("mock_env_seed", -1)) self._run_random = random.Random(seed or None) if seed >= 0 else None self._status_random = random.Random(seed or None) if seed >= 0 else None @@ -83,6 +228,59 @@ def _produce_metrics(self, rand: random.Random | None) -> dict[str, TunableValue return {metric: float(score) for metric in self._metrics or []} + @property + def mock_trial_data(self) -> dict[int, MockTrialData]: + """ + Get the mock trial data for all trials. + + Returns + ------- + dict[int, MockTrialData] + Dictionary of mock trial data keyed by trial ID. + """ + return deepcopy(self._mock_trial_data) + + def get_current_mock_trial_data(self) -> MockTrialData: + """ + Gets mock trial data for the current trial ID. + + If no (or missing) mock trial data is found, a new instance of + MockTrialData is created and later filled with random data. + + Note + ---- + This method must be called after the base :py:meth:`.Environment.setup` + method is called to ensure the current ``trial_id`` is set. + """ + trial_id = self.current_trial_id + if trial_id not in self._mock_trial_data: + # Make an empty mock trial data object if not found. + self._mock_trial_data[trial_id] = MockTrialData.from_dict(trial_id, data={}) + return self._mock_trial_data[trial_id] + + def setup(self, tunables: TunableGroups, global_config: dict | None = None) -> bool: + is_success = super().setup(tunables, global_config) + mock_trial_data = self.get_current_mock_trial_data() + if mock_trial_data.setup.sleep: + _LOG.debug("Sleeping for %s seconds", mock_trial_data.setup.sleep) + time.sleep(mock_trial_data.setup.sleep) + if mock_trial_data.setup.exception: + raise RuntimeError( + f"Mock trial data setup exception: {mock_trial_data.setup.exception}" + ) + return is_success + + def teardown(self) -> None: + mock_trial_data = self.get_current_mock_trial_data() + if mock_trial_data.teardown.sleep: + _LOG.debug("Sleeping for %s seconds", mock_trial_data.teardown.sleep) + time.sleep(mock_trial_data.teardown.sleep) + if mock_trial_data.teardown.exception: + raise RuntimeError( + f"Mock trial data teardown exception: {mock_trial_data.teardown.exception}" + ) + super().teardown() + def run(self) -> tuple[Status, datetime, dict[str, TunableValue] | None]: """ Produce mock benchmark data for one experiment. @@ -99,8 +297,20 @@ def run(self) -> tuple[Status, datetime, dict[str, TunableValue] | None]: (status, timestamp, _) = result = super().run() if not status.is_ready(): return result - metrics = self._produce_metrics(self._run_random) - return (Status.SUCCEEDED, timestamp, metrics) + mock_trial_data = self.get_current_mock_trial_data() + if mock_trial_data.run.sleep: + _LOG.debug("Sleeping for %s seconds", mock_trial_data.run.sleep) + time.sleep(mock_trial_data.run.sleep) + # Update the timestamp after the sleep. + timestamp = datetime.now(UTC) + if mock_trial_data.run.exception: + raise RuntimeError(f"Mock trial data run exception: {mock_trial_data.run.exception}") + if mock_trial_data.run.metrics is not None: + metrics = mock_trial_data.run.metrics + else: + # If no metrics are provided, generate them. + metrics = self._produce_metrics(self._run_random) + return (mock_trial_data.run.status, timestamp, metrics) def status(self) -> tuple[Status, datetime, list[tuple[datetime, str, Any]]]: """ @@ -116,10 +326,28 @@ def status(self) -> tuple[Status, datetime, list[tuple[datetime, str, Any]]]: (status, timestamp, _) = result = super().status() if not status.is_ready(): return result - metrics = self._produce_metrics(self._status_random) + mock_trial_data = self.get_current_mock_trial_data() + if mock_trial_data.status.sleep: + _LOG.debug("Sleeping for %s seconds", mock_trial_data.status.sleep) + time.sleep(mock_trial_data.status.sleep) + # Update the timestamp after the sleep. + timestamp = datetime.now(UTC) + if mock_trial_data.status.exception: + raise RuntimeError( + f"Mock trial data status exception: {mock_trial_data.status.exception}" + ) + if mock_trial_data.status.metrics is None: + # If no metrics are provided, generate them. + # Note: we don't save these in the mock trial data as they may need + # to change to preserve backwards compatibility with previous tests. + metrics = self._produce_metrics(self._status_random) + else: + # If metrics are provided, use them. + # Note: current implementation uses the same metrics for all status + # calls of this trial. + metrics = mock_trial_data.status.metrics return ( - # FIXME: this causes issues if we report RUNNING instead of READY - Status.READY, + mock_trial_data.status.status, timestamp, [(timestamp, metric, score) for (metric, score) in metrics.items()], ) diff --git a/mlos_bench/mlos_bench/environments/script_env.py b/mlos_bench/mlos_bench/environments/script_env.py index 6ac4674cfe1..d71eb661834 100644 --- a/mlos_bench/mlos_bench/environments/script_env.py +++ b/mlos_bench/mlos_bench/environments/script_env.py @@ -5,7 +5,7 @@ """ Base scriptable benchmark environment. -TODO: Document how variable propogation works in the script environments using +TODO: Document how variable propagation works in the script environments using shell_env_params, required_args, const_args, etc. """ diff --git a/mlos_bench/mlos_bench/environments/status.py b/mlos_bench/mlos_bench/environments/status.py index 6d76d7206c8..aa3b3e99c16 100644 --- a/mlos_bench/mlos_bench/environments/status.py +++ b/mlos_bench/mlos_bench/environments/status.py @@ -24,21 +24,37 @@ class Status(enum.Enum): TIMED_OUT = 7 @staticmethod - def from_str(status_str: Any) -> "Status": - """Convert a string to a Status enum.""" - if not isinstance(status_str, str): - _LOG.warning("Expected type %s for status: %s", type(status_str), status_str) - status_str = str(status_str) - if status_str.isdigit(): + def parse(status: Any) -> "Status": + """ + Convert the input to a Status enum. + + Parameters + ---------- + status : Any + The status to parse. This can be a string (or string convertible), + int, or Status enum. + + Returns + ------- + Status + The corresponding Status enum value or else UNKNOWN if the input is not + recognized. + """ + if isinstance(status, Status): + return status + if not isinstance(status, str): + _LOG.warning("Expected type %s for status: %s", type(status), status) + status = str(status) + if status.isdigit(): try: - return Status(int(status_str)) + return Status(int(status)) except ValueError: - _LOG.warning("Unknown status: %d", int(status_str)) + _LOG.warning("Unknown status: %d", int(status)) try: - status_str = status_str.upper().strip() - return Status[status_str] + status = status.upper().strip() + return Status[status] except KeyError: - _LOG.warning("Unknown status: %s", status_str) + _LOG.warning("Unknown status: %s", status) return Status.UNKNOWN def is_good(self) -> bool: @@ -113,4 +129,15 @@ def is_timed_out(self) -> bool: Status.TIMED_OUT, } ) -"""The set of completed statuses.""" +""" +The set of completed statuses. + +Includes all statuses that indicate the trial or experiment has finished, either +successfully or not. +This set is used to determine if a trial or experiment has reached a final state. +This includes: +- :py:attr:`.Status.SUCCEEDED`: The trial or experiment completed successfully. +- :py:attr:`.Status.CANCELED`: The trial or experiment was canceled. +- :py:attr:`.Status.FAILED`: The trial or experiment failed. +- :py:attr:`.Status.TIMED_OUT`: The trial or experiment timed out. +""" diff --git a/mlos_bench/mlos_bench/launcher.py b/mlos_bench/mlos_bench/launcher.py index c728ed7fb20..353ace23f0e 100644 --- a/mlos_bench/mlos_bench/launcher.py +++ b/mlos_bench/mlos_bench/launcher.py @@ -55,8 +55,9 @@ def __init__(self, description: str, long_text: str = "", argv: list[str] | None Other required_args values can also be pulled from shell environment variables. - For additional details, please see the website or the README.md files in - the source tree: + For additional details, please see the documentation website or the + README.md files in the source tree: + """ parser = argparse.ArgumentParser(description=f"{description} : {long_text}", epilog=epilog) diff --git a/mlos_bench/mlos_bench/optimizers/base_optimizer.py b/mlos_bench/mlos_bench/optimizers/base_optimizer.py index 44aa9a035e2..72b437b320e 100644 --- a/mlos_bench/mlos_bench/optimizers/base_optimizer.py +++ b/mlos_bench/mlos_bench/optimizers/base_optimizer.py @@ -356,8 +356,10 @@ def _get_scores( assert scores is not None target_metrics: dict[str, float] = {} for opt_target, opt_dir in self._opt_targets.items(): + if opt_target not in scores: + raise ValueError(f"Score for {opt_target} not found in {scores}.") val = scores[opt_target] - assert val is not None + assert val is not None, f"Score for {opt_target} is None." target_metrics[opt_target] = float(val) * opt_dir return target_metrics diff --git a/mlos_bench/mlos_bench/optimizers/mock_optimizer.py b/mlos_bench/mlos_bench/optimizers/mock_optimizer.py index 947e34a7da4..0878ca4d75d 100644 --- a/mlos_bench/mlos_bench/optimizers/mock_optimizer.py +++ b/mlos_bench/mlos_bench/optimizers/mock_optimizer.py @@ -14,6 +14,7 @@ import logging import random from collections.abc import Callable, Sequence +from dataclasses import dataclass from mlos_bench.environments.status import Status from mlos_bench.optimizers.track_best_optimizer import TrackBestOptimizer @@ -25,6 +26,15 @@ _LOG = logging.getLogger(__name__) +@dataclass +class RegisteredScore: + """A registered score for a trial.""" + + config: TunableGroups + score: dict[str, TunableValue] | None + status: Status + + class MockOptimizer(TrackBestOptimizer): """Mock optimizer to test the Environment API.""" @@ -42,6 +52,40 @@ def __init__( "float": lambda tunable: rnd.uniform(*tunable.range), "int": lambda tunable: rnd.randint(*(int(x) for x in tunable.range)), } + self._registered_scores: list[RegisteredScore] = [] + + @property + def registered_scores(self) -> list[RegisteredScore]: + """ + Return the list of registered scores. + + Notes + ----- + Used for testing and validation. + """ + return self._registered_scores + + def register( + self, + tunables: TunableGroups, + status: Status, + score: dict[str, TunableValue] | None = None, + ) -> dict[str, float] | None: + # Track the registered scores for testing and validation. + # Almost the same as _get_scores, but we don't adjust the direction here. + scores: dict[str, TunableValue] = { + k: float(v) + for k, v in (score or {}).items() + if k in self._opt_targets and v is not None + } + self._registered_scores.append( + RegisteredScore( + config=tunables.copy(), + score=scores, + status=status, + ) + ) + return super().register(tunables, status, score) def bulk_register( self, diff --git a/mlos_bench/mlos_bench/schedulers/base_scheduler.py b/mlos_bench/mlos_bench/schedulers/base_scheduler.py index 1cd88fd5859..3c22d427c79 100644 --- a/mlos_bench/mlos_bench/schedulers/base_scheduler.py +++ b/mlos_bench/mlos_bench/schedulers/base_scheduler.py @@ -242,8 +242,15 @@ def __exit__( self._in_context = False return False # Do not suppress exceptions - def start(self) -> None: - """Start the scheduling loop.""" + def _prepare_start(self) -> bool: + """ + Prepare the scheduler for starting. + + Notes + ----- + This method is called by the :py:meth:`Scheduler.start` method. + It is split out mostly to allow for easier unit testing/mocking. + """ assert self.experiment is not None _LOG.info( "START: Experiment: %s Env: %s Optimizer: %s", @@ -262,21 +269,39 @@ def start(self) -> None: is_warm_up: bool = self.optimizer.supports_preload if not is_warm_up: _LOG.warning("Skip pending trials and warm-up: %s", self.optimizer) + return is_warm_up + def start(self) -> None: + """Start the scheduling loop.""" + assert self.experiment is not None + is_warm_up = self._prepare_start() not_done: bool = True while not_done: - _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) - self.run_schedule(is_warm_up) - not_done = self.add_new_optimizer_suggestions() - self.assign_trial_runners( - self.experiment.pending_trials( - datetime.now(UTC), - running=False, - trial_runner_assigned=False, - ) - ) + not_done = self._execute_scheduling_step(is_warm_up) is_warm_up = False + def _execute_scheduling_step(self, is_warm_up: bool) -> bool: + """ + Perform a single scheduling step. + + Notes + ----- + This method is called by the :py:meth:`Scheduler.start` method. + It is split out mostly to allow for easier unit testing/mocking. + """ + assert self.experiment is not None + _LOG.info("Optimization loop: Last trial ID: %d", self._last_trial_id) + self.run_schedule(is_warm_up) + not_done = self.add_new_optimizer_suggestions() + self.assign_trial_runners( + self.experiment.pending_trials( + datetime.now(UTC), + running=False, + trial_runner_assigned=False, + ) + ) + return not_done + def teardown(self) -> None: """ Tear down the TrialRunners/Environment(s). diff --git a/mlos_bench/mlos_bench/storage/sql/common.py b/mlos_bench/mlos_bench/storage/sql/common.py index 97eb270c9d9..032cf9259d8 100644 --- a/mlos_bench/mlos_bench/storage/sql/common.py +++ b/mlos_bench/mlos_bench/storage/sql/common.py @@ -95,7 +95,7 @@ def get_trials( config_id=trial.config_id, ts_start=utcify_timestamp(trial.ts_start, origin="utc"), ts_end=utcify_nullable_timestamp(trial.ts_end, origin="utc"), - status=Status.from_str(trial.status), + status=Status.parse(trial.status), trial_runner_id=trial.trial_runner_id, ) for trial in trials.fetchall() diff --git a/mlos_bench/mlos_bench/storage/sql/experiment.py b/mlos_bench/mlos_bench/storage/sql/experiment.py index acc2a497b48..4181df95f32 100644 --- a/mlos_bench/mlos_bench/storage/sql/experiment.py +++ b/mlos_bench/mlos_bench/storage/sql/experiment.py @@ -22,7 +22,7 @@ from mlos_bench.storage.sql.schema import DbSchema from mlos_bench.storage.sql.trial import Trial from mlos_bench.tunables.tunable_groups import TunableGroups -from mlos_bench.util import utcify_timestamp +from mlos_bench.util import try_parse_val, utcify_timestamp _LOG = logging.getLogger(__name__) @@ -149,7 +149,11 @@ def load_telemetry(self, trial_id: int) -> list[tuple[datetime, str, Any]]: # Not all storage backends store the original zone info. # We try to ensure data is entered in UTC and augment it on return again here. return [ - (utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value) + ( + utcify_timestamp(row.ts, origin="utc"), + row.metric_id, + try_parse_val(row.metric_value), + ) for row in cur_telemetry.fetchall() ] @@ -188,7 +192,7 @@ def load( status: list[Status] = [] for trial in cur_trials.fetchall(): - stat = Status.from_str(trial.status) + stat = Status.parse(trial.status) status.append(stat) trial_ids.append(trial.trial_id) configs.append( @@ -229,10 +233,15 @@ def _get_key_val(conn: Connection, table: Table, field: str, **kwargs: Any) -> d .select_from(table) .where(*[column(key) == val for (key, val) in kwargs.items()]) ) + + def _tuple_to_kv(row_tuple: tuple[str, Any]) -> tuple[str, Any]: + return row_tuple[0], try_parse_val(row_tuple[1]) + # NOTE: `Row._tuple()` is NOT a protected member; the class uses `_` to # avoid naming conflicts. return dict( - row._tuple() for row in cur_result.fetchall() # pylint: disable=protected-access + _tuple_to_kv(row._tuple()) # pylint: disable=protected-access + for row in cur_result.fetchall() ) def get_trial_by_id( @@ -272,7 +281,7 @@ def get_trial_by_id( config_id=trial.config_id, trial_runner_id=trial.trial_runner_id, opt_targets=self._opt_targets, - status=Status.from_str(trial.status), + status=Status.parse(trial.status), restoring=True, config=config, ) @@ -330,7 +339,7 @@ def pending_trials( config_id=trial.config_id, trial_runner_id=trial.trial_runner_id, opt_targets=self._opt_targets, - status=Status.from_str(trial.status), + status=Status.parse(trial.status), restoring=True, config=config, ) @@ -367,11 +376,7 @@ def _new_trial( ts_start: datetime | None = None, config: dict[str, Any] | None = None, ) -> Storage.Trial: - # MySQL can round microseconds into the future causing scheduler to skip trials. - # Truncate microseconds to avoid this issue. - ts_start = utcify_timestamp(ts_start or datetime.now(UTC), origin="local").replace( - microsecond=0 - ) + ts_start = utcify_timestamp(ts_start or datetime.now(UTC), origin="local") _LOG.debug("Create trial: %s:%d @ %s", self._experiment_id, self._trial_id, ts_start) with self._engine.begin() as conn: try: diff --git a/mlos_bench/mlos_bench/storage/sql/schema.py b/mlos_bench/mlos_bench/storage/sql/schema.py index 2bc00f00825..3cbb63bcc45 100644 --- a/mlos_bench/mlos_bench/storage/sql/schema.py +++ b/mlos_bench/mlos_bench/storage/sql/schema.py @@ -39,6 +39,7 @@ create_mock_engine, inspect, ) +from sqlalchemy.dialects import mysql from sqlalchemy.engine import Engine from mlos_bench.util import path_join @@ -104,8 +105,8 @@ def __init__(self, engine: Engine | None): Column("git_repo", String(1024), nullable=False), Column("git_commit", String(40), nullable=False), # For backwards compatibility, we allow NULL for ts_start. - Column("ts_start", DateTime), - Column("ts_end", DateTime), + Column("ts_start", DateTime().with_variant(mysql.DATETIME(fsp=6), "mysql")), + Column("ts_end", DateTime().with_variant(mysql.DATETIME(fsp=6), "mysql")), # Should match the text IDs of `mlos_bench.environments.Status` enum: # For backwards compatibility, we allow NULL for status. Column("status", String(self._STATUS_LEN)), @@ -179,8 +180,16 @@ def __init__(self, engine: Engine | None): Column("trial_id", Integer, nullable=False), Column("config_id", Integer, nullable=False), Column("trial_runner_id", Integer, nullable=True, default=None), - Column("ts_start", DateTime, nullable=False), - Column("ts_end", DateTime), + Column( + "ts_start", + DateTime().with_variant(mysql.DATETIME(fsp=6), "mysql"), + nullable=False, + ), + Column( + "ts_end", + DateTime().with_variant(mysql.DATETIME(fsp=6), "mysql"), + nullable=True, + ), # Should match the text IDs of `mlos_bench.environments.Status` enum: Column("status", String(self._STATUS_LEN), nullable=False), PrimaryKeyConstraint("exp_id", "trial_id"), @@ -232,7 +241,12 @@ def __init__(self, engine: Engine | None): self._meta, Column("exp_id", String(self._ID_LEN), nullable=False), Column("trial_id", Integer, nullable=False), - Column("ts", DateTime(timezone=True), nullable=False, default="now"), + Column( + "ts", + DateTime(timezone=True).with_variant(mysql.DATETIME(fsp=6), "mysql"), + nullable=False, + default="now", + ), Column("status", String(self._STATUS_LEN), nullable=False), UniqueConstraint("exp_id", "trial_id", "ts"), ForeignKeyConstraint( @@ -267,7 +281,12 @@ def __init__(self, engine: Engine | None): self._meta, Column("exp_id", String(self._ID_LEN), nullable=False), Column("trial_id", Integer, nullable=False), - Column("ts", DateTime(timezone=True), nullable=False, default="now"), + Column( + "ts", + DateTime(timezone=True).with_variant(mysql.DATETIME(fsp=6), "mysql"), + nullable=False, + default="now", + ), Column("metric_id", String(self._ID_LEN), nullable=False), Column("metric_value", String(self._METRIC_VALUE_LEN)), UniqueConstraint("exp_id", "trial_id", "ts", "metric_id"), diff --git a/mlos_bench/mlos_bench/storage/sql/trial_data.py b/mlos_bench/mlos_bench/storage/sql/trial_data.py index 03ef19570ca..af5821591f3 100644 --- a/mlos_bench/mlos_bench/storage/sql/trial_data.py +++ b/mlos_bench/mlos_bench/storage/sql/trial_data.py @@ -16,7 +16,7 @@ from mlos_bench.storage.base_tunable_config_data import TunableConfigData from mlos_bench.storage.sql.schema import DbSchema from mlos_bench.storage.sql.tunable_config_data import TunableConfigSqlData -from mlos_bench.util import utcify_timestamp +from mlos_bench.util import try_parse_val, utcify_timestamp if TYPE_CHECKING: from mlos_bench.storage.base_tunable_config_trial_group_data import ( @@ -97,7 +97,10 @@ def results_df(self) -> pandas.DataFrame: ) ) return pandas.DataFrame( - [(row.metric_id, row.metric_value) for row in cur_results.fetchall()], + [ + (row.metric_id, try_parse_val(row.metric_value)) + for row in cur_results.fetchall() + ], columns=["metric", "value"], ) @@ -120,7 +123,11 @@ def telemetry_df(self) -> pandas.DataFrame: # We try to ensure data is entered in UTC and augment it on return again here. return pandas.DataFrame( [ - (utcify_timestamp(row.ts, origin="utc"), row.metric_id, row.metric_value) + ( + utcify_timestamp(row.ts, origin="utc"), + row.metric_id, + try_parse_val(row.metric_value), + ) for row in cur_telemetry.fetchall() ], columns=["ts", "metric", "value"], @@ -145,6 +152,6 @@ def metadata_df(self) -> pandas.DataFrame: ) ) return pandas.DataFrame( - [(row.param_id, row.param_value) for row in cur_params.fetchall()], + [(row.param_id, try_parse_val(row.param_value)) for row in cur_params.fetchall()], columns=["parameter", "value"], ) diff --git a/mlos_bench/mlos_bench/tests/__init__.py b/mlos_bench/mlos_bench/tests/__init__.py index ce5fbdb45af..25df725a9ed 100644 --- a/mlos_bench/mlos_bench/tests/__init__.py +++ b/mlos_bench/mlos_bench/tests/__init__.py @@ -31,9 +31,9 @@ ZONE_INFO: list[tzinfo | None] = [nullable(pytz.timezone, zone_name) for zone_name in ZONE_NAMES] BUILT_IN_ENV_VAR_DEFAULTS = { - "experiment_id": None, - "trial_id": None, - "trial_runner_id": None, + "experiment_id": "SomeExperimentName", + "trial_id": 1, + "trial_runner_id": 0, } # A decorator for tests that require docker. diff --git a/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/invalid/mock_env-bad-trial-data-fields.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/invalid/mock_env-bad-trial-data-fields.jsonc new file mode 100644 index 00000000000..d36559cf334 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/invalid/mock_env-bad-trial-data-fields.jsonc @@ -0,0 +1,24 @@ +{ + "class": "mlos_bench.environments.mock_env.MockEnv", + "config": { + "mock_trial_data": { + "1": { + "run": { + // bad types + "status": null, + "metrics": [], + "exception": null, + "sleep": "1", + }, + // missing fields + "setup": {}, + "teardown": { + "status": "UNKNOWN", + "metrics": { + "unexpected": "field" + } + } + } + } + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/invalid/mock_env-bad-trial-data-ids.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/invalid/mock_env-bad-trial-data-ids.jsonc new file mode 100644 index 00000000000..400e557d0fa --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/invalid/mock_env-bad-trial-data-ids.jsonc @@ -0,0 +1,13 @@ +{ + "class": "mlos_bench.environments.mock_env.MockEnv", + "config": { + "mock_trial_data": { + // invalid trial id + "trial_id_1": { + "run": { + "status": "FAILED" + } + } + } + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/unhandled/mock_env-trial-data-extras.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/unhandled/mock_env-trial-data-extras.jsonc new file mode 100644 index 00000000000..ecdf4cd0f51 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/bad/unhandled/mock_env-trial-data-extras.jsonc @@ -0,0 +1,15 @@ +{ + "class": "mlos_bench.environments.mock_env.MockEnv", + "config": { + "mock_trial_data": { + "1": { + "new_phase": { + "status": "FAILED" + }, + "run": { + "expected": "property" + } + } + } + } +} diff --git a/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/good/full/mock_env-full.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/good/full/mock_env-full.jsonc index a00f8ca60c0..a23971f0362 100644 --- a/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/good/full/mock_env-full.jsonc +++ b/mlos_bench/mlos_bench/tests/config/schemas/environments/test-cases/good/full/mock_env-full.jsonc @@ -25,6 +25,38 @@ "mock_env_metrics": [ "latency", "cost" - ] + ], + "mock_trial_data": { + "1": { + "setup": { + "sleep": 0.1 + }, + "status": { + "metrics": { + "latency": 0.2, + "cost": 0.3 + } + }, + "run": { + "sleep": 0.2, + "status": "SUCCEEDED", + "metrics": { + "latency": 0.1, + "cost": 0.2 + } + }, + "teardown": { + "sleep": 0.1 + } + }, + "2": { + "setup": { + "exception": "Some exception" + }, + "teardown": { + "exception": "Some other exception" + } + } + } } } diff --git a/mlos_bench/mlos_bench/tests/config/schemas/globals/test-cases/good/full/globals-with-schema.jsonc b/mlos_bench/mlos_bench/tests/config/schemas/globals/test-cases/good/full/globals-with-schema.jsonc index 58a0a31bb36..4ed580e09a9 100644 --- a/mlos_bench/mlos_bench/tests/config/schemas/globals/test-cases/good/full/globals-with-schema.jsonc +++ b/mlos_bench/mlos_bench/tests/config/schemas/globals/test-cases/good/full/globals-with-schema.jsonc @@ -10,5 +10,20 @@ "mysql": ["mysql-innodb", "mysql-myisam", "mysql-binlog", "mysql-hugepages"] }, "experiment_id": "ExperimentName", - "trial_id": 1 + "trial_id": 1, + + "mock_trial_data": { + "1": { + "setup": { + "sleep": 1 + }, + "run": { + "status": "SUCCEEDED", + "metrics": { + "score": 0.9, + "color": "green" + } + } + } + } } diff --git a/mlos_bench/mlos_bench/tests/conftest.py b/mlos_bench/mlos_bench/tests/conftest.py index becae205033..158d1fffaf5 100644 --- a/mlos_bench/mlos_bench/tests/conftest.py +++ b/mlos_bench/mlos_bench/tests/conftest.py @@ -5,12 +5,14 @@ """Common fixtures for mock TunableGroups and Environment objects.""" import os +import re import sys from collections.abc import Generator from typing import Any import pytest from fasteners import InterProcessLock, InterProcessReaderWriterLock +from pytest import FixtureRequest from pytest_docker.plugin import Services as DockerServices from pytest_docker.plugin import get_docker_services @@ -30,7 +32,22 @@ @pytest.fixture -def mock_env(tunable_groups: TunableGroups) -> MockEnv: +def mock_env_global_config(request: FixtureRequest) -> dict: + """A global config for a MockEnv.""" + test_name = request.node.name + test_name = re.sub(r"[^a-zA-Z0-9]", "_", test_name) + experiment_id = f"TestExperiment-{test_name}" + return { + "experiment_id": experiment_id, + "trial_id": 1, + } + + +@pytest.fixture +def mock_env( + tunable_groups: TunableGroups, + mock_env_global_config: dict, +) -> MockEnv: """Test fixture for MockEnv.""" return MockEnv( name="Test Env", @@ -41,11 +58,15 @@ def mock_env(tunable_groups: TunableGroups) -> MockEnv: "mock_env_metrics": ["score"], }, tunables=tunable_groups, + global_config=mock_env_global_config, ) @pytest.fixture -def mock_env_no_noise(tunable_groups: TunableGroups) -> MockEnv: +def mock_env_no_noise( + tunable_groups: TunableGroups, + mock_env_global_config: dict, +) -> MockEnv: """Test fixture for MockEnv.""" return MockEnv( name="Test Env No Noise", @@ -56,6 +77,7 @@ def mock_env_no_noise(tunable_groups: TunableGroups) -> MockEnv: "mock_env_metrics": ["score", "other_score"], }, tunables=tunable_groups, + global_config=mock_env_global_config, ) diff --git a/mlos_bench/mlos_bench/tests/environments/composite_env_test.py b/mlos_bench/mlos_bench/tests/environments/composite_env_test.py index 94613102cd2..9cda1cea52e 100644 --- a/mlos_bench/mlos_bench/tests/environments/composite_env_test.py +++ b/mlos_bench/mlos_bench/tests/environments/composite_env_test.py @@ -75,7 +75,10 @@ def composite_env(tunable_groups: TunableGroups) -> CompositeEnv: }, tunables=tunable_groups, service=ConfigPersistenceService({}), - global_config={"global_param": "global_value"}, + global_config={ + "global_param": "global_value", + **BUILT_IN_ENV_VAR_DEFAULTS, + }, ) @@ -231,7 +234,10 @@ def nested_composite_env(tunable_groups: TunableGroups) -> CompositeEnv: }, tunables=tunable_groups, service=ConfigPersistenceService({}), - global_config={"global_param": "global_value"}, + global_config={ + "global_param": "global_value", + **BUILT_IN_ENV_VAR_DEFAULTS, + }, ) diff --git a/mlos_bench/mlos_bench/tests/environments/include_tunables_test.py b/mlos_bench/mlos_bench/tests/environments/include_tunables_test.py index 4d54e6aad44..2446983453b 100644 --- a/mlos_bench/mlos_bench/tests/environments/include_tunables_test.py +++ b/mlos_bench/mlos_bench/tests/environments/include_tunables_test.py @@ -50,6 +50,7 @@ def test_two_groups_setup(tunable_groups: TunableGroups) -> None: }, }, tunables=tunable_groups, + global_config=BUILT_IN_ENV_VAR_DEFAULTS, ) expected_params = { "vmSize": "Standard_B4ms", @@ -98,6 +99,7 @@ def test_zero_groups_implicit_setup(tunable_groups: TunableGroups) -> None: }, }, tunables=tunable_groups, + global_config=BUILT_IN_ENV_VAR_DEFAULTS, ) assert env.tunable_params.get_param_values() == {} @@ -137,7 +139,11 @@ def test_loader_level_include() -> None: ] } ) - env = loader.build_environment(config=env_json, tunables=TunableGroups()) + env = loader.build_environment( + config=env_json, + tunables=TunableGroups(), + global_config=BUILT_IN_ENV_VAR_DEFAULTS, + ) expected_params = { "align_va_addr": "on", "idle": "halt", diff --git a/mlos_bench/mlos_bench/tests/environments/test_status.py b/mlos_bench/mlos_bench/tests/environments/test_status.py index 3c0a9bccf3c..785275825c0 100644 --- a/mlos_bench/mlos_bench/tests/environments/test_status.py +++ b/mlos_bench/mlos_bench/tests/environments/test_status.py @@ -51,16 +51,19 @@ def test_status_from_str_valid(input_str: str, expected_status: Status) -> None: Expected Status enum value. """ assert ( - Status.from_str(input_str) == expected_status + Status.parse(input_str) == expected_status ), f"Expected {expected_status} for input: {input_str}" # Check lowercase representation assert ( - Status.from_str(input_str.lower()) == expected_status + Status.parse(input_str.lower()) == expected_status ), f"Expected {expected_status} for input: {input_str.lower()}" + assert ( + Status.parse(expected_status) == expected_status + ), f"Expected {expected_status} for input: {expected_status}" if input_str.isdigit(): # Also test the numeric representation assert ( - Status.from_str(int(input_str)) == expected_status + Status.parse(int(input_str)) == expected_status ), f"Expected {expected_status} for input: {int(input_str)}" @@ -83,7 +86,7 @@ def test_status_from_str_invalid(invalid_input: Any) -> None: input. """ assert ( - Status.from_str(invalid_input) == Status.UNKNOWN + Status.parse(invalid_input) == Status.UNKNOWN ), f"Expected Status.UNKNOWN for invalid input: {invalid_input}" diff --git a/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py b/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py index df40ddc839a..d15fd399f38 100644 --- a/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py +++ b/mlos_bench/mlos_bench/tests/optimizers/opt_bulk_register_test.py @@ -40,7 +40,12 @@ def mock_scores() -> list[dict[str, TunableValue] | None]: @pytest.fixture def mock_status() -> list[Status]: """Mock status values for earlier experiments.""" - return [Status.FAILED, Status.SUCCEEDED, Status.SUCCEEDED, Status.SUCCEEDED] + return [ + Status.FAILED, + Status.SUCCEEDED, + Status.SUCCEEDED, + Status.SUCCEEDED, + ] def _test_opt_update_min( diff --git a/mlos_bench/mlos_bench/tests/schedulers/__init__.py b/mlos_bench/mlos_bench/tests/schedulers/__init__.py new file mode 100644 index 00000000000..4bc0076079f --- /dev/null +++ b/mlos_bench/mlos_bench/tests/schedulers/__init__.py @@ -0,0 +1,5 @@ +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +"""mlos_bench.tests.schedulers.""" diff --git a/mlos_bench/mlos_bench/tests/schedulers/conftest.py b/mlos_bench/mlos_bench/tests/schedulers/conftest.py new file mode 100644 index 00000000000..e82c9ab7b07 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/schedulers/conftest.py @@ -0,0 +1,178 @@ +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +"""Pytest fixtures for mlos_bench.schedulers tests.""" +# pylint: disable=redefined-outer-name + +import json + +import pytest + +from mlos_bench.environments.mock_env import MockEnv +from mlos_bench.schedulers.trial_runner import TrialRunner +from mlos_bench.services.config_persistence import ConfigPersistenceService +from mlos_bench.tunables.tunable_groups import TunableGroups + +NUM_TRIAL_RUNNERS = 4 + + +@pytest.fixture +def mock_env_config() -> dict: + """A config for a MockEnv with mock_trial_data.""" + return { + "name": "Test MockEnv With Explicit Mock Trial Data", + "class": "mlos_bench.environments.mock_env.MockEnv", + "config": { + # Reference the covariant groups from the `tunable_groups` fixture. + # See Also: + # - mlos_bench/tests/conftest.py + # - mlos_bench/tests/tunable_groups_fixtures.py + "tunable_params": ["provision", "boot", "kernel"], + "mock_env_seed": -1, + "mock_env_range": [0, 10], + "mock_env_metrics": ["score"], + # TODO: Add more mock trial data here: + "mock_trial_data": { + "1": { + "setup": { + "sleep": 0.1, + }, + "status": { + "sleep": 0.1, + "metrics": { + "color": "orange", + "tps": 8, + }, + }, + "run": { + "sleep": 0.25, + "status": "SUCCEEDED", + "metrics": { + "score": 1.0, + }, + }, + }, + "2": { + "setup": { + "sleep": 0.1, + }, + "status": { + "sleep": 0.1, + "metrics": { + "color": "purple", + "tps": 7, + }, + }, + "run": { + "sleep": 0.3, + "status": "SUCCEEDED", + "metrics": { + "score": 2.0, + }, + }, + }, + "3": { + "setup": { + "sleep": 0.1, + }, + "status": { + "sleep": 0.1, + "metrics": { + "color": "blue", + "tps": 6789, + }, + }, + "run": { + "sleep": 0.2, + "status": "SUCCEEDED", + "metrics": { + "score": 3.0, + }, + }, + }, + "4": { + "setup": { + "sleep": 0.1, + }, + "status": { + "sleep": 0.1, + "metrics": { + "color": "blue", + "tps": 5, + }, + }, + "run": { + "sleep": 0.2, + "status": "SUCCEEDED", + "metrics": { + "score": 2.7, + }, + }, + }, + "5": { + "setup": { + "sleep": 0.1, + }, + "status": { + "sleep": 0.1, + "metrics": {}, + }, + "run": { + "sleep": 0.2, + "status": "SUCCEEDED", + "metrics": { + # Return multiple scores. + "color": "green", + "score": 3.1, + }, + }, + }, + }, + }, + } + + +@pytest.fixture +def mock_env_json_config(mock_env_config: dict) -> str: + """A JSON string of the mock_env_config.""" + return json.dumps(mock_env_config) + + +@pytest.fixture +def mock_env( + mock_env_json_config: str, + tunable_groups: TunableGroups, + mock_env_global_config: dict, +) -> MockEnv: + """A fixture to create a MockEnv instance using the mock_env_json_config.""" + config_loader_service = ConfigPersistenceService() + mock_env = config_loader_service.load_environment( + mock_env_json_config, + tunable_groups, + service=config_loader_service, + global_config=mock_env_global_config, + ) + assert isinstance(mock_env, MockEnv) + return mock_env + + +@pytest.fixture +def trial_runners( + mock_env_json_config: str, + tunable_groups: TunableGroups, + mock_env_global_config: dict, +) -> list[TrialRunner]: + """A fixture to create a list of TrialRunner instances using the + mock_env_json_config. + """ + config_loader_service = ConfigPersistenceService( + global_config=mock_env_global_config, + ) + return TrialRunner.create_from_json( + config_loader=config_loader_service, + env_json=mock_env_json_config, + tunable_groups=tunable_groups, + num_trial_runners=NUM_TRIAL_RUNNERS, + global_config=mock_env_global_config, + ) diff --git a/mlos_bench/mlos_bench/tests/schedulers/test_scheduler.py b/mlos_bench/mlos_bench/tests/schedulers/test_scheduler.py new file mode 100644 index 00000000000..3375bf72527 --- /dev/null +++ b/mlos_bench/mlos_bench/tests/schedulers/test_scheduler.py @@ -0,0 +1,231 @@ +# +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. +# +"""Unit tests for :py:class:`mlos_bench.schedulers` and their internals.""" + +import sys +from logging import warning + +import pytest + +import mlos_bench.tests.optimizers.fixtures as optimizers_fixtures +import mlos_bench.tests.storage.sql.fixtures as sql_storage_fixtures +from mlos_bench.environments.mock_env import MockEnv +from mlos_bench.optimizers.mock_optimizer import MockOptimizer +from mlos_bench.schedulers.base_scheduler import Scheduler +from mlos_bench.schedulers.trial_runner import TrialRunner +from mlos_bench.storage.base_trial_data import TrialData +from mlos_bench.storage.sql.storage import SqlStorage +from mlos_core.tests import get_all_concrete_subclasses + +mock_opt = optimizers_fixtures.mock_opt +sqlite_storage = sql_storage_fixtures.sqlite_storage + +# pylint: disable=redefined-outer-name + +DEBUGGING = False # Set to True to enable debugging output for easier stepping through the code. + + +def create_scheduler( + scheduler_type: type[Scheduler], + trial_runners: list[TrialRunner], + mock_opt: MockOptimizer, + sqlite_storage: SqlStorage, + global_config: dict, +) -> Scheduler: + """Create a Scheduler instance using trial_runners, mock_opt, and sqlite_storage + fixtures. + """ + env = trial_runners[0].environment + assert isinstance(env, MockEnv), "Environment is not a MockEnv instance." + max_trials = max(int(trial_id) for trial_id in env.mock_trial_data.keys()) + max_trials = min(max_trials, mock_opt.max_suggestions) + + return scheduler_type( + config={ + "max_trials": max_trials, + }, + global_config=global_config, + trial_runners=trial_runners, + optimizer=mock_opt, + storage=sqlite_storage, + root_env_config="", + ) + + +def is_subset_of(dict_a: dict, dict_b: dict) -> bool: + """Check if dict_a is a subset of dict_b.""" + return all(item in dict_b.items() for item in dict_a.items()) + + +def mock_opt_has_registered_trial_score( + mock_opt: MockOptimizer, + trial_data: TrialData, +) -> bool: + """Check that the MockOptimizer has registered a given MockTrialData.""" + if not DEBUGGING: + return any( + registered_score.status == trial_data.status + and registered_score.score is not None + and is_subset_of(registered_score.score, trial_data.results_dict) + and registered_score.config.get_param_values() == trial_data.tunable_config.config_dict + for registered_score in mock_opt.registered_scores + ) + # For debugging, we can print the data to examine mismatches. + for registered_score in mock_opt.registered_scores: + if registered_score.status != trial_data.status: + warning( + f"Registered status: {registered_score.status} " + f"!= TrialData status: {trial_data.status}" + ) + continue + # Check if registered_score.score is a subset of trial_data.results_dict + if not ( + registered_score.score is not None + and is_subset_of(registered_score.score, trial_data.results_dict) + ): + warning( + f"Registered score: {registered_score.score} " + f"is not a subset of TrialData results: {trial_data.results_dict}" + ) + continue + if registered_score.config.get_param_values() != trial_data.tunable_config.config_dict: + warning( + f"Registered config: {registered_score.config.get_param_values()} " + f"!= TrialData config: {trial_data.tunable_config.config_dict}" + ) + continue + # Else, found a match! + warning(f"Found matching registered score for trial {trial_data}: {registered_score}") + return True + warning( + f"No matching registered score found for trial {trial_data}. " + f"Registered scores: {mock_opt.registered_scores}" + ) + return False + + +scheduler_classes = get_all_concrete_subclasses( + Scheduler, # type: ignore[type-abstract] + pkg_name="mlos_bench", +) +assert scheduler_classes, "No Scheduler classes found in mlos_bench." + + +@pytest.mark.parametrize( + "scheduler_class", + scheduler_classes, +) +@pytest.mark.skipif( + sys.platform == "win32", + reason="Skipping test on Windows - SQLite storage is not accessible in parallel tests there.", +) +def test_scheduler_with_mock_trial_data( + scheduler_class: type[Scheduler], + trial_runners: list[TrialRunner], + mock_opt: MockOptimizer, + sqlite_storage: SqlStorage, + mock_env_global_config: dict, +) -> None: + """ + Full integration test for Scheduler: runs trials, checks storage, optimizer + registration, and internal bookkeeping. + """ + # pylint: disable=too-many-locals + + # Create the scheduler. + scheduler = create_scheduler( + scheduler_class, + trial_runners, + mock_opt, + sqlite_storage, + mock_env_global_config, + ) + + root_env = scheduler.root_environment + experiment_id = root_env.experiment_id + assert isinstance(root_env, MockEnv), f"Root environment {root_env} is not a MockEnv." + assert root_env.mock_trial_data, "No mock trial data found in root environment." + + # Run the scheduler + with scheduler: + scheduler.start() + scheduler.teardown() + + # Now check the overall results. + ran_trials = {trial.trial_id for trial in scheduler.ran_trials} + assert ( + experiment_id in sqlite_storage.experiments + ), f"Experiment {experiment_id} not found in storage." + exp_data = sqlite_storage.experiments[experiment_id] + + for mock_trial_data in root_env.mock_trial_data.values(): + trial_id = mock_trial_data.trial_id + + # Check the bookkeeping for ran_trials. + assert trial_id in ran_trials, f"Trial {trial_id} not found in Scheduler.ran_trials." + + # Check the results in storage. + assert trial_id in exp_data.trials, f"Trial {trial_id} not found in storage." + trial_data = exp_data.trials[trial_id] + + # Check the results. + metrics = mock_trial_data.run.metrics + if metrics: + for result_key, result_value in metrics.items(): + assert ( + result_key in trial_data.results_dict + ), f"Result {result_key} not found in storage for trial {trial_data}." + assert ( + trial_data.results_dict[result_key] == result_value + ), f"Result value for {result_key} does not match expected value." + # TODO: Should we check the reverse - no extra metrics were registered? + # else: metrics weren't explicit in the mock trial data, so we only + # check that a score was stored for the optimization target, but that's + # good to do regardless + for opt_target in mock_opt.targets: + assert ( + opt_target in trial_data.results_dict + ), f"Result column {opt_target} not found in storage." + assert ( + trial_data.results_dict[opt_target] is not None + ), f"Result value for {opt_target} is None." + + # Check that the appropriate sleeps occurred. + min_trial_time = 0.0 + min_trial_time += mock_trial_data.setup.sleep or 0 + min_trial_time += mock_trial_data.run.sleep or 0 + min_trial_time += mock_trial_data.status.sleep or 0 + min_trial_time += mock_trial_data.teardown.sleep or 0 + assert trial_data.ts_end is not None, f"Trial {trial_id} has no end time." + trial_duration = trial_data.ts_end - trial_data.ts_start + trial_dur_secs = trial_duration.total_seconds() + assert ( + trial_dur_secs >= min_trial_time + ), f"Trial {trial_id} took less time ({trial_dur_secs}) than expected ({min_trial_time}). " + + # Check that the trial status matches what we expected. + assert ( + trial_data.status == mock_trial_data.run.status + ), f"Trial {trial_id} status {trial_data.status} was not {mock_trial_data.run.status}." + + # Check the trial status telemetry. + telemetry_dict = dict( + zip(trial_data.telemetry_df["metric"], trial_data.telemetry_df["value"]) + ) + assert telemetry_dict == mock_trial_data.status.metrics, ( + f"Trial {trial_id} telemetry {telemetry_dict} does not match expected " + f"{mock_trial_data.status.metrics}." + ) + + # Check the optimizer registration. + assert mock_opt_has_registered_trial_score( + mock_opt, + trial_data, + ), f"Trial {trial_id} was not registered in the optimizer." + + # TODO: And check the intermediary results. + # 4. Check the bookkeeping for add_new_optimizer_suggestions and _last_trial_id. + # This last part may require patching and intercepting during the start() + # loop to validate in-progress book keeping instead of just overall. diff --git a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py index e07cf80c70a..a482d070f15 100644 --- a/mlos_bench/mlos_bench/tests/storage/exp_load_test.py +++ b/mlos_bench/mlos_bench/tests/storage/exp_load_test.py @@ -104,12 +104,12 @@ def test_exp_trial_update_categ( [ { "idle": "halt", - "kernel_sched_latency_ns": "2000000", - "kernel_sched_migration_cost_ns": "-1", + "kernel_sched_latency_ns": 2000000, + "kernel_sched_migration_cost_ns": -1, "vmSize": "Standard_B4ms", } ], - [{"score": "99.9", "benchmark": "test"}], + [{"score": 99.9, "benchmark": "test"}], [Status.SUCCEEDED], ) @@ -153,7 +153,7 @@ def test_exp_trial_pending_3( (trial_ids, configs, scores, status) = exp_storage.load() assert trial_ids == [trial_fail.trial_id, trial_succ.trial_id] assert len(configs) == 2 - assert scores == [None, {"score": f"{score}"}] + assert scores == [None, {"score": score}] assert status == [Status.FAILED, Status.SUCCEEDED] assert tunable_groups.copy().assign(configs[0]).reset() == trial_fail.tunables assert tunable_groups.copy().assign(configs[1]).reset() == trial_succ.tunables diff --git a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py index 0bebeeff824..db6dc5fa2e3 100644 --- a/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py +++ b/mlos_bench/mlos_bench/tests/storage/sql/fixtures.py @@ -30,7 +30,7 @@ # pylint: disable=redefined-outer-name -@pytest.fixture +@pytest.fixture(scope="function") def sqlite_storage() -> Generator[SqlStorage]: """ Fixture for file based SQLite storage in a temporary directory. diff --git a/mlos_bench/mlos_bench/tests/storage/trial_config_test.py b/mlos_bench/mlos_bench/tests/storage/trial_config_test.py index 30dabe05d9c..2dbabb92612 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_config_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_config_test.py @@ -22,7 +22,7 @@ def test_exp_trial_pending(exp_storage: Storage.Experiment, tunable_groups: Tuna assert pending.tunables == tunable_groups assert pending.config() == { "location": "westus2", - "num_repeats": "100", + "num_repeats": 100, "experiment_id": "Test-001", "trial_id": trial.trial_id, } diff --git a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py index d4b122ed2bb..a5c25feda7f 100644 --- a/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py +++ b/mlos_bench/mlos_bench/tests/storage/trial_telemetry_test.py @@ -13,7 +13,7 @@ from mlos_bench.storage.base_storage import Storage from mlos_bench.tests import ZONE_INFO from mlos_bench.tunables.tunable_groups import TunableGroups -from mlos_bench.util import nullable +from mlos_bench.util import nullable, try_parse_val # pylint: disable=redefined-outer-name @@ -41,12 +41,12 @@ def zoned_telemetry_data(zone_info: tzinfo | None) -> list[tuple[datetime, str, ) -def _telemetry_str( +def _telemetry_val( data: list[tuple[datetime, str, Any]], -) -> list[tuple[datetime, str, str | None]]: +) -> list[tuple[datetime, str, int | float | str | None]]: """Convert telemetry values to strings.""" # All retrieved timestamps should have been converted to UTC. - return [(ts.astimezone(UTC), key, nullable(str, val)) for (ts, key, val) in data] + return [(ts.astimezone(UTC), key, try_parse_val(val)) for (ts, key, val) in data] @pytest.mark.parametrize(("origin_zone_info"), ZONE_INFO) @@ -62,13 +62,13 @@ def test_update_telemetry( assert exp_storage.load_telemetry(trial.trial_id) == [] trial.update_telemetry(Status.RUNNING, datetime.now(origin_zone_info), telemetry_data) - assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_str(telemetry_data) + assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_val(telemetry_data) # Also check that the TrialData telemetry looks right. trial_data = storage.experiments[exp_storage.experiment_id].trials[trial.trial_id] trial_telemetry_df = trial_data.telemetry_df trial_telemetry_data = [tuple(r) for r in trial_telemetry_df.to_numpy()] - assert _telemetry_str(trial_telemetry_data) == _telemetry_str(telemetry_data) + assert _telemetry_val(trial_telemetry_data) == _telemetry_val(telemetry_data) @pytest.mark.parametrize(("origin_zone_info"), ZONE_INFO) @@ -84,4 +84,4 @@ def test_update_telemetry_twice( trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) trial.update_telemetry(Status.RUNNING, timestamp, telemetry_data) - assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_str(telemetry_data) + assert exp_storage.load_telemetry(trial.trial_id) == _telemetry_val(telemetry_data)