Source code for pyoed.optimization.scipy_optimization

# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved


"""
A module implementing a unified interface to Scipy Optimization routine.
"""

import re
import os
import warnings
import numpy as np
from scipy import optimize as sp_optimize
from dataclasses import dataclass, field
from typing import Type, Iterable, Callable, Tuple, Sequence

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import matplotlib.patches as mpatches

from pyoed.configs import (
    PyOEDConfigsValidationError,
    validate_key,
    aggregate_configurations,
    set_configurations,
    SETTINGS as pyoed_settings,
)
from pyoed import utility
from . import optimization_utils as opt_utils
from .core import (
    Optimizer,
    OptimizerConfigs,
    OptimizerResults,
)


[docs] @dataclass(kw_only=True, slots=True) class ScipyOptimizerConfigs(OptimizerConfigs): """ Configuration dataclass for the :py:class:`ScipyOptimizer`. The attributes here mirror the arguments of `scipy.optimize.minimize`. The documentation of attribute is removed here for clarity, though, we list them for completeness. Additionally, we add one extra flag `maximize` to add flexibility to the optimizer to solve maximization as well as minimization (simply by flipping the sign of the function and its derivatives)` :param fun: :param x0: :param args: :param method: :param jac: :param hess: :param hessp: :param bounds: :param constraints: :param tol: :param callback: :param options: :param maximize: :param maxiter: """ fun: Callable | None = None maximize: bool = False x0: np.ndarray = field(default_factory=lambda: np.empty(0, dtype=float)) args: Tuple | None = None method: str = "L-BFGS-B" jac: Callable | None = None hess: Callable | None = None hessp: Callable | None = None bounds: Sequence | None = None constraints: Sequence | None = None tol: float | None = None callback: Callable | None = None options: dict | None = field(default_factory=lambda: {}) name: str = "ScipyOptimizer: Optimization using scipy.optimize.minimize"
[docs] @set_configurations(configurations_class=ScipyOptimizerConfigs) class ScipyOptimizer(Optimizer): """ Unified (with PyOED optimziers) interface to the `scipy.optimize.minimize` optimization routines. :param configs: Configurations of :py:class:`ScipyOptimizer`, either as an instance of :py:class:`ScipyOptimizerConfigs`, a dictionary, or `None`. The default is `None`, in which case, the default configurations provided by :py:class:`ScipyOptimizerConfigs` are used. """
[docs] def __init__(self, configs: ScipyOptimizerConfigs | dict | None = None): configs = self.configurations_class.data_to_dataclass(configs) super().__init__(configs) # Post init of configurations self.configurations.x0 = utility.asarray(self.configurations.x0).flatten() if self.configurations.options is None: self.configurations.options = {} if "disp" not in self.configurations.options: self.configurations.options.update({"verbose": self.verbose}) # More... ...
[docs] def validate_configurations( self, configs: dict | ScipyOptimizerConfigs, raise_for_invalid: bool = True, ) -> bool: """ Check the passed configuratios and make sure they are conformable with each other, and with current configurations once combined. This guarantees that any key-value pair passed in configs can be properly used .. note:: Here only the locally-defined configurations in :py:class:`ScipyOptimizerConfigs` are validated. Finally, super classes validators are called. :param configs: full or partial (subset) configurations to be validated :param raise_for_invalid: if `True` raise :py:class:`TypeError` for invalid configrations type/key. Default `True` :returns: flag indicating whether passed configurations dictionary is valid or not :raises AttributeError: if any (or a group) of the configurations does not exist in the model configurations :py:class:`ScipyOptimizerConfigs`. :raises PyOEDConfigsValidationError: if the configurations are invalid and `raise_for_invalid` is set to True. """ # Fuse configs into current/default configurations aggregated_configs = aggregate_configurations( obj=self, configs=configs, configs_class=self.configurations_class ) ## Validate specific entries/configs is_float = lambda x: utility.isnumber(x) and (x == float(x)) is_float_or_none = lambda x: x is None or is_float(x) is_callable_or_none = lambda x: x is None or callable(x) is_sequence = lambda x: utility.isiterable(x) is_sequence_or_none = lambda x: x is None or is_sequence(x) is_dict_or_none = lambda x: x is None or isinstance(x, dict) # Validate `fun` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="fun", test=callable, message=f"'fun' must be a callable/function!", raise_for_invalid=raise_for_invalid, ): return False # Validate `x0` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="x0", test=lambda x: utility.isiterable( x, ) and utility.asarray(x).ndim == 1, message=f"'x0' must be castable to one-dimensional numpy array!", raise_for_invalid=raise_for_invalid, ): return False # Validate `args` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="args", test=is_sequence_or_none, message=f"'args' must be a sequence; e.g., tuple!", raise_for_invalid=raise_for_invalid, ): return False # Validate `method` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="method", test=lambda x: isinstance(x, str), message=f"'method' must be a string!", raise_for_invalid=raise_for_invalid, ): return False # Validate `jac` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="jac", test=is_callable_or_none, message=f"'jac' must be a callable/function or None!", raise_for_invalid=raise_for_invalid, ): return False # Validate `hess` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="hess", test=is_callable_or_none, message=f"'hess' must be a callable/function or None!", raise_for_invalid=raise_for_invalid, ): return False # Validate `hessp` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="hessp", test=is_callable_or_none, message=f"'hessp' must be a callable/function or None!", raise_for_invalid=raise_for_invalid, ): return False # Validate `bounds` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="bounds", test=is_sequence_or_none, message=f"'bounds' must be a tuple or None!", raise_for_invalid=raise_for_invalid, ): return False # Validate `constraints` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="constraints", test=lambda x: is_sequence_or_none(x) or isinstance(x, dict), message=f"'constraints' must be dict or a sequence of dicts or None!", raise_for_invalid=raise_for_invalid, ): return False # Validate `callback` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="callback", test=is_callable_or_none, message=f"'callback' must be callable or None!", raise_for_invalid=raise_for_invalid, ): return False # Validate `tol` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="tol", test=is_float_or_none, message=f"'tol' must be float or None!", raise_for_invalid=raise_for_invalid, ): return False # Validate `options` if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="options", test=is_dict_or_none, message=f"'options' must be dict or None!", raise_for_invalid=raise_for_invalid, ): return False # Check parent for further validation (if any) return super().validate_configurations( configs=configs, raise_for_invalid=raise_for_invalid, )
[docs] def update_configurations(self, **kwargs): """ Take any set of keyword arguments, and lookup each in the configurations, and update as nessesary/possible/valid :raises PyOEDConfigsValidationError: if invalid configurations passed """ # Validate and udpate super().update_configurations(**kwargs) ## Special updates based on specific arguments # Check if `x0' is passed if "x0" in kwargs: self.configurations.x0 = utility.asarray(kwargs["x0"]).flatten()
[docs] def objective_function_value( self, x, ): """ Evaluate the objective function `self.configurations.fun` at the passed `x`. :param x: variable at which to evaluate the objective function value :returns: value of the objective/utility function at the passed instance `x` """ args = self.configurations.args or () return self.configurations.fun(x, *args)
[docs] def objective_function_gradient( self, x, ): """ Evaluate the gradient of the objective function `self.configurations.jac` at the passed `x`. If `jac` is not registered (i.e., `None`), finite difference approximation of the gradient is returned. :param x: variable at which to evaluate the objective function value :returns: value of the objective/utility function at the passed instance `x` """ x = utility.asarray(x) bounds = opt_utils.standardize_domain_bounds( bounds=self.configurations.bounds, size=x.size, ) args = self.configurations.args or () if self.configurations.jac is None: jac = utility.finite_difference_gradient( fun=lambda a: self.configurations.jac(a, *args), state=x, bounds=bounds, verbose=self.verbose, ) else: jac = self.configurations.jac(x, *args) return jac
[docs] def solve( self, x0=None, ): """ Start solving the optimization problem using scipy optimizer with registered configurations. :param x0: `None or ``iterable`` to be used as the initial guess. If `None`, the registered `x0` in the configurations is used. :returns: an instance of `ScipyOptimizerResults`. :raises: :py:class:`TypeError` if the initial policy has wrong size or any of the values fall outside the interval [0, 1] """ # Extract configurations (avoid copying) maximize = self.configurations.maximize # Objective function, jacobian, and hessian if maximize: # Flip sign(s) fun = lambda x, *args: -self.configurations.fun(x, *args) if self.configurations.jac is not None: jac = lambda x, *args: -self.configurations.jac(x, *args) else: jac = None if self.configurations.hess is not None: hess = lambda x, *args: -self.configurations.hess(x, *args) else: hess = None if self.configurations.hessp is not None: hessp = lambda x, *args: -self.configurations.hessp(x, *args) else: hessp = None else: fun = self.configurations.fun jac = self.configurations.jac hess = self.configurations.hess hessp = self.configurations.hessp if x0 is None: if self.configurations.x0.size == 0: raise TypeError( f"Invalid initial solution {x0}. " f"The initial solution `x0` in configurations is not " f"proper (it has size 0) which indicates no proper initial " f"solution is available" ) else: x0 = self.configurations.x0 else: x0 = utility.asarray(x0) # Extract configurations args = self.configurations.args method = self.configurations.method constraints = self.configurations.constraints tol = self.configurations.tol options = self.configurations.options # NOTE: callback in configs is overriden# callback = self.configurations.callback # Trajectory tracker optimization_trajectory = [] optimization_trajectory_objval = [] def callback(x): """Local callback""" optimization_trajectory.append(x) optimization_trajectory_objval.append(self.objective_function_value(x)) if self.configurations.callback is not None: return self.configurations.callback(x) # Cast bounds and arguments properly bounds = opt_utils.standardize_domain_bounds( bounds=self.configurations.bounds, size=x0.size, ) if args is None: args = () try: res = sp_optimize.minimize( fun=fun, x0=x0, args=args, method=method, jac=jac, hess=hess, hessp=hessp, bounds=bounds, constraints=constraints, tol=tol, callback=callback, options=options, ) except Exception as err: print( f"Tried solving the optimization problem by using scipy.optimize.minimize " f"but failed!\n" f"Trace the error below for details...\n" f"Unexpected {err=} of {type(err)=}" ) raise # Postprocessing... (crete Results object) ... results = ScipyOptimizerResults( optimization_results=res, configurations=self.configurations.asdict( deep=True, ignore_error=True, ), optimization_trajectory=optimization_trajectory, optimization_trajectory_objval=optimization_trajectory_objval, ) return results
[docs] def plot_results( self, results, overwrite=False, bruteforce=False, fun_symbol=r"U", num_active=None, output_dir=None, keep_plots=False, fontsize=22, line_width=3, usetex=True, show_axis_grids=True, axis_grids_alpha=(0.25, 0.4), plots_format="pdf", **kwargs, ): """ Given the results returned by :py:meth:`solve`, visualize optimization results. This function, creates the following plots: - ... .. note:: Adding additional `kwargs` to enable high flexibility in calling this method. :returns: a dictionary with entries/keys: - `figures` a dictionary indexed by plot identifier with value set to the corresponding figure handle (or `None` if `keep_plots` is set to False) - `stats`: a dictionary holding statistics - `extras`: :raises: :py:class:`TypeError` if the results dictionary is invalid """ if isinstance(results, ScipyOptimizerResults): results = results.asdict() elif isinstance(results, dict): pass else: raise TypeError( f"Expected results to be dictionary or `BinaryReinforceOptimizerResults`; " f"received {results=} of {type(results)=}!" ) if len(kwargs) > 0: print( f"Unknown keyword arguments encountered in " f"{self.__class__}.plot_results:\n{kwargs}\n" f"These argments/keywords are discarded" ) ## Extract mandatory entries from the results dictionary try: # optimization_trajectory = results['optimization_trajectory'] optimization_trajectory_objval = results["optimization_trajectory_objval"] except Exception as err: raise TypeError( f"Failed to extract entries from the passed `results`\n" f"Passed results of {type(results)=}\nTrace error below!\n" f"Unexpected {err=} of {type(err)=}" ) line_colors = [c["color"] for c in plt.rcParams["axes.prop_cycle"]] utility.plots_enhancer(fontsize=fontsize, usetex=usetex) ## File naming optname = f"ScipyOptimizer" plots_format = plots_format.strip(" .").lower() # opt_iter_filename = f"Optimization_Iterations.{plots_format}" # Initialize figures dictionary figures = dict() if output_dir is None: output_dir = self.configurations.output_dir if not os.path.isdir(output_dir): os.makedirs(output_dir) saveto = os.path.join(output_dir, opt_iter_filename) if os.path.isfile(saveto) and not overwrite: print(f"Plot exists; skipping: {saveto}") else: fig = plt.figure(figsize=(14, 6)) ax = fig.add_subplot(111) ax.plot( np.arange(len(optimization_trajectory_objval)), optimization_trajectory_objval, "-", linewidth=line_width * 1.5, color=line_colors[0], markersize=10, markeredgewidth=3, label=rf"${fun_symbol}$" if fun_symbol is not None else None, ) ax.xaxis.set_major_locator(MaxNLocator(integer=True)) ax.set_xlabel(r"Optimization Iteration") ax.set_ylabel(r"Objective Value") # Show Axis Grids if show_axis_grids: utility.show_axis_grids( ax=ax, major_alpha=axis_grids_alpha[0], minor_alpha=axis_grids_alpha[1], ) # Add legend ax.legend( bbox_to_anchor=(0, 1.02, 1, 0.2), loc="lower left", mode="expand", borderaxespad=0, ncol=3, ) # Update figures dictionary figures.update( { saveto.rstrip(f".{plots_format}").split(os.path.sep)[-1]: fig, } ) # save plot if self.verbose: print("Saving/Plotting Results to: {0}".format(saveto)) fig.savefig(saveto, dpi=600, bbox_inches="tight") # TODO: Consider adding more plots (optimization trajectory, etc.) ... # Finalize return object if not keep_plots: for key, fig in figures.items(): plt.close(fig) # figures[key] = None # Create results dictionary plot_results = {"figures": figures, "stats": {}, "extras": {}} return plot_results
@property def size(self): """ Dimension/size of the optimziation variable. """ return self.configurations.x0.size
[docs] @dataclass(kw_only=True, slots=True) class ScipyOptimizerResults(OptimizerResults): """ Container for results/data generated by the :py:class:`ScipyOptimizer`. :param optimization_results: results object `OptimizerResults` returned from the scipy optimizer. :param configurations: a dictionary holding problem configurations. This can be obtained by calling :py:meth:`asdict()` of the configurations object used by the optimizer, e.g., :py:class:`ScipyOptimizerConfigs`. :param optimization_trajectory: the optimization trajectory (variable over iterations) :param optimization_trajectory_objval: the objective values corresopnding to points on the optimization trajectory """ optimization_results: OptimizerResults | None = None configurations: None | dict = None optimization_trajectory: None | Sequence[np.ndarray] = None optimization_trajectory_objval: None | Sequence[float] = None @property def x(self) -> np.ndarray: """The optimal solution of the optimization problem.""" return self.optimization_results.x @property def success(self) -> bool: """Whether or not the optimizer exited successfully.""" return self.optimization_results.success @property def message(self) -> str: """Description of the cause of the termination.""" return self.optimization_results.message @property def fun(self) -> float: """Value of objective function at the optimal solution `x`.""" return self.optimization_results.fun @property def jac(self) -> np.ndarray: """Value of the Jacobian of the objective function at the optimal solution `x` (if available).""" return self.optimization_results.jac @property def hess(self) -> object: """Value of the Hessian of the objective function at the optimal solution `x` (if available).""" return self.optimization_results.hess @property def hess_inv(self) -> object: """Value of the inverse of Hessian of the objective function at the optimal solution `x` (if available).""" return self.optimization_results.hess_inv @property def nfev(self) -> int: """Number of evaluations of the objective functions.""" return self.optimization_results.nfev @property def njev(self) -> int: """Number of evaluations of the the Jacobian of the objective function.""" return self.optimization_results.njev @property def nhev(self) -> int: """Number of evaluations of the the Hessian of the objective function.""" return self.optimization_results.nhev @property def nit(self) -> int: """Number of iterations performed by the optimizer.""" return self.optimization_results.nit