# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved
"""
This module provides implementations of utility functions valid for trajectory OED
Eventually, this can be moved into the OED subpackage under utility_functions
"""
import warnings
from dataclasses import dataclass
import numpy as np
from scipy import sparse as sp
from scipy.sparse import linalg as splinalg
from scipy.sparse.linalg import LinearOperator
from pyoed import utility
from pyoed.models.error_models.Gaussian import (kl_divergence, GaussianErrorModel, isGaussian)
from pyoed.configs import (
validate_key,
set_configurations,
PyOEDConfigsValidationError,
)
from pyoed.assimilation import (
InverseProblem,
Smoother,
assimilation_utils,
)
from pyoed.assimilation.extras.reduced_order_modeling import (SVDReducedHessian)
from pyoed.oed.core import (
CriterionConfigs,
Criterion,
)
[docs]
@dataclass(kw_only=True, slots=True)
class TrajectoryUtilityFunctionConfigs(CriterionConfigs):
"""
Configuration class for :py:class:`TrajectoryUtilityFunction`.
:param inverse_problem: an instance of an inverse problem :py:class:`Smoother`.
:param criterion: name of the OED criterion to use as the objective (utility) function.
:param sensor_size: number of sensors to use/activate around the trajectory state
in order to build the observation operator.
:param navigation_grid: the navigation grid.
This could be the same as the observation operator (if `None`)
(base) grid that defines all candidate locations.
The navigation grid, however could be different from the observation grid, thus
it must be passed here.
"""
inverse_problem: Smoother | None = None
criterion: str = "a_optimal"
sensor_size: int = 1
navigation_grid: np.ndarray | None = None
[docs]
@set_configurations(TrajectoryUtilityFunctionConfigs)
class TrajectoryUtilityFunction(Criterion):
"""
A utility function for trajectory OED problems.
"""
_ACCEPTABLE_CRITERIA = [
"a_optimal", "d_optimal", "e_optimal",
"precond_d_optimal",
"fim_a_optimal", "fim_d_optimal", "fim_e_optimal",
"kl-divergence",
"randomized_a_optimal", "randomized_d_optimal",
]
## TODO: Define a utility function (KL-based) with (\tilde{H}+I): https://arxiv.org/pdf/2212.11466
[docs]
def __init__(self, configs: dict | TrajectoryUtilityFunctionConfigs | None = None):
configs = self.configurations_class.data_to_dataclass(configs)
super().__init__(configs)
self.configurations.criterion = self.configurations.criterion.lower().strip()
if self.configurations.navigation_grid is None:
warnings.warn("Setting the navigation grid to observation grid for backward compatibility")
self.configurations.navigation_grid = self.inverse_problem.observation_operator.base_observation_operator.get_observation_grid()
[docs]
def validate_configurations(
self,
configs: dict | TrajectoryUtilityFunctionConfigs ,
raise_for_invalid: bool = True,
) -> bool:
"""
Validate the passed configurations object.
:param configs: configurations to validate.
If a :py:class:`TrajectoryUtilityFunctionConfigs` object
is passed, validation is performed on the entire set of configurations.
However, if a dictionary is passed, validation is performed only on the
configurations corresponding to the keys in the dictionary.
:raises PyOEDConfigsValidationError: if the configurations are invalid and
`raise_for_invalid` is set to True.
"""
# Fuse configs into current/default configurations
aggregated_configs = self.aggregate_configurations(
configs=configs,
)
# Local tests
is_positive_int = lambda x: utility.isnumber(x) and int(x)==x > 0
def is_valid_criterion(criterion):
if isinstance(criterion, str):
# TODO: Relax this comparison with re.matchb)!
if criterion.lower().strip() in self._ACCEPTABLE_CRITERIA:
return True
else:
return False
else:
return False
def is_valid_grid(grid):
if grid is None:
return True
if not isinstance(grid, np.ndarray):
return False
if grid.ndim != 2:
return False
nx, ny = grid.shape
if not (nx> 1 and ny >=1 ):
return False
return True
def is_valid_inverse_problem(ip):
if isinstance(ip, Smoother):
return True
elif isinstance(ip, InverseProblem):
if hasattr(ip, 'base_inverse_problem'):
if isinstance(ip.base_inverse_problem, Smoother):
return True
else:
return False
else:
return False
else:
return False
## Validation Stage
# `inverse_problem`: None or string
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="inverse_problem",
test=lambda v: is_valid_inverse_problem,
message=f"inverse_problem is of invalid type. Expected instance of Smoother or InverseProblem with Smoother base inverse problem",
raise_for_invalid=raise_for_invalid,
):
return False
# `criterion`: string
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="criterion",
test=is_valid_criterion,
message=f"criterion is invalid! Expected on of: {self._ACCEPTABLE_CRITERIA}",
raise_for_invalid=raise_for_invalid,
):
return False
# `sensor_size`: positive integer
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="sensor_size",
test=is_positive_int,
message=f"sensor_size must be a positive integer",
raise_for_invalid=raise_for_invalid,
):
return False
# `navigation_grid`:
if not validate_key(
current_configs=aggregated_configs,
new_configs=configs,
key="navigation_grid",
test=is_valid_grid,
message=f"Navigation grid must be None of valid numpy array",
raise_for_invalid=raise_for_invalid,
):
return False
# All good!
return super().validate_configurations(configs, raise_for_invalid)
[docs]
def update_configurations(self, **kwargs):
"""
Take any set of keyword arguments, and lookup each in
the configurations, and update as nessesary/possible/valid
:raises PyOEDConfigsValidationError: if invalid configurations passed
"""
# Validate and udpate
super().update_configurations(**kwargs)
## Special updates based on specific arguments
# Resetting inverse problem? update problem_is_linear flag & reset number of solves
if "inverse_problem" in kwargs:
if hasattr(self, "_PINV"):
self._PINV = None
if hasattr(self, "_PSQRTT"):
self._PSQRTT = None
if "criterion" in kwargs:
self.configurations.criterion = kwargs['criterion'].lower().strip()
if "navigation_grid" in kwargs:
if kwargs['navigation_grid'] is None:
warnings.warn("Setting the navigation grid to observation grid for backward compatibility")
self.configurations.navigation_grid = self.inverse_problem.observation_operator.base_observation_operator.get_observation_grid()
[docs]
def evaluate(self, trajectory, ):
"""
Evaluate the utility function / optimality criterion at the given design (trajectory).
"""
# TODO: Proceed here by implementing RMSE-based criterion
...
if self.debug:
print(
"DEBUG: Evaluating Criteriorn with: \n"
f" - {trajectory=}: \n"
f" - {self.criterion=}: \n"
)
# Utility value based on the criterion
if self.criterion == "a_optimal":
utility_value = utility.matrix_trace(
self.posterior_covariance(trajectory=trajectory, ),
)
elif self.criterion == "d_optimal":
utility_value = utility.matrix_logdet(
self.posterior_covariance(trajectory=trajectory, precond=True, ),
)
elif self.criterion == "precond_d_optimal":
utility_value = utility.matrix_logdet(
self.posterior_covariance(trajectory=trajectory, precond=True, ),
)
elif self.criterion == "e_optimal":
utility_value = splinalg.eigsh(
self.posterior_covariance(trajectory=trajectory, ),
k=1,
)[0].item()
elif self.criterion == "fim_a_optimal":
utility_value = utility.matrix_trace(
self.FIM(trajectory=trajectory, data_misfit_only=True, ),
)
elif self.criterion == "fim_d_optimal":
utility_value = utility.matrix_logdet(
self.FIM(trajectory=trajectory, data_misfit_only=True, ),
)
elif self.criterion == "fim_e_optimal":
utility_value = splinalg.eigsh(
self.FIM(trajectory=trajectory, data_misfit_only=True, ),
k=1,
)[0].item()
elif self.criterion == "kl_divergence":
# NOTE: THIS IS VERY LIMITED AS WE NEED AVERAGE/EXPECTATION OVER DATA DISTRIBUTION
prior = self.inverse_problem.prior
if not isGaussian(prior):
prior = GaussianErrorModel({'mean': prior.mean, 'variance': prior.covariance_matrix()})
with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr:
with self.inverse_problem.observation_error_model.design_manager(design) as eroperr_mngr:
self.inverse_problem.solve(update_posterior=True)
posterior = self.inverse_problem.posterior
utility_value = kl_divergence(prior, posterior)
elif self.criterion == "randomized_a_optimal":
randomization_vectors = self.randomization_vectors
precond = self._REDUCED_HESSIAN
with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr:
with self.inverse_problem.observation_error_model.design_manager(design) as eroperr_mngr:
utility_value = utility.matrix_trace(
lambda x: self.inverse_problem.Hessian_inv_matvec(
x,
eval_at=self.linearization_point,
precond=precond,
),
size=size,
method="randomized",
randomization_vectors=randomization_vectors,
# verbose=self.verbose,
)
elif self.criterion == "randomized_d_optimal":
randomization_vectors = self.randomization_vectors
precond = self._REDUCED_HESSIAN
with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr:
with self.inverse_problem.observation_error_model.design_manager(design) as eroperr_mngr:
utility_value = utility.matrix_logdet(
lambda x: self.inverse_problem.Hessian_inv_matvec(
x,
eval_at=self.linearization_point,
precond=precond,
),
size=size,
method="randomized",
randomization_vectors=self.randomization_vectors,
# verbose=self.verbose,
)
else:
# TODO: Consider putting this as a special class variable
raise NotImplementedError(
f"{self.criterion} utility criterion is not implemented yet."
f" Available criteria are: {', '.join(self._ACCEPTABLE_CRITERIA)}"
)
if self.debug:
print(
"DEBUG: Returning Criterion Value: \n"
f" - {utility_value=}: \n"
)
return utility_value
[docs]
def get_time_dependent_design(self, trajectory):
"""Convert a trajectory (of indexes) into time-dependent design (dict)"""
if trajectory is None:
design = {}
elif isinstance(trajectory, dict):
design = trajectory
elif utility.isiterable(trajectory):
trajectory = utility.asarray(trajectory).flatten()
if trajectory.size != self.inverse_problem.observation_times.size:
raise TypeError(
"The trajectory size must match observation times! \n"
f"{self.inverse_problem.observation_times=}\n"
f"{trajectory.size=}"
)
# design placeholder (time-independent)
d = np.empty(
self.inverse_problem.observation_operator.shape[0],
dtype=bool,
)
# Create time-dependent designs
design = {}
for t, k in zip(self.inverse_problem.observation_times, trajectory):
# k is the trajectory element (index of the navigation grid)
idx_closest, _ = self.get_observation_targets(
center=self.navigation_grid[k, :]
)
d[:] = False
d[idx_closest] = True
design.update(
{
t: d.copy()
}
)
else:
raise TypeError(
f"Unexpected design type: {trajectory=} of {type(trajectory)=}"
)
obs_times = self.inverse_problem.observation_times
design_times = utility.asarray(design.keys())
if obs_times.size != design_times.size:
raise TypeError(
"The design must match observation times! \n"
f"{self.inverse_problem.observation_times=}\n"
f"design.keys()={design_times}"
)
if not np.allclose(obs_times, design_times):
raise TypeError(
"The design must match observation times! \n"
f"{self.inverse_problem.observation_times=}\n"
f"design.keys()={design_times}"
)
return design
[docs]
def get_observation_targets(self, center, ):
"""Get the observation target locations based on the agent state
(center sensor location) stated as (x, y) coordinates.
This is a methos to determine observation targets (gridpoints given
the center of a batch of sensors)
:returns: (idx_closest, targets) where:
- 'idx_closest': indexes (in the observation grid) of coordinates to choose
- 'targets': the actual targets
"""
# The center point is the trajectory index around which observations are made
center = utility.asarray(center).flatten()
# The observation grid from which we pick observation points around trajectory point
observation_grid = self.inverse_problem.observation_operator.base_observation_operator.get_observation_grid()
assert self.sensor_size <= len(
observation_grid
), "self.sensor_size must be less than or equal to the number of agent states."
# Get distance of all observation points from the trajectory (center) point
distance = np.linalg.norm(
observation_grid - center.reshape(1, -1),
axis=1,
)
# Extract teh closes points based on the number of sensors
idx_closest = np.argsort(distance)[: self.sensor_size]
idx_closest.sort()
targets = observation_grid[idx_closest, :]
return idx_closest, targets
[docs]
def posterior_covariance(self, trajectory, precond=False, ):
"""Evaluate the posterior covariance matrix (in full)"""
# Get the FIM
FIM = self.FIM(trajectory=trajectory, data_misfit_only=False, precond=precond, )
# Invert
cov = np.linalg.inv(FIM)
if self.debug:
print(
f"DEBUG: Parameter Posterior Covariance:\n {cov=}"
)
# Check if goal-oriented
g = self.frozen_goal_operator
if g is not None:
cov = g @ (cov @ g.T)
if self.debug:
print(
f"DEBUG: Goal Posterior Covariance:\n {cov=} "
)
return cov
[docs]
def FIM_inv(self, trajectory, data_misfit_only=True, precond=False, ):
return np.linalg.inv(
self.FIM(
trajectory=trajectory,
data_misfit_only=data_misfit_only,
precond=precond,
),
)
[docs]
def FIM(self, trajectory, data_misfit_only=True, precond=False, ):
"""Construct the FIM (Hessian of 4dvar without prior) in full for a given trajectory"""
if self.debug:
print(
"DEBUG: FIM with: \n"
f" - {trajectory=}: \n"
)
design = self.get_time_dependent_design(
trajectory,
)
if self.debug:
_tr = {}
for t, d in design.items():
_tr.update(
{t: np.where(d)[0]}
)
print(
"DEBUG: Trajectory -> Design : \n"
f" - {trajectory=} \n"
f" - Desin active indexes = {_tr} \n"
)
# Evaluate the frozen model if not evaluated
_ = self.frozen_model
# Initialize the Hessian (FIM) based on `data_misfit_only`
if data_misfit_only:
Hess = None
elif precond:
Hess = np.eye(self.inverse_problem.prior.size)
else:
# Precision matrices of the error models
Hess = self.prior_precision.copy()
# 2- Hessian evaluation
# Update design and evaluate Hessian for this design
with self.inverse_problem.observation_error_model.design_manager(design) as err_mngr:
with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr:
# Loop over observation times
for t in self.inverse_problem.observation_times:
# Observation error covariance matrix (at this time)
Rinv = err_mngr.precision_matrix(t=t)
# Observation operator at this time; maybe initialize (full one) + slice -> fast execution
if oper_mngr.project_onto_active_design_space:
obs_shape = oper_mngr.active_shape[t]
obs_indexes = oper_mngr.active_indexes[t]
# ObsOper = np.zeros((obs_shape[0], oper_mngr.shape[0]))
# for i, j in enumerate(obs_indexes):
# ObsOper[i, j] = 1.0
# Replacing the above with sparse instead since we know rows & data!
ObsOper = sp.csc_array(
(
np.ones(obs_indexes.size),
(np.arange(obs_indexes.size), obs_indexes)
),
shape=(obs_shape[0], oper_mngr.shape[0]),
)
else:
ObsOper = None
# obs_shape = oper_mngr.shape
# obs_indexes = oper_mngr.active_indexes[t]
# ObsOper = np.eye(obs_shape[0])
# The simulation model TMG
model_tlm = self.frozen_model[t]
if ObsOper is None:
rpart = model_tlm
else:
rpart = ObsOper @ model_tlm
if precond:
rpart = rpart @ self.prior_covariance_sqrt_T
if Hess is None:
Hess = rpart.T @ (Rinv @ rpart)
else:
Hess += rpart.T @ (Rinv @ rpart)
if self.debug:
print(
"DEBUG: Covariance Iteration: \n"
f" - {t=}: \n"
f" - {Rinv=}: \n"
f" - {ObsOper=}: ; (None means ObsOper is set to 1) \n"
f" - {model_tlm =}: \n"
f" - {rpart=}: \n"
f" - {Hess=}: \n"
)
return Hess
@property
def navigation_grid(self):
return self.configurations.navigation_grid
@property
def inverse_problem(self):
return self.configurations.inverse_problem
@inverse_problem.setter
def inverse_problem(self, val):
return self.update_configurations(inverse_problem=val, )
@property
def criterion(self):
return self.configurations.criterion
@criterion.setter
def criterion(self, val):
return self.update_configurations(criterion=val, )
@property
def sensor_size(self):
return self.configurations.sensor_size
@sensor_size.setter
def sensor_size(self, val):
return self.update_configurations(sensor_size=val, )
@property
def prior_covariance_sqrt_T(self):
"""Lazy sqrt of the prior covariance matrix (compute once)"""
if hasattr(self, "_PSQRTT") and self._PSQRTT is not None:
pass
else:
if hasattr(self.inverse_problem, "base_inverse_problem"):
ip = self.inverse_problem.base_inverse_problem
else:
ip = self.inverse_problem
self._PSQRTT = utility.asarray(
ip.prior.covariance_sqrt_matvec,
shape=(ip.prior.size, ip.prior.size),
).T
return self._PSQRTT
@property
def prior_precision(self):
"""Lazy inverse of the prior covariance matrix (compute once)"""
if hasattr(self, "_PINV") and self._PINV is not None:
pass
else:
if hasattr(self.inverse_problem, "base_inverse_problem"):
self._PINV = self.inverse_problem.base_inverse_problem.prior.precision_matrix()
else:
self._PINV = self.inverse_problem.prior.precision_matrix()
return self._PINV
@property
def frozen_model(self):
"""Reference to the lazy object self._FROZEN_MODEL
If this is Called, the object is constructed if not available.
The returned object is actually a dictionary of numpy array
representation of the simulation model over registered observaiton times.
"""
if hasattr(self, "_FROZEN_MODEL"):
model = self._FROZEN_MODEL
else:
model = None
# Recalculate
if model is None:
if self.verbose:
print("Constructing frozen model snapshots (only once)")
# Retrieve observation times
observation_times = self.inverse_problem.observation_times
# TODO: Update/Fix This!
if hasattr(self.inverse_problem, 'base_inverse_problem'):
ip = self.inverse_problem.base_inverse_problem
else:
ip = self.inverse_problem
self._FROZEN_MODEL = {}
design = ip.observation_operator.design
if isinstance(design, dict):
for k in design.keys():
design[k][:] = 1
else:
design[:] = 1
with ip.observation_operator.design_manager(design):
def A(x, t):
out = assimilation_utils.apply_TLM_operator(
ip,
state=x,
obs_time=t,
save_all=False,
eval_at=self.linearization_point,
scale_by_noise=False,
)
return out[0]
for t in self.inverse_problem.observation_times:
# Create the model
self._FROZEN_MODEL.update(
{
t: utility.asarray(
lambda x: A(x, t=t),
shape=ip.observation_operator.shape,
)
}
)
return self._FROZEN_MODEL
@property
def linearization_point(self):
"""Reference to the lazy object self._LINEARIZATION_POINT.
If this is Called, the object is constructed if not available.
The returned object is either `None` (if the underlying inverse problem is linear)
or the solution of the inverse problem (of the inference parameter) to be
used as linearization point of the model, operators, etc in the case of nonlinear inference.
"""
if not hasattr(self, "_LINEARIZATION_POINT"):
# Check if the problem is linear; if not, solve to generate linearization point
if hasattr(self.inverse_problem, 'goal_operator'):
ip = self.inverse_problem.base_inverse_problem
else:
ip = self.inverse_problem
problem_is_linear = utility.inverse_problem_linearity_test(ip)
if problem_is_linear:
eval_at = None
else:
# solve the inverse problem
map_point = ip.solve()
eval_at = map_point
self._LINEARIZATION_POINT = eval_at
return self._LINEARIZATION_POINT
@property
def randomization_vectors(self):
"""Reference to the lazy object self._RANDOMIZATION_VECTORS
If this is Called, the object is constructed if not available.
The returned object is actually a list of random vectors
used to estimate the requested criterion.
"""
if hasattr(self, "_RANDOMIZATION_VECTORS"):
rvecs = self._RANDOMIZATION_VECTORS
else:
rvecs = None
# Recalculate
if hasattr(self.inverse_problem, 'goal_operator'):
size = self.inverse_problem.goal_operator.size
else:
size = self.inverse_problem.prior.size
if rvecs is None:
if self.verbose:
print(
"\n***\nGenerating randomization vectors for the first time...", end= ""
)
# Generate the randomization vectors
if self.criterion == "randomized_a_optimal":
# Create preconditioner
try:
hess = SVDReducedHessian(
inverse_problem=self.inverse_problem,
rank_p=25, # Make this optional
verbose=self.verbose,
data_misfit_only=False,
eval_at=self.linearization_point,
)
self._REDUCED_HESSIAN = lambda x, p: hess.Hessian_matvec(
state=p,
eval_at=self.linearization_point,
)
except (Exception) as err:
print(
"Failed to construct reduced Hessian; ignoring the error raised below:\n"
f"{err=} or {type(err)=}"
)
self._REDUCED_HESSIAN = None
_, rvecs = utility.matrix_trace(
lambda x: self.inverse_problem.Hessian_inv_matvec(
x,
eval_at=self.linearization_point,
precond=self._REDUCED_HESSIAN,
),
size=size,
method="randomized",
return_randomization_vectors=True,
optimize_sample_size=True,
min_sample_size=32, # TODO: Should be an option/config, or use default values
verbose=self.verbose,
)
elif self.criterion == "randomized_d_optimal":
# Create preconditioner
try:
hess = SVDReducedHessian(
inverse_problem=self.inverse_problem,
rank_p=25, # Make this optional
verbose=self.verbose,
data_misfit_only=False,
eval_at=self.linearization_point,
)
self._REDUCED_HESSIAN = lambda x, p: hess.Hessian_matvec(
state=p,
eval_at=self.linearization_point,
)
self._REDUCED_HESSIAN = None
except (Exception) as err:
print(
"Failed to construct reduced Hessian; ignoring the error raised below:\n"
f"{err=} or {type(err)=}"
)
_, rvecs = utility.matrix_logdet(
lambda x: self.inverse_problem.Hessian_inv_matvec(
x,
eval_at=self.linearization_point,
precond=self._REDUCED_HESSIAN,
),
size=size,
method="randomized",
return_randomization_vectors=True,
optimize_sample_size=True,
min_sample_size=32, # TODO: Should be an option/config, or use default values
verbose=self.verbose,
)
self._REDUCED_HESSIAN = lambda x, p: hess.Hessian_matvec(state=p, eval_at=x)
else:
raise ValueError(
f"The registered criterion '{self.criterion}' "
"cannot be used for randomization!"
)
if self.verbose:
print(" Done...")
self._RANDOMIZATION_VECTORS = rvecs
return self._RANDOMIZATION_VECTORS
@property
def frozen_goal_operator(self):
"""If the inverse problem provides `goal_operator`, construct it as a matrix, otherwise `None`"""
if self.debug:
print(
"DEBUG: Evaluating FROZEN GOAL OPERATOR\n"
f" - {self.inverse_problem}"
)
if hasattr(self.inverse_problem, 'goal_operator'):
if hasattr(self, "_FROZEN_GOAL_OPERATOR"):
pred_oper = self._FROZEN_GOAL_OPERATOR
else:
pred_oper = None
# Recalculate
if pred_oper is None:
# Create the prediction operator
self._FROZEN_GOAL_OPERATOR = utility.asarray(
self.inverse_problem.goal_operator.apply,
shape=self.inverse_problem.goal_operator.shape,
)
else:
self._FROZEN_GOAL_OPERATOR = None
if self.debug:
print(
"DEBUG: Exiting FROZEN GOAL OPERATOR CALLER"
f" Returning {self._FROZEN_GOAL_OPERATOR=}"
)
return self._FROZEN_GOAL_OPERATOR
@property
def frozen_goal_operator_adjoint(self):
"""If the inverse problem provides `goal_operator`, construct it as a matrix, otherwise `None`"""
if hasattr(self.inverse_problem, 'goal_operator'):
if hasattr(self, "_FROZEN_GOAL_OPERATOR_ADJOINT"):
pred_oper = self._FROZEN_GOAL_OPERATOR_ADJOINT
else:
pred_oper = None
# Recalculate
if pred_oper is None:
# Create the prediction operator
self._FROZEN_GOAL_OPERATOR_ADJOINT = utility.asarray(
self.inverse_problem.goal_operator.apply_adjoint,
shape=self.inverse_problem.goal_operator.shape[::-1],
)
else:
self._FROZEN_GOAL_OPERATOR_ADJOINT = None
return self._FROZEN_GOAL_OPERATOR_ADJOINT
# Add an alias so that utility functions and optimality criteria can be perceived
# equally
TrajectoryCriterionConfigs = TrajectoryUtilityFunctionConfigs
TrajectoryCriterion = TrajectoryUtilityFunction