# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved
"""
A module implementing a unified interface to the SciPy optimization routines.
"""
import re
import os
import warnings
import numpy as np
from scipy import optimize as sp_optimize
from dataclasses import dataclass, field
from typing import Type, Iterable, Callable, Tuple, Sequence
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
import matplotlib.patches as mpatches
from pyoed.configs import (
PyOEDConfigsValidationError,
validate_key,
aggregate_configurations,
set_configurations,
SETTINGS as pyoed_settings,
)
from pyoed import utility
from . import optimization_utils as opt_utils
from .core import (
Optimizer,
OptimizerConfigs,
OptimizerResults,
)
@dataclass(kw_only=True, slots=True)
class ScipyOptimizerConfigs(OptimizerConfigs):
"""
Configuration dataclass for the :py:class:`ScipyOptimizer`.
The attributes here mirror the arguments of `scipy.optimize.minimize`.
The documentation of each attribute is omitted here for brevity; they are
listed below for completeness.
Additionally, we add one extra flag `maximize` that gives the optimizer the
flexibility to solve maximization as well as minimization problems
(simply by flipping the sign of the function and its derivatives).
:param fun:
:param x0:
:param args:
:param method:
:param jac:
:param hess:
:param hessp:
:param bounds:
:param constraints:
:param tol:
:param callback:
:param options:
:param maximize:
:param maxiter:
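For example, a minimal sketch of constructing configurations (the quadratic
objective and gradient below are purely illustrative):

.. code-block:: python

    import numpy as np

    configs = ScipyOptimizerConfigs(
        fun=lambda x: float(np.dot(x, x)),
        jac=lambda x: 2.0 * x,
        x0=np.ones(3),
        method="L-BFGS-B",
        maximize=False,
    )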
"""
fun: Callable | None = None
maximize: bool = False
x0: np.ndarray = field(default_factory=lambda: np.empty(0, dtype=float))
args: Tuple | None = None
method: str = "L-BFGS-B"
jac: Callable | None = None
hess: Callable | None = None
hessp: Callable | None = None
bounds: Sequence | None = None
constraints: Sequence | None = None
tol: float | None = None
callback: Callable | None = None
options: dict | None = field(default_factory=dict)
name: str = "ScipyOptimizer: Optimization using scipy.optimize.minimize"
@set_configurations(configurations_class=ScipyOptimizerConfigs)
class ScipyOptimizer(Optimizer):
"""
Unified (with PyOED optimizers) interface to the `scipy.optimize.minimize`
optimization routines.
:param configs: Configurations of :py:class:`ScipyOptimizer`, either as an instance
of :py:class:`ScipyOptimizerConfigs`, a dictionary, or `None`. The default is
`None`, in which case, the default configurations provided by
:py:class:`ScipyOptimizerConfigs` are used.
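For example, a minimal usage sketch (the objective is illustrative, and the
import path may differ depending on where this module lives in the package):

.. code-block:: python

    import numpy as np

    optimizer = ScipyOptimizer(
        configs={
            "fun": lambda x: float(np.dot(x, x)),
            "jac": lambda x: 2.0 * x,
            "x0": np.ones(3),
        }
    )
    results = optimizer.solve()
    print(results.x, results.fun, results.success)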
"""
def __init__(self, configs: ScipyOptimizerConfigs | dict | None = None):
configs = self.configurations_class.data_to_dataclass(configs)
super().__init__(configs)
# Post init of configurations
self.configurations.x0 = utility.asarray(self.configurations.x0).flatten()
if self.configurations.options is None:
self.configurations.options = {}
if "disp" not in self.configurations.options:
self.configurations.options.update({"verbose": self.verbose})
# More...
...
def validate_configurations(
self,
configs: dict | ScipyOptimizerConfigs,
raise_for_invalid: bool = True,
) -> bool:
"""
Check the passed configurations and make sure they are conformable with each
other, and with the current configurations once combined. This guarantees that
any key-value pair passed in `configs` can be properly used.
.. note::
Here only the locally-defined configurations in :py:class:`ScipyOptimizerConfigs`
are validated; the validators of the super classes are called afterwards.
:param configs: full or partial (subset) configurations to be validated
:param raise_for_invalid: if `True` raise :py:class:`TypeError` for invalid
configurations type/key. Default `True`
:returns: flag indicating whether passed configurations dictionary is valid or not
:raises AttributeError: if any (or a group) of the configurations does not exist
in the model configurations :py:class:`ScipyOptimizerConfigs`.
:raises PyOEDConfigsValidationError: if the configurations are invalid and
`raise_for_invalid` is set to True.
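For example (illustrative values; assumes default configurations otherwise):

.. code-block:: python

    optimizer.validate_configurations({"tol": 1e-8})  # -> True
    optimizer.validate_configurations({"tol": "tight"})  # raises PyOEDConfigsValidationError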
"""
# Fuse configs into current/default configurations
aggregated_configs = aggregate_configurations(
obj=self, configs=configs, configs_class=self.configurations_class
)
## Validate specific entries/configs
is_float = lambda x: utility.isnumber(x) and (x == float(x))
is_float_or_none = lambda x: x is None or is_float(x)
is_callable_or_none = lambda x: x is None or callable(x)
is_sequence = lambda x: utility.isiterable(x)
is_sequence_or_none = lambda x: x is None or is_sequence(x)
is_dict_or_none = lambda x: x is None or isinstance(x, dict)
# Validate `fun`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="fun",
test=callable,
message=f"'fun' must be a callable/function!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `x0`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="x0",
test=lambda x: utility.isiterable(
x,
)
and utility.asarray(x).ndim == 1,
message=f"'x0' must be castable to one-dimensional numpy array!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `args`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="args",
test=is_sequence_or_none,
message=f"'args' must be a sequence; e.g., tuple!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `method`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="method",
test=lambda x: isinstance(x, str),
message=f"'method' must be a string!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `jac`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="jac",
test=is_callable_or_none,
message=f"'jac' must be a callable/function or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `hess`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="hess",
test=is_callable_or_none,
message=f"'hess' must be a callable/function or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `hessp`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="hessp",
test=is_callable_or_none,
message=f"'hessp' must be a callable/function or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `bounds`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="bounds",
test=is_sequence_or_none,
message=f"'bounds' must be a tuple or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `constraints`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="constraints",
test=lambda x: is_sequence_or_none(x) or isinstance(x, dict),
message=f"'constraints' must be dict or a sequence of dicts or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `callback`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="callback",
test=is_callable_or_none,
message=f"'callback' must be callable or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `tol`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="tol",
test=is_float_or_none,
message=f"'tol' must be float or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Validate `options`
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="options",
test=is_dict_or_none,
message=f"'options' must be dict or None!",
raise_for_invalid=raise_for_invalid,
):
return False
# Check parent for further validation (if any)
return super().validate_configurations(
configs=configs,
raise_for_invalid=raise_for_invalid,
)
def update_configurations(self, **kwargs):
"""
Take any set of keyword arguments, look up each in the configurations,
and update as necessary/possible/valid.
:raises PyOEDConfigsValidationError: if invalid configurations passed
"""
# Validate and update
super().update_configurations(**kwargs)
## Special updates based on specific arguments
# Check if `x0' is passed
if "x0" in kwargs:
self.configurations.x0 = utility.asarray(kwargs["x0"]).flatten()
def objective_function_value(
self,
x,
):
"""
Evaluate the objective function `self.configurations.fun` at the passed `x`.
:param x: variable at which to evaluate the objective function value
:returns: value of the objective/utility function at the passed instance `x`
"""
args = self.configurations.args or ()
return self.configurations.fun(x, *args)
def objective_function_gradient(
self,
x,
):
"""
Evaluate the gradient of the objective function `self.configurations.jac`
at the passed `x`.
If `jac` is not registered (i.e., `None`), finite difference approximation
of the gradient is returned.
:param x: variable at which to evaluate the gradient of the objective function
:returns: gradient of the objective/utility function at the passed instance `x`
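For example (illustrative; the finite-difference fallback is used when no
`jac` is registered in the configurations):

.. code-block:: python

    g = optimizer.objective_function_gradient(np.zeros(optimizer.size))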
"""
x = utility.asarray(x)
bounds = opt_utils.standardize_domain_bounds(
bounds=self.configurations.bounds,
size=x.size,
)
args = self.configurations.args or ()
if self.configurations.jac is None:
# No gradient registered; approximate it with finite differences of `fun`
jac = utility.finite_difference_gradient(
fun=lambda a: self.configurations.fun(a, *args),
state=x,
bounds=bounds,
verbose=self.verbose,
)
else:
jac = self.configurations.jac(x, *args)
return jac
def solve(
self,
x0=None,
):
"""
Start solving the optimization problem using scipy optimizer with registered
configurations.
:param x0: `None` or an ``iterable`` to be used as the initial guess.
If `None`, the registered `x0` in the configurations is used.
:returns: an instance of :py:class:`ScipyOptimizerResults`.
:raises: :py:class:`TypeError` if no valid initial guess is available, that is,
if `x0` is `None` and no proper `x0` is registered in the configurations
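For example (illustrative; assumes `fun` has been registered):

.. code-block:: python

    results = optimizer.solve(x0=np.zeros(3))
    if results.success:
        print(results.x, results.fun)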
"""
# Extract configurations (avoid copying)
maximize = self.configurations.maximize
# Objective function, jacobian, and hessian
if maximize:
# Flip sign(s)
fun = lambda x, *args: -self.configurations.fun(x, *args)
if self.configurations.jac is not None:
jac = lambda x, *args: -self.configurations.jac(x, *args)
else:
jac = None
if self.configurations.hess is not None:
hess = lambda x, *args: -self.configurations.hess(x, *args)
else:
hess = None
if self.configurations.hessp is not None:
hessp = lambda x, *args: -self.configurations.hessp(x, *args)
else:
hessp = None
else:
fun = self.configurations.fun
jac = self.configurations.jac
hess = self.configurations.hess
hessp = self.configurations.hessp
if x0 is None:
if self.configurations.x0.size == 0:
raise TypeError(
f"Invalid initial solution {x0}. "
f"The initial solution `x0` in configurations is not "
f"proper (it has size 0) which indicates no proper initial "
f"solution is available"
)
else:
x0 = self.configurations.x0
else:
x0 = utility.asarray(x0)
# Extract configurations
args = self.configurations.args
method = self.configurations.method
constraints = self.configurations.constraints
tol = self.configurations.tol
options = self.configurations.options
# NOTE: the callback registered in configs is not passed to scipy directly;
# it is chained into the local trajectory-tracking callback defined below
# callback = self.configurations.callback
# Trajectory tracker
optimization_trajectory = []
optimization_trajectory_objval = []
def callback(x):
"""Local callback"""
optimization_trajectory.append(x)
optimization_trajectory_objval.append(self.objective_function_value(x))
if self.configurations.callback is not None:
return self.configurations.callback(x)
# Cast bounds and arguments properly
bounds = opt_utils.standardize_domain_bounds(
bounds=self.configurations.bounds,
size=x0.size,
)
if args is None:
args = ()
try:
res = sp_optimize.minimize(
fun=fun,
x0=x0,
args=args,
method=method,
jac=jac,
hess=hess,
hessp=hessp,
bounds=bounds,
constraints=constraints,
tol=tol,
callback=callback,
options=options,
)
except Exception as err:
print(
f"Tried solving the optimization problem by using scipy.optimize.minimize "
f"but failed!\n"
f"Trace the error below for details...\n"
f"Unexpected {err=} of {type(err)=}"
)
raise
# Postprocessing... (create Results object)
...
results = ScipyOptimizerResults(
optimization_results=res,
configurations=self.configurations.asdict(
deep=True,
ignore_error=True,
),
optimization_trajectory=optimization_trajectory,
optimization_trajectory_objval=optimization_trajectory_objval,
)
return results
def plot_results(
self,
results,
overwrite=False,
bruteforce=False,
fun_symbol=r"U",
num_active=None,
output_dir=None,
keep_plots=False,
fontsize=22,
line_width=3,
usetex=True,
show_axis_grids=True,
axis_grids_alpha=(0.25, 0.4),
plots_format="pdf",
**kwargs,
):
"""
Given the results returned by :py:meth:`solve`, visualize optimization results.
This function creates the following plots:
- ...
.. note::
Additional `kwargs` are accepted (and discarded with a notice) to enable
high flexibility in calling this method.
:returns: a dictionary with entries/keys:
- `figures`: a dictionary indexed by plot identifier with value set to
the corresponding figure handle (or `None` if `keep_plots` is set to `False`)
- `stats`: a dictionary holding statistics
- `extras`: a dictionary holding any additional/auxiliary output
:raises: :py:class:`TypeError` if the results dictionary is invalid
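For example (illustrative; `results` as returned by :py:meth:`solve`):

.. code-block:: python

    out = optimizer.plot_results(results, output_dir="./plots", keep_plots=True)
    fig = out["figures"]["Optimization_Iterations"]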
"""
if isinstance(results, ScipyOptimizerResults):
results = results.asdict()
elif isinstance(results, dict):
pass
else:
raise TypeError(
f"Expected results to be dictionary or `BinaryReinforceOptimizerResults`; "
f"received {results=} of {type(results)=}!"
)
if len(kwargs) > 0:
print(
f"Unknown keyword arguments encountered in "
f"{self.__class__}.plot_results:\n{kwargs}\n"
f"These argments/keywords are discarded"
)
## Extract mandatory entries from the results dictionary
try:
# optimization_trajectory = results['optimization_trajectory']
optimization_trajectory_objval = results["optimization_trajectory_objval"]
except Exception as err:
raise TypeError(
f"Failed to extract entries from the passed `results`\n"
f"Passed results of {type(results)=}\nTrace error below!\n"
f"Unexpected {err=} of {type(err)=}"
)
line_colors = [c["color"] for c in plt.rcParams["axes.prop_cycle"]]
utility.plots_enhancer(fontsize=fontsize, usetex=usetex)
## File naming
optname = "ScipyOptimizer"
plots_format = plots_format.strip(" .").lower()
#
opt_iter_filename = f"Optimization_Iterations.{plots_format}"
# Initialize figures dictionary
figures = dict()
if output_dir is None:
output_dir = self.configurations.output_dir
if not os.path.isdir(output_dir):
os.makedirs(output_dir)
saveto = os.path.join(output_dir, opt_iter_filename)
if os.path.isfile(saveto) and not overwrite:
print(f"Plot exists; skipping: {saveto}")
else:
fig = plt.figure(figsize=(14, 6))
ax = fig.add_subplot(111)
ax.plot(
np.arange(len(optimization_trajectory_objval)),
optimization_trajectory_objval,
"-",
linewidth=line_width * 1.5,
color=line_colors[0],
markersize=10,
markeredgewidth=3,
label=rf"${fun_symbol}$" if fun_symbol is not None else None,
)
ax.xaxis.set_major_locator(MaxNLocator(integer=True))
ax.set_xlabel(r"Optimization Iteration")
ax.set_ylabel(r"Objective Value")
# Show Axis Grids
if show_axis_grids:
utility.show_axis_grids(
ax=ax,
major_alpha=axis_grids_alpha[0],
minor_alpha=axis_grids_alpha[1],
)
# Add legend
ax.legend(
bbox_to_anchor=(0, 1.02, 1, 0.2),
loc="lower left",
mode="expand",
borderaxespad=0,
ncol=3,
)
# Update figures dictionary
figures.update(
{
# NOTE: use `os.path.splitext`; `str.rstrip` strips characters, not a suffix
os.path.splitext(os.path.basename(saveto))[0]: fig,
}
)
# save plot
if self.verbose:
print("Saving/Plotting Results to: {0}".format(saveto))
fig.savefig(saveto, dpi=600, bbox_inches="tight")
# TODO: Consider adding more plots (optimization trajectory, etc.)
...
# Finalize return object
if not keep_plots:
for key, fig in figures.items():
plt.close(fig)
figures[key] = None
# Create results dictionary
plot_results = {"figures": figures, "stats": {}, "extras": {}}
return plot_results
@property
def size(self):
"""
Dimension/size of the optimization variable.
"""
return self.configurations.x0.size
@dataclass(kw_only=True, slots=True)
class ScipyOptimizerResults(OptimizerResults):
"""
Container for results/data generated by the :py:class:`ScipyOptimizer`.
:param optimization_results: the results object (`scipy.optimize.OptimizeResult`)
returned by the scipy optimizer.
:param configurations: a dictionary holding problem configurations. This can be obtained by
calling :py:meth:`asdict()` of the configurations object used by the optimizer, e.g.,
:py:class:`ScipyOptimizerConfigs`.
:param optimization_trajectory: the optimization trajectory (variable over iterations)
:param optimization_trajectory_objval: the objective values corresponding to points on
the optimization trajectory
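For example (illustrative):

.. code-block:: python

    results = optimizer.solve()
    print(results.nit, results.nfev)  # iteration/evaluation counts
    print(results.optimization_trajectory_objval)  # tracked objective values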
"""
optimization_results: sp_optimize.OptimizeResult | None = None
configurations: None | dict = None
optimization_trajectory: None | Sequence[np.ndarray] = None
optimization_trajectory_objval: None | Sequence[float] = None
@property
def x(self) -> np.ndarray:
"""The optimal solution of the optimization problem."""
return self.optimization_results.x
@property
def success(self) -> bool:
"""Whether or not the optimizer exited successfully."""
return self.optimization_results.success
@property
def message(self) -> str:
"""Description of the cause of the termination."""
return self.optimization_results.message
@property
def fun(self) -> float:
"""Value of objective function at the optimal solution `x`."""
return self.optimization_results.fun
@property
def jac(self) -> np.ndarray:
"""Value of the Jacobian of the objective function at the optimal solution `x` (if available)."""
return self.optimization_results.jac
@property
def hess(self) -> object:
"""Value of the Hessian of the objective function at the optimal solution `x` (if available)."""
return self.optimization_results.hess
@property
def hess_inv(self) -> object:
"""Value of the inverse of Hessian of the objective function at the optimal solution `x` (if available)."""
return self.optimization_results.hess_inv
@property
def nfev(self) -> int:
"""Number of evaluations of the objective functions."""
return self.optimization_results.nfev
@property
def njev(self) -> int:
"""Number of evaluations of the the Jacobian of the objective function."""
return self.optimization_results.njev
@property
def nhev(self) -> int:
"""Number of evaluations of the the Hessian of the objective function."""
return self.optimization_results.nhev
@property
def nit(self) -> int:
"""Number of iterations performed by the optimizer."""
return self.optimization_results.nit