Source code for boa.scheduler

from __future__ import annotations

from pprint import pformat
from typing import Iterable, Optional

from ax.core.optimization_config import OptimizationConfig
from ax.service.scheduler import Scheduler as AxScheduler

from boa.logger import get_logger
from boa.runner import WrappedJobRunner
from boa.wrappers.base_wrapper import BaseWrapper

logger = get_logger()


[docs]class Scheduler(AxScheduler): runner: WrappedJobRunner scheduler_filepath: str = "scheduler.json" opt_filepath: str = "optimization.csv" @property def wrapper(self) -> BaseWrapper: return self.runner.wrapper
[docs] def report_results(self, force_refit: bool = False): """ Ran whenever a batch of data comes in and the results are ready. This could be from one trial or a group of trials at once since it does interval polls to check trial statuses. saves the scheduler to json and saves to the log a status update of what trials have finished, which are running, and what generation step will be used to generate the next trials. Args: force_refit: Not used. Arg from Ax for compatibility. """ self.save_data() try: trials = self.best_raw_trials() best_trial_map = {idx: trial_dict["means"] for idx, trial_dict in trials.items()} if trials else {} best_trial_str = f"\nBest trial so far: {pformat(best_trial_map)}" except Exception as e: # pragma: no cover best_trial_str = "" logger.exception(e) update = ( f"Trials so far: {len(self.experiment.trials)}" f"\nRunning trials: {', '.join(str(t.index) for t in self.running_trials)}" f"\nWill Produce next trials from generation step: {self.generation_strategy.current_step.model_name}" f"{best_trial_str}" ) logger.info(update)
[docs] def best_fitted_trials( self, optimization_config: Optional[OptimizationConfig] = None, trial_indices: Optional[Iterable[int]] = None, use_model_predictions: bool = True, *args, **kwargs, ) -> dict: """Identifies and fit the best parameterizations tried in the experiment so far, this model predictions (fitting) if ``use_model_predictions`` is true and using observed raw values from the experiment otherwise. By default, uses model predictions to account for observation noise. If it is a Multi Objective Problem, then it will return the pareto front, a collection of trials that are the best front that min/maxes the objectives. Else it is the best point that min/maxes the objective. NOTE: The format of this method's output is as follows: { trial_index: {params: best parameters, means: dict of metrics by nam, cov_matrix: dict of cov matrix} }, Args: optimization_config: Optimization config to use in place of the one stored on the experiment. trial_indices: Indices of trials for which to retrieve data. If None will retrieve data from all available trials. use_model_predictions: Whether to extract the Pareto frontier using model predictions or directly observed values. If ``True``, the metric means and covariances in this method's output will also be based on model predictions and may differ from the observed values. Returns: ``None`` if it was not possible to extract the best trial or best Pareto frontier, otherwise a mapping from trial index to the tuple of: - the parameterization of the arm in that trial, - two-item tuple of metric means dictionary and covariance matrix (model-predicted if ``use_model_predictions=True`` and observed otherwise). """ trials = None if self.experiment.is_moo_problem: try: trials = self.get_pareto_optimal_parameters( optimization_config=optimization_config, trial_indices=trial_indices, use_model_predictions=use_model_predictions, *args, **kwargs, ) if trials: trials = { idx: dict(params=trial_tup[0], means=trial_tup[1][0], cov_matrix=trial_tup[1][1]) for idx, trial_tup in trials.items() } except (TypeError, ValueError) as e: # pragma: no cover # If get_pareto doesn't work because of the gen_step not supporting multi obj # then we log to the user that problem logger.warning( "Problem generating best fitted trials for pareto frontier. most likely cause" " is the generation step model/acquisition function is not intended for" f" multi objective optimizations. Exception: {e!r}" ) else: trials = self.get_best_trial( optimization_config=optimization_config, trial_indices=trial_indices, use_model_predictions=use_model_predictions, *args, **kwargs, ) if trials: best_trial, best_params, (means_dict, cov_matrix) = self.get_best_trial( optimization_config=optimization_config, trial_indices=trial_indices, use_model_predictions=use_model_predictions, *args, **kwargs, ) trials = {int(best_trial): dict(params=best_params, means=means_dict, cov_matrix=cov_matrix)} return trials
[docs] def best_raw_trials( self, optimization_config: Optional[OptimizationConfig] = None, trial_indices: Optional[Iterable[int]] = None, use_model_predictions: bool = False, *args, **kwargs, ) -> dict: """Identifies the best parameterizations tried in the experiment so far using the raw points themselves. If it is a Multi Objective Problem, then it will return the pareto front, a collection of trials that are the best front that min/maxes the objectives. Else it is the best point that min/maxes the objective. NOTE: The format of this method's output is as follows: { trial_index: {params: best parameters, means: dict of metrics by nam, cov_matrix: dict of cov matrix} }, Args: optimization_config: Optimization config to use in place of the one stored on the experiment. trial_indices: Indices of trials for which to retrieve data. If None will retrieve data from all available trials. use_model_predictions: Whether to extract the Pareto frontier using model predictions or directly observed values. If ``True``, the metric means and covariances in this method's output will also be based on model predictions and may differ from the observed values. Returns: ``None`` if it was not possible to extract the best trial or best Pareto frontier, otherwise a mapping from trial index to the tuple of: - the parameterization of the arm in that trial, - two-item tuple of metric means dictionary and covariance matrix (model-predicted if ``use_model_predictions=True`` and observed otherwise). """ trials = None if self.experiment.is_moo_problem: try: trials = self.get_pareto_optimal_parameters( optimization_config=optimization_config, trial_indices=trial_indices, use_model_predictions=use_model_predictions, *args, **kwargs, ) if trials: trials = { idx: dict(params=trial_tup[0], means=trial_tup[1][0], cov_matrix=trial_tup[1][1]) for idx, trial_tup in trials.items() } except (TypeError, ValueError) as e: # pragma: no cover # If get_pareto doesn't work because of the gen_step not supporting multi obj # then we log to the user that problem logger.warning( "Problem generating best fitted trials for pareto frontier. most likely cause" " is the generation step model/acquisition function is not intended for" f" multi objective optimizations. Exception: {e!r}" ) else: trials = self.get_best_trial( optimization_config=optimization_config, trial_indices=trial_indices, use_model_predictions=use_model_predictions, *args, **kwargs, ) if trials: best_trial, best_params, (means_dict, cov_matrix) = self.get_best_trial( optimization_config=optimization_config, trial_indices=trial_indices, use_model_predictions=use_model_predictions, *args, **kwargs, ) trials = {int(best_trial): dict(params=best_params, means=means_dict, cov_matrix=cov_matrix)} return trials
[docs] def save_data(self, **kwargs): """Save Scheduler to json file. Defaults to `wrapper.experiment_dir` / `filepath`""" from boa.storage import dump_scheduler_data try: dump_scheduler_data( scheduler=self, dir_=self.runner.wrapper.experiment_dir, scheduler_filepath=self.scheduler_filepath, opt_filepath=self.opt_filepath, **kwargs, ) except Exception as e: logger.exception("failed to save scheduler to json! Reason: %s" % repr(e))