Source code for tstrends.optimization.optimizer

from typing import Any, Optional, Type, Union

from bayes_opt import BayesianOptimization, acquisition

from ..returns_estimation import BaseReturnEstimator
from ..trend_labelling import BaseLabeller

from .optimization_bounds import OptimizationBounds

# Constants for parameter types
INTEGER_PARAMS = ["window_size"]


[docs] class Optimizer: """Bayesian optimization for trend labelling parameters. This class implements Bayesian optimization to find the optimal parameters for trend labelling algorithms. It uses the returns from a returns estimator as the objective function to maximize. Attributes: returns_estimator (BaseReturnEstimator): The returns estimator instance to use. initial_points (int): Number of initial random points for optimization. nb_iter (int): Number of optimization iterations. random_state (int | None): Random seed for reproducibility. _optimizer (BayesianOptimization | None): Internal optimizer instance. Example: >>> from returns_estimation import SimpleReturnEstimator >>> optimizer = Optimizer(SimpleReturnEstimator(), initial_points=5, nb_iter=100) >>> result = optimizer.optimize(BinaryCTL, prices) >>> print(result['params']) # {'omega': 0.005} Note: The optimization process uses Bayesian optimization with Gaussian processes, which is particularly effective for expensive-to-evaluate objective functions with a small number of parameters. """
[docs] def __init__( self, returns_estimator: BaseReturnEstimator, initial_points: int = 10, nb_iter: int = 1_000, random_state: int | None = None, ) -> None: self.returns_estimator = returns_estimator self.initial_points = initial_points self.nb_iter = nb_iter self.random_state = random_state self._optimizer = None
[docs] def get_optimizer(self) -> BayesianOptimization: """ Get the underlying BayesianOptimization object if you need access to the full optimization history. Returns: BayesianOptimization: The optimizer instance with optimization results. Raises: ValueError: If optimize() hasn't been called yet. """ if self._optimizer is None: raise ValueError( "Optimizer not initialized. You need to call the optimize method first." ) return self._optimizer
def _process_parameters( self, params: dict[str, float] ) -> dict[str, Union[int, float]]: """ Process optimization parameters and convert specific parameters to required types. Args: params (dict[str, float]): Raw parameters from the optimizer. Returns: dict[str, Union[int, float]]: Processed parameters with correct types. """ return { key: int(value) if key in INTEGER_PARAMS else value for key, value in params.items() }
[docs] def optimize( self, labeller_class: type[BaseLabeller], time_series_list: list[float] | list[list[float]], bounds: dict[str, tuple[float, float]] | None = None, acquisition_function: acquisition.AcquisitionFunction | None = None, verbose: int | None = 0, ) -> dict[str, dict[str, float] | float]: """ Optimize the trend labelling parameters. Args: labeller_class (type[BaseLabeller]): The trend labeller class to optimize. time_series_list (list[float] | list[list[float]]): Either a single time series list or a list of time series lists to optimize the trend labelling parameters on. bounds (dict[str, tuple[float, float]] | None, optional): The bounds of the parameters to optimize. If not provided, the bounds will be the default bounds. Defaults to None. acquisition_function (acquisition.AcquisitionFunction | None, optional): The acquisition function to use. If not provided, the default acquisition function UpperConfidenceBound(kappa=2) will be used. verbose (int | None, optional): Verbosity level for optimization output. Defaults to 0. Returns: dict[str, dict[str, float] | float]: A dictionary containing: - 'params': Dictionary of optimal parameters - 'target': The maximum target value achieved """ if acquisition_function is None: acquisition_function = acquisition.UpperConfidenceBound( kappa=2, random_state=self.random_state ) bounds = bounds or OptimizationBounds().get_bounds(labeller_class) def objective_function(**params: float) -> float: processed_params = self._process_parameters(params) labeller = labeller_class(**processed_params) if isinstance(time_series_list[0], float): # First element is a float (an not a list) -> Single time series case return self.returns_estimator.estimate_return( time_series_list, # pyright: ignore[reportArgumentType] labeller.get_labels( time_series_list # pyright: ignore[reportArgumentType] ), ) # Multiple time series case total_return = 0.0 for series in time_series_list: total_return += self.returns_estimator.estimate_return( series, # pyright: ignore[reportArgumentType] labeller.get_labels(series), # pyright: ignore[reportArgumentType] ) return total_return self._optimizer = BayesianOptimization( f=objective_function, pbounds=bounds, verbose=verbose or 0, random_state=self.random_state, acquisition_function=acquisition_function, ) self._optimizer.maximize(init_points=self.initial_points, n_iter=self.nb_iter) # Return the optimization results directly return { "params": self._process_parameters( self._optimizer.max[ # pyright: ignore[reportUnknownArgumentType, reportOptionalSubscript] "params" ] ), "target": self._optimizer.max[ # pyright: ignore[reportOptionalSubscript] "target" ], }