Source code for pyoed.oed.utility_functions.trajectory_criteria

# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved

"""
This module provides implementations of utility functions valid for trajectory OED
Eventually, this can be moved into the OED subpackage under utility_functions
"""

import warnings
from dataclasses import dataclass
import numpy as np
from scipy import sparse as sp
from scipy.sparse import linalg as splinalg
from scipy.sparse.linalg import LinearOperator

from pyoed import utility
from pyoed.models.error_models.Gaussian import (kl_divergence, GaussianErrorModel, isGaussian)
from pyoed.configs import (
    validate_key,
    set_configurations,
    PyOEDConfigsValidationError,
)
from pyoed.assimilation import (
    InverseProblem,
    Smoother,
    assimilation_utils,
)
from pyoed.assimilation.extras.reduced_order_modeling import (SVDReducedHessian)
from pyoed.oed.core import (
    CriterionConfigs,
    Criterion,
)

[docs] @dataclass(kw_only=True, slots=True) class TrajectoryUtilityFunctionConfigs(CriterionConfigs): """ Configuration class for :py:class:`TrajectoryUtilityFunction`. :param inverse_problem: an instance of an inverse problem :py:class:`Smoother`. :param criterion: name of the OED criterion to use as the objective (utility) function. :param sensor_size: number of sensors to use/activate around the trajectory state in order to build the observation operator. :param navigation_grid: the navigation grid. This could be the same as the observation operator (if `None`) (base) grid that defines all candidate locations. The navigation grid, however could be different from the observation grid, thus it must be passed here. """ inverse_problem: Smoother | None = None criterion: str = "a_optimal" sensor_size: int = 1 navigation_grid: np.ndarray | None = None
[docs] @set_configurations(TrajectoryUtilityFunctionConfigs) class TrajectoryUtilityFunction(Criterion): """ A utility function for trajectory OED problems. """ _ACCEPTABLE_CRITERIA = [ "a_optimal", "d_optimal", "e_optimal", "precond_d_optimal", "fim_a_optimal", "fim_d_optimal", "fim_e_optimal", "kl-divergence", "randomized_a_optimal", "randomized_d_optimal", ] ## TODO: Define a utility function (KL-based) with (\tilde{H}+I): https://arxiv.org/pdf/2212.11466
[docs] def __init__(self, configs: dict | TrajectoryUtilityFunctionConfigs | None = None): configs = self.configurations_class.data_to_dataclass(configs) super().__init__(configs) self.configurations.criterion = self.configurations.criterion.lower().strip() if self.configurations.navigation_grid is None: warnings.warn("Setting the navigation grid to observation grid for backward compatibility") self.configurations.navigation_grid = self.inverse_problem.observation_operator.base_observation_operator.get_observation_grid()
[docs] def validate_configurations( self, configs: dict | TrajectoryUtilityFunctionConfigs , raise_for_invalid: bool = True, ) -> bool: """ Validate the passed configurations object. :param configs: configurations to validate. If a :py:class:`TrajectoryUtilityFunctionConfigs` object is passed, validation is performed on the entire set of configurations. However, if a dictionary is passed, validation is performed only on the configurations corresponding to the keys in the dictionary. :raises PyOEDConfigsValidationError: if the configurations are invalid and `raise_for_invalid` is set to True. """ # Fuse configs into current/default configurations aggregated_configs = self.aggregate_configurations( configs=configs, ) # Local tests is_positive_int = lambda x: utility.isnumber(x) and int(x)==x > 0 def is_valid_criterion(criterion): if isinstance(criterion, str): # TODO: Relax this comparison with re.matchb)! if criterion.lower().strip() in self._ACCEPTABLE_CRITERIA: return True else: return False else: return False def is_valid_grid(grid): if grid is None: return True if not isinstance(grid, np.ndarray): return False if grid.ndim != 2: return False nx, ny = grid.shape if not (nx> 1 and ny >=1 ): return False return True def is_valid_inverse_problem(ip): if isinstance(ip, Smoother): return True elif isinstance(ip, InverseProblem): if hasattr(ip, 'base_inverse_problem'): if isinstance(ip.base_inverse_problem, Smoother): return True else: return False else: return False else: return False ## Validation Stage # `inverse_problem`: None or string if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="inverse_problem", test=lambda v: is_valid_inverse_problem, message=f"inverse_problem is of invalid type. Expected instance of Smoother or InverseProblem with Smoother base inverse problem", raise_for_invalid=raise_for_invalid, ): return False # `criterion`: string if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="criterion", test=is_valid_criterion, message=f"criterion is invalid! Expected on of: {self._ACCEPTABLE_CRITERIA}", raise_for_invalid=raise_for_invalid, ): return False # `sensor_size`: positive integer if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="sensor_size", test=is_positive_int, message=f"sensor_size must be a positive integer", raise_for_invalid=raise_for_invalid, ): return False # `navigation_grid`: if not validate_key( current_configs=aggregated_configs, new_configs=configs, key="navigation_grid", test=is_valid_grid, message=f"Navigation grid must be None of valid numpy array", raise_for_invalid=raise_for_invalid, ): return False # All good! return super().validate_configurations(configs, raise_for_invalid)
[docs] def update_configurations(self, **kwargs): """ Take any set of keyword arguments, and lookup each in the configurations, and update as nessesary/possible/valid :raises PyOEDConfigsValidationError: if invalid configurations passed """ # Validate and udpate super().update_configurations(**kwargs) ## Special updates based on specific arguments # Resetting inverse problem? update problem_is_linear flag & reset number of solves if "inverse_problem" in kwargs: if hasattr(self, "_PINV"): self._PINV = None if hasattr(self, "_PSQRTT"): self._PSQRTT = None if "criterion" in kwargs: self.configurations.criterion = kwargs['criterion'].lower().strip() if "navigation_grid" in kwargs: if kwargs['navigation_grid'] is None: warnings.warn("Setting the navigation grid to observation grid for backward compatibility") self.configurations.navigation_grid = self.inverse_problem.observation_operator.base_observation_operator.get_observation_grid()
[docs] def evaluate(self, trajectory, ): """ Evaluate the utility function / optimality criterion at the given design (trajectory). """ # TODO: Proceed here by implementing RMSE-based criterion ... if self.debug: print( "DEBUG: Evaluating Criteriorn with: \n" f" - {trajectory=}: \n" f" - {self.criterion=}: \n" ) # Utility value based on the criterion if self.criterion == "a_optimal": utility_value = utility.matrix_trace( self.posterior_covariance(trajectory=trajectory, ), ) elif self.criterion == "d_optimal": utility_value = utility.matrix_logdet( self.posterior_covariance(trajectory=trajectory, precond=True, ), ) elif self.criterion == "precond_d_optimal": utility_value = utility.matrix_logdet( self.posterior_covariance(trajectory=trajectory, precond=True, ), ) elif self.criterion == "e_optimal": utility_value = splinalg.eigsh( self.posterior_covariance(trajectory=trajectory, ), k=1, )[0].item() elif self.criterion == "fim_a_optimal": utility_value = utility.matrix_trace( self.FIM(trajectory=trajectory, data_misfit_only=True, ), ) elif self.criterion == "fim_d_optimal": utility_value = utility.matrix_logdet( self.FIM(trajectory=trajectory, data_misfit_only=True, ), ) elif self.criterion == "fim_e_optimal": utility_value = splinalg.eigsh( self.FIM(trajectory=trajectory, data_misfit_only=True, ), k=1, )[0].item() elif self.criterion == "kl_divergence": # NOTE: THIS IS VERY LIMITED AS WE NEED AVERAGE/EXPECTATION OVER DATA DISTRIBUTION prior = self.inverse_problem.prior if not isGaussian(prior): prior = GaussianErrorModel({'mean': prior.mean, 'variance': prior.covariance_matrix()}) with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr: with self.inverse_problem.observation_error_model.design_manager(design) as eroperr_mngr: self.inverse_problem.solve(update_posterior=True) posterior = self.inverse_problem.posterior utility_value = kl_divergence(prior, posterior) elif self.criterion == "randomized_a_optimal": randomization_vectors = self.randomization_vectors precond = self._REDUCED_HESSIAN with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr: with self.inverse_problem.observation_error_model.design_manager(design) as eroperr_mngr: utility_value = utility.matrix_trace( lambda x: self.inverse_problem.Hessian_inv_matvec( x, eval_at=self.linearization_point, precond=precond, ), size=size, method="randomized", randomization_vectors=randomization_vectors, # verbose=self.verbose, ) elif self.criterion == "randomized_d_optimal": randomization_vectors = self.randomization_vectors precond = self._REDUCED_HESSIAN with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr: with self.inverse_problem.observation_error_model.design_manager(design) as eroperr_mngr: utility_value = utility.matrix_logdet( lambda x: self.inverse_problem.Hessian_inv_matvec( x, eval_at=self.linearization_point, precond=precond, ), size=size, method="randomized", randomization_vectors=self.randomization_vectors, # verbose=self.verbose, ) else: # TODO: Consider putting this as a special class variable raise NotImplementedError( f"{self.criterion} utility criterion is not implemented yet." f" Available criteria are: {', '.join(self._ACCEPTABLE_CRITERIA)}" ) if self.debug: print( "DEBUG: Returning Criterion Value: \n" f" - {utility_value=}: \n" ) return utility_value
[docs] def get_time_dependent_design(self, trajectory): """Convert a trajectory (of indexes) into time-dependent design (dict)""" if trajectory is None: design = {} elif isinstance(trajectory, dict): design = trajectory elif utility.isiterable(trajectory): trajectory = utility.asarray(trajectory).flatten() if trajectory.size != self.inverse_problem.observation_times.size: raise TypeError( "The trajectory size must match observation times! \n" f"{self.inverse_problem.observation_times=}\n" f"{trajectory.size=}" ) # design placeholder (time-independent) d = np.empty( self.inverse_problem.observation_operator.shape[0], dtype=bool, ) # Create time-dependent designs design = {} for t, k in zip(self.inverse_problem.observation_times, trajectory): # k is the trajectory element (index of the navigation grid) idx_closest, _ = self.get_observation_targets( center=self.navigation_grid[k, :] ) d[:] = False d[idx_closest] = True design.update( { t: d.copy() } ) else: raise TypeError( f"Unexpected design type: {trajectory=} of {type(trajectory)=}" ) obs_times = self.inverse_problem.observation_times design_times = utility.asarray(design.keys()) if obs_times.size != design_times.size: raise TypeError( "The design must match observation times! \n" f"{self.inverse_problem.observation_times=}\n" f"design.keys()={design_times}" ) if not np.allclose(obs_times, design_times): raise TypeError( "The design must match observation times! \n" f"{self.inverse_problem.observation_times=}\n" f"design.keys()={design_times}" ) return design
[docs] def get_observation_targets(self, center, ): """Get the observation target locations based on the agent state (center sensor location) stated as (x, y) coordinates. This is a methos to determine observation targets (gridpoints given the center of a batch of sensors) :returns: (idx_closest, targets) where: - 'idx_closest': indexes (in the observation grid) of coordinates to choose - 'targets': the actual targets """ # The center point is the trajectory index around which observations are made center = utility.asarray(center).flatten() # The observation grid from which we pick observation points around trajectory point observation_grid = self.inverse_problem.observation_operator.base_observation_operator.get_observation_grid() assert self.sensor_size <= len( observation_grid ), "self.sensor_size must be less than or equal to the number of agent states." # Get distance of all observation points from the trajectory (center) point distance = np.linalg.norm( observation_grid - center.reshape(1, -1), axis=1, ) # Extract teh closes points based on the number of sensors idx_closest = np.argsort(distance)[: self.sensor_size] idx_closest.sort() targets = observation_grid[idx_closest, :] return idx_closest, targets
[docs] def posterior_covariance(self, trajectory, precond=False, ): """Evaluate the posterior covariance matrix (in full)""" # Get the FIM FIM = self.FIM(trajectory=trajectory, data_misfit_only=False, precond=precond, ) # Invert cov = np.linalg.inv(FIM) if self.debug: print( f"DEBUG: Parameter Posterior Covariance:\n {cov=}" ) # Check if goal-oriented g = self.frozen_goal_operator if g is not None: cov = g @ (cov @ g.T) if self.debug: print( f"DEBUG: Goal Posterior Covariance:\n {cov=} " ) return cov
[docs] def FIM_inv(self, trajectory, data_misfit_only=True, precond=False, ): return np.linalg.inv( self.FIM( trajectory=trajectory, data_misfit_only=data_misfit_only, precond=precond, ), )
[docs] def FIM(self, trajectory, data_misfit_only=True, precond=False, ): """Construct the FIM (Hessian of 4dvar without prior) in full for a given trajectory""" if self.debug: print( "DEBUG: FIM with: \n" f" - {trajectory=}: \n" ) design = self.get_time_dependent_design( trajectory, ) if self.debug: _tr = {} for t, d in design.items(): _tr.update( {t: np.where(d)[0]} ) print( "DEBUG: Trajectory -> Design : \n" f" - {trajectory=} \n" f" - Desin active indexes = {_tr} \n" ) # Evaluate the frozen model if not evaluated _ = self.frozen_model # Initialize the Hessian (FIM) based on `data_misfit_only` if data_misfit_only: Hess = None elif precond: Hess = np.eye(self.inverse_problem.prior.size) else: # Precision matrices of the error models Hess = self.prior_precision.copy() # 2- Hessian evaluation # Update design and evaluate Hessian for this design with self.inverse_problem.observation_error_model.design_manager(design) as err_mngr: with self.inverse_problem.observation_operator.design_manager(design) as oper_mngr: # Loop over observation times for t in self.inverse_problem.observation_times: # Observation error covariance matrix (at this time) Rinv = err_mngr.precision_matrix(t=t) # Observation operator at this time; maybe initialize (full one) + slice -> fast execution if oper_mngr.project_onto_active_design_space: obs_shape = oper_mngr.active_shape[t] obs_indexes = oper_mngr.active_indexes[t] # ObsOper = np.zeros((obs_shape[0], oper_mngr.shape[0])) # for i, j in enumerate(obs_indexes): # ObsOper[i, j] = 1.0 # Replacing the above with sparse instead since we know rows & data! ObsOper = sp.csc_array( ( np.ones(obs_indexes.size), (np.arange(obs_indexes.size), obs_indexes) ), shape=(obs_shape[0], oper_mngr.shape[0]), ) else: ObsOper = None # obs_shape = oper_mngr.shape # obs_indexes = oper_mngr.active_indexes[t] # ObsOper = np.eye(obs_shape[0]) # The simulation model TMG model_tlm = self.frozen_model[t] if ObsOper is None: rpart = model_tlm else: rpart = ObsOper @ model_tlm if precond: rpart = rpart @ self.prior_covariance_sqrt_T if Hess is None: Hess = rpart.T @ (Rinv @ rpart) else: Hess += rpart.T @ (Rinv @ rpart) if self.debug: print( "DEBUG: Covariance Iteration: \n" f" - {t=}: \n" f" - {Rinv=}: \n" f" - {ObsOper=}: ; (None means ObsOper is set to 1) \n" f" - {model_tlm =}: \n" f" - {rpart=}: \n" f" - {Hess=}: \n" ) return Hess
@property def navigation_grid(self): return self.configurations.navigation_grid @property def inverse_problem(self): return self.configurations.inverse_problem @inverse_problem.setter def inverse_problem(self, val): return self.update_configurations(inverse_problem=val, ) @property def criterion(self): return self.configurations.criterion @criterion.setter def criterion(self, val): return self.update_configurations(criterion=val, ) @property def sensor_size(self): return self.configurations.sensor_size @sensor_size.setter def sensor_size(self, val): return self.update_configurations(sensor_size=val, ) @property def prior_covariance_sqrt_T(self): """Lazy sqrt of the prior covariance matrix (compute once)""" if hasattr(self, "_PSQRTT") and self._PSQRTT is not None: pass else: if hasattr(self.inverse_problem, "base_inverse_problem"): ip = self.inverse_problem.base_inverse_problem else: ip = self.inverse_problem self._PSQRTT = utility.asarray( ip.prior.covariance_sqrt_matvec, shape=(ip.prior.size, ip.prior.size), ).T return self._PSQRTT @property def prior_precision(self): """Lazy inverse of the prior covariance matrix (compute once)""" if hasattr(self, "_PINV") and self._PINV is not None: pass else: if hasattr(self.inverse_problem, "base_inverse_problem"): self._PINV = self.inverse_problem.base_inverse_problem.prior.precision_matrix() else: self._PINV = self.inverse_problem.prior.precision_matrix() return self._PINV @property def frozen_model(self): """Reference to the lazy object self._FROZEN_MODEL If this is Called, the object is constructed if not available. The returned object is actually a dictionary of numpy array representation of the simulation model over registered observaiton times. """ if hasattr(self, "_FROZEN_MODEL"): model = self._FROZEN_MODEL else: model = None # Recalculate if model is None: if self.verbose: print("Constructing frozen model snapshots (only once)") # Retrieve observation times observation_times = self.inverse_problem.observation_times # TODO: Update/Fix This! if hasattr(self.inverse_problem, 'base_inverse_problem'): ip = self.inverse_problem.base_inverse_problem else: ip = self.inverse_problem self._FROZEN_MODEL = {} design = ip.observation_operator.design if isinstance(design, dict): for k in design.keys(): design[k][:] = 1 else: design[:] = 1 with ip.observation_operator.design_manager(design): def A(x, t): out = assimilation_utils.apply_TLM_operator( ip, state=x, obs_time=t, save_all=False, eval_at=self.linearization_point, scale_by_noise=False, ) return out[0] for t in self.inverse_problem.observation_times: # Create the model self._FROZEN_MODEL.update( { t: utility.asarray( lambda x: A(x, t=t), shape=ip.observation_operator.shape, ) } ) return self._FROZEN_MODEL @property def linearization_point(self): """Reference to the lazy object self._LINEARIZATION_POINT. If this is Called, the object is constructed if not available. The returned object is either `None` (if the underlying inverse problem is linear) or the solution of the inverse problem (of the inference parameter) to be used as linearization point of the model, operators, etc in the case of nonlinear inference. """ if not hasattr(self, "_LINEARIZATION_POINT"): # Check if the problem is linear; if not, solve to generate linearization point if hasattr(self.inverse_problem, 'goal_operator'): ip = self.inverse_problem.base_inverse_problem else: ip = self.inverse_problem problem_is_linear = utility.inverse_problem_linearity_test(ip) if problem_is_linear: eval_at = None else: # solve the inverse problem map_point = ip.solve() eval_at = map_point self._LINEARIZATION_POINT = eval_at return self._LINEARIZATION_POINT @property def randomization_vectors(self): """Reference to the lazy object self._RANDOMIZATION_VECTORS If this is Called, the object is constructed if not available. The returned object is actually a list of random vectors used to estimate the requested criterion. """ if hasattr(self, "_RANDOMIZATION_VECTORS"): rvecs = self._RANDOMIZATION_VECTORS else: rvecs = None # Recalculate if hasattr(self.inverse_problem, 'goal_operator'): size = self.inverse_problem.goal_operator.size else: size = self.inverse_problem.prior.size if rvecs is None: if self.verbose: print( "\n***\nGenerating randomization vectors for the first time...", end= "" ) # Generate the randomization vectors if self.criterion == "randomized_a_optimal": # Create preconditioner try: hess = SVDReducedHessian( inverse_problem=self.inverse_problem, rank_p=25, # Make this optional verbose=self.verbose, data_misfit_only=False, eval_at=self.linearization_point, ) self._REDUCED_HESSIAN = lambda x, p: hess.Hessian_matvec( state=p, eval_at=self.linearization_point, ) except (Exception) as err: print( "Failed to construct reduced Hessian; ignoring the error raised below:\n" f"{err=} or {type(err)=}" ) self._REDUCED_HESSIAN = None _, rvecs = utility.matrix_trace( lambda x: self.inverse_problem.Hessian_inv_matvec( x, eval_at=self.linearization_point, precond=self._REDUCED_HESSIAN, ), size=size, method="randomized", return_randomization_vectors=True, optimize_sample_size=True, min_sample_size=32, # TODO: Should be an option/config, or use default values verbose=self.verbose, ) elif self.criterion == "randomized_d_optimal": # Create preconditioner try: hess = SVDReducedHessian( inverse_problem=self.inverse_problem, rank_p=25, # Make this optional verbose=self.verbose, data_misfit_only=False, eval_at=self.linearization_point, ) self._REDUCED_HESSIAN = lambda x, p: hess.Hessian_matvec( state=p, eval_at=self.linearization_point, ) self._REDUCED_HESSIAN = None except (Exception) as err: print( "Failed to construct reduced Hessian; ignoring the error raised below:\n" f"{err=} or {type(err)=}" ) _, rvecs = utility.matrix_logdet( lambda x: self.inverse_problem.Hessian_inv_matvec( x, eval_at=self.linearization_point, precond=self._REDUCED_HESSIAN, ), size=size, method="randomized", return_randomization_vectors=True, optimize_sample_size=True, min_sample_size=32, # TODO: Should be an option/config, or use default values verbose=self.verbose, ) self._REDUCED_HESSIAN = lambda x, p: hess.Hessian_matvec(state=p, eval_at=x) else: raise ValueError( f"The registered criterion '{self.criterion}' " "cannot be used for randomization!" ) if self.verbose: print(" Done...") self._RANDOMIZATION_VECTORS = rvecs return self._RANDOMIZATION_VECTORS @property def frozen_goal_operator(self): """If the inverse problem provides `goal_operator`, construct it as a matrix, otherwise `None`""" if self.debug: print( "DEBUG: Evaluating FROZEN GOAL OPERATOR\n" f" - {self.inverse_problem}" ) if hasattr(self.inverse_problem, 'goal_operator'): if hasattr(self, "_FROZEN_GOAL_OPERATOR"): pred_oper = self._FROZEN_GOAL_OPERATOR else: pred_oper = None # Recalculate if pred_oper is None: # Create the prediction operator self._FROZEN_GOAL_OPERATOR = utility.asarray( self.inverse_problem.goal_operator.apply, shape=self.inverse_problem.goal_operator.shape, ) else: self._FROZEN_GOAL_OPERATOR = None if self.debug: print( "DEBUG: Exiting FROZEN GOAL OPERATOR CALLER" f" Returning {self._FROZEN_GOAL_OPERATOR=}" ) return self._FROZEN_GOAL_OPERATOR @property def frozen_goal_operator_adjoint(self): """If the inverse problem provides `goal_operator`, construct it as a matrix, otherwise `None`""" if hasattr(self.inverse_problem, 'goal_operator'): if hasattr(self, "_FROZEN_GOAL_OPERATOR_ADJOINT"): pred_oper = self._FROZEN_GOAL_OPERATOR_ADJOINT else: pred_oper = None # Recalculate if pred_oper is None: # Create the prediction operator self._FROZEN_GOAL_OPERATOR_ADJOINT = utility.asarray( self.inverse_problem.goal_operator.apply_adjoint, shape=self.inverse_problem.goal_operator.shape[::-1], ) else: self._FROZEN_GOAL_OPERATOR_ADJOINT = None return self._FROZEN_GOAL_OPERATOR_ADJOINT
# Add an alias so that utility functions and optimality criteria can be perceived # equally TrajectoryCriterionConfigs = TrajectoryUtilityFunctionConfigs TrajectoryCriterion = TrajectoryUtilityFunction