Source code for pyoed.models.error_models.Gaussian._fenics_Gaussian

# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved


"""
This module implements a Gaussian error model for support of fenics (dolfin)
models.
"""

import sys
import inspect
import re
from dataclasses import dataclass, replace
from typing import (
    Callable,
    Any,
)

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as splinalg

from pyoed.models.core import (
    ErrorModel,
    ErrorModelConfigs,
)
from pyoed import utility
from pyoed.utility.mixins import (
    RandomNumberGenerationMixin,
)
from pyoed.configs import (
    validate_key,
    set_configurations,
    PyOEDConfigsValidationError,
)

try:
    import dolfin as dl
    import ufl
except:
    dl = ufl = None


# Global (ish) variables
_LINSOLVE_TOL = 1e-12  # Base tolerance of linear system solves
_RANDOMIZATION_SAMPLE_SIZE = 50  # 250


[docs] @dataclass(kw_only=True, slots=True) class DolfinGaussianErrorModelConfigs(ErrorModelConfigs): """ Configuration settings for :py:class:`DolfinGaussianErrorModel`. :param Vh: finite element space to which the distribution is assocciated :param mean: mean of the error model :param variance: variance/covariance of the error model :param random_seed: random seed used when the Gaussian model is initiated. This is useful for reproductivity. """ Vh: None | Any = None mean: float | np.ndarray = 0.0 variance: float | np.ndarray | sp.spmatrix = 1.0 random_seed: int | None = None
[docs] @set_configurations(DolfinGaussianErrorModelConfigs) class DolfinGaussianErrorModel(ErrorModel, RandomNumberGenerationMixin): """ This class provides an implementation of Gaussian Models suitable for Dolfin-based formulation of forward and inverse problems. The Gaussian distribution is on the form :math:`\\mathcal{N}(\\mathbf{x}, \\mathbf{B})`, with :math:`\\mathbf{x}` being the mean, and :math:`\\mathbf{B}` being the variance/covariance matrix/operator :math:`\\mathbf{B}`. The underlying finite element space is assumed to be :math:`\\mathtt{R}`. :param configs: the configurations object: :raises: - `ImportError`: if dolfin or ufl are not available/installed - `TypeError`: if the passed mean or variance types are not supported - `ValueError`: if no valid FEM space (DOF) `Vh` is found in `configs` """ # TODO: Add PDF functionality to this class following implementation in `GaussianErrorModel` above. # TODO: This should be very straigh forward except for handling input dtype if needed.
[docs] def __init__(self, configs: DolfinGaussianErrorModelConfigs | dict | None = None): if None in (dl, ufl): raise ImportError( f"This class can be used only with dolfin (Python's " f"Fenics interface) and UFL installed." f"Failed to load/import these modules." ) configs = self.configurations_class.data_to_dataclass(configs) super().__init__(configs) # Extract configurations Vh = self.configurations.Vh mean = self.configurations.mean variance = self.configurations.variance random_seed = self.configurations.random_seed # Check the finite element space (Vh) if Vh is None: print( "You MUST pass a vaild FEM space (DOF) in the configurations dictionary; Found None!" ) raise ValueError else: # Validate the passed DOF as needed! element_family = Vh.ufl_element().family().lower() if element_family != "real": print("Only meshes with finite elements of family 'Real' are supported") print("The family associated with the passed DOF is '{0}'".format()) raise TypeError else: # all good pass # FEM space defines the dimension of the Gaussian distribution space size = Vh.dim() # Associate attributes; allow changing later by properties self._Vh = Vh self._SIZE = size # Define a domain measure self._DOM_MEASURE = dl.Constant( 1.0 / dl.assemble(dl.Constant(1.0) * ufl.dx(Vh.mesh())) ) # DEfine the Mass matrix (Identity) trial = dl.TrialFunction(Vh) test = dl.TestFunction(Vh) self._M = dl.assemble(self._DOM_MEASURE * ufl.inner(trial, test) * ufl.dx) # Set mean and covariances self.update_mean(mean) self.update_covariance_matrix(variance) # Mean is good; set the mean (convert properly to proper dl vector) # Check mean (type and size) and variance data types and sizes/shapes # 2- Covariances (and related factors (Cholesky, etc.) self.update_covariance_matrix(variance) # # Initiate a design self.update_design(np.ones(self._SIZE, dtype=bool)) # From variances, construct the lower cholesky decomposition (R) of the covariance matrix # Update/reset the associated random number generator/seed self.update_random_number_generator(random_seed=random_seed)
[docs] def update_mean(self, mean): """ Update the mean of the distribution """ # Check mean (type and size) and variance data types and sizes/shapes if utility.isnumber(mean) or isinstance(mean, np.ndarray): mean = np.array(mean).flatten() # local copy elif sp.issparse(mean): mean = mean.toarray().flatten() # local copy elif isinstance(mean, dl.Vector): mean = mean.get_local() # this might be redundant, but let's be consistent else: print("`mean` is of unsupported type: {0}".format(type(mean))) raise TypeError # Assert/validate mean size if self._SIZE == mean.size: # all good (mean has the right size) pass elif mean.size == 1: # repeat scalar mean (mean is passed as a scalar) mean = np.ones(self._SIZE) * mean[0] else: # vector mean size mismatches passed size print( "size of '{0}' mismatches the size of the passed mean of '{1}'".format( self._SIZE, mean.size ) ) raise AssertionError # Associate Mean attributes; allow changing later by properties # 1- Mean (convert from np array to dl.Vector and set entries) _mean = dl.Vector() self._M.init_vector(_mean, 0) _mean.set_local(mean) self._MEAN = _mean
[docs] def update_covariance_matrix(self, cov): """ Define the Variational forms of the covariance, precision, and factors of the covariance matrix, then associate the passed covariance matrix """ # Check variance/covariance matrix shape if isinstance(cov, np.ndarray) or utility.isnumber(cov): cov = np.array(cov) # local copy elif sp.issparse(cov): pass # cov = cov.copy() # local copy else: print("`variance` is of unsupported type: {0}".format(type(cov))) raise TypeError if cov.size == 1: if sp.issparse(cov): cov = cov.toarray() cov = cov.flatten()[0] # 1d variance cov = sp.diags( np.ones(self._SIZE) * cov, offsets=0, shape=(self._SIZE, self._SIZE), format="csc", ) elif self._SIZE == cov.size: if sp.issparse(cov): cov = cov.toarray() # scalar/1d variance cov = sp.diags(cov, offsets=0, shape=(self._SIZE, self._SIZE), format="csc") elif (self._SIZE, self._SIZE) == cov.shape: # Passed square matrix of the right shape pass else: print("The passed covariances have incorrect shape {0}".format(cov.shape)) raise AssertionError # Factorize (Lower Cholesky decomposition) L = utility.factorize_spsd_matrix(cov, lower=True) L_inv = splinalg.spsolve_triangular(L, np.eye(self._SIZE), lower=True) # Cast covariances and precisions as ufl matrices/(2d)tensors if sp.issparse(L): stdev_op = ufl.as_matrix(L.toarray()) cov_op = ufl.as_matrix(L.dot(L.T).toarray()) else: stdev_op = ufl.as_matrix(L) cov_op = ufl.as_matrix(L.dot(L.T)) # if sp.issparse(L_inv): stdev_inv_op = ufl.as_matrix(L_inv.toarray()) cov_inv_op = ufl.as_matrix(Linv.T.dot(L_inv).toarray()) else: stdev_inv_op = ufl.as_matrix(L_inv) cov_inv_op = ufl.as_matrix(L_inv.T.dot(L_inv)) # Assemble matrices and associate attributes if self._SIZE == 1: trial = ufl.as_matrix([[dl.TrialFunction(self._Vh)]]) test = ufl.as_matrix([[dl.TestFunction(self._Vh)]]) else: trial = dl.TrialFunction(self._Vh) test = dl.TestFunction(self._Vh) # i) Covariance matrix Cholesky Factor (STDEV) varform = self._DOM_MEASURE * ufl.inner(test, ufl.dot(stdev_op, trial)) * ufl.dx self._STDEV = dl.assemble(varform) # ii) Cholesky Factor Inverse (STDEV_INV) varform = ( self._DOM_MEASURE * ufl.inner(test, ufl.dot(stdev_inv_op, trial)) * ufl.dx ) self._STDEV_INV = dl.assemble(varform) # iii) Covariances are added for consistency; # TODO: We will use factors for covariance operations # TODO: This will be removed once we have full implementation varform = self._DOM_MEASURE * ufl.inner(test, ufl.dot(cov_op, trial)) * ufl.dx self._COV = dl.assemble(varform) # varform = ( self._DOM_MEASURE * ufl.inner(test, ufl.dot(cov_inv_op, trial)) * ufl.dx ) self._PREC = dl.assemble(varform)
[docs] def update_design(self, design): """ Update the design vector (which entries are active) """ if isinstance(design, np.ndarray): design = np.asarray(design).flatten() elif isinstance(design, dl.Vector): design = design.get_local() else: print( "Unsupported data type '{0}' was found in the passed design".format( design.dtype ) ) raise TypeError if design.size != self.size: print( "The passed design vector has the wrong size of {0}; expected {1}".format( design.size, self.size ) ) raise AssertionError if np.issubdtype(design.dtype, np.integer): assert design.min() >= 0, "All design variables must be nonnegagive! " design = design.astype( bool ) # Should we convert to bool or to float allowing higher weights? elif np.issubdtype(design.dtype, np.floating): assert design.min() >= 0, "All design variables must be nonnegagive! " elif np.issubdtype(design.dtype, np.bool_): pass else: print( "Unsupported data type '{0}' was found in the passed design".format( design.dtype ) ) raise TypeError active_size = np.count_nonzero(design) if active_size == 0: print("Need to figure out how to handle the case of zero active entries!") raise NotImplementedError("TODO...") # Active sensors, and design variables self._ACTIVE_SIZE = active_size self._ACTIVE = design.astype(bool) self._DESIGN = design # self._DESIGN_WEIGHTS = 1.0 / np.power(design, 2) self._DESIGN_WEIGHTS = np.power(design, 2) if np.issubdtype(design.dtype, np.bool_): active = np.where(design)[0] data = np.ones(active.size) row = np.arange(active.size) col = active design_mat = sp.csc_matrix( (data, (row, col)), shape=(active.size, self.size) ) elif np.issubdtype(design.dtype, np.floating): active = np.where(design)[0] data = 1.0 / np.sqrt(design[active]) row = np.arange(active.size) col = active design_mat = sp.csc_matrix( (data, (row, col)), shape=(active.size, self.size) ) else: print("This should never happen!") print( "Unsupported data type '{0}' was found in the passed design".format( design.dtype ) ) raise TypeError # design_mat_op = ufl.as_matrix(design_mat.toarray()) out_Vh = dl.VectorFunctionSpace( self._Vh.mesh(), family="Real", degree=0, dim=self._ACTIVE_SIZE ) test = dl.TrialFunction(out_Vh) trial = dl.TestFunction(self._Vh) varform = ( self._DOM_MEASURE * ufl.inner(test, ufl.dot(design_mat_op, trial)) * ufl.dx ) self._DESIGN_MAT = dl.assemble(varform)
[docs] def create_vector(self, return_np=False, ignore_design=False): """ Create a vector compatible with the range/domain of the underlying covariance matrix """ vec = dl.Vector() if ignore_design: self._M.init_vector(vec, 0) else: self._DESIGN_MAT.init_vector(vec, 1) vec.zero() if return_np: vec = vec.get_local() return vec
[docs] def generate_white_noise( self, ignore_design=False, truncate=False, return_np=False ): """ Generate a standard normal random vector of size `size` with values truncated at -/+3 if `truncate` is set to `True` :param bool ignore_design: the size of the white noise vector is in the full (ignore the design) if `True`, otherwise the size matches the dimension of the active subspace. :param bool truncate: if `True`, truncate the samples at -/+3, that is any sample point/entry above 3 is set to 3, and any value below -3 is set to -3. :param bool return_np: if `True`, convert the result into a numpy array (from fenics vector) :returns: numpy array (even if the dimension is 1) containing white noise """ size = self.size if ignore_design else self.active_size # Sample white_noise = self.random_number_generator.standard_normal(size) # Truncate if requested if truncate: white_noise[white_noise > 3] = 3 white_noise[white_noise < -3] = -3 # Convert to Fenics vector (unless `return_np`==`True`) if not return_np: _white_noise = self.create_vector( return_np=False, ignore_design=ignore_design ) _white_noise.set_local(white_noise) white_noise = _white_noise return white_noise
[docs] def add_noise(self, x, in_place=False): """ Add random noise to the passed vector `x` :param x: vector in a space of dimension equal to size of the underlying error model :param bool in_place: overwrite the passed vector `x` (if `True`). If set to `False`, a new instance (based on the distribution size) is created and returned. This is ignored of course if `x` is a non-iterable, i.e., a scalar. :raises: TypeError if x is of the wrong shape or type """ # Check type of the passed vector if isinstance(x, np.ndarray): x_size = x.size elif isinstance(x, dl.Vector): x_size = x.size() else: print("Unspoorted type '{0}' of passed vector x".formt(type(x))) raise TypeError # Check size of the passed vector if self._ACTIVE_SIZE != x_size: print( "Size mismatch; expected vector of size {0}; received vector of size {1}".format( self._ACTIVE_SIZE, x_size ) ) raise TypeError out_vec = x if in_place else x.copy() # Scale and shift the white noise if isinstance(x, np.ndarray): out_vec += self.generate_noise(return_np=True) else: noise = self.generate_noise(return_np=False) out_vec.axpy(1.0, noise) return out_vec
[docs] def generate_noise(self, return_np=False, ignore_design=False): """ Generate a random noise vector sampled from the underlying Gaussian distribution """ # Generate white noise: normal random noise with zero mean and variance 1. white_noise = self.generate_white_noise( ignore_design=True, return_np=False, truncate=False, ) noise = self.covariance_sqrt_matvec( white_noise, lower=True, ignore_design=True, in_place=False, ) # Add mean if ignore_design: noise.axpy(1.0, self._MEAN) if return_np: noise = noise.get_local() else: _noise = noise.get_local()[self._ACTIVE] + self._MEAN[self._ACTIVE] if return_np: noise = _noise else: noise = self.create_vector( return_np=return_np, ignore_design=ignore_design ) noise.set_local(_noise) return noise
[docs] def sample(self, return_np=False, ignore_design=False): """ Sample a random vector from the underlying Gaussian distribution """ # Add a scaled random noise (with the underlying covariance matrix) to the underlying mean out_vec = self._MEAN.get_local() if not ignore_design: out_vec = out_vec[self._ACTIVE] out_vec += self.generate_noise(return_np=True, ignore_design=ignore_design) if not return_np: _out_vec = self.create_vector(return_np=False, ignore_design=ignore_design) _out_vec.set_local(out_vec) out_vec = _out_vec return out_vec
[docs] def pdf( self, x, normalize=False, log=False, ): """ Evaluate the value of the density function (normalized or upto a *fixed* scaling constant) at the passed state/vector. :param x: realization of the random variable :param bool normalize: scale the PDF by the normalization factor :param bool log: if True, evaluate the gradient of the logarithm of the PDF :returns: the value of the density function evaluated at the passed state/vector. """ pdf = self.log_density( x=x, normalize=normalize, ) # scale (if needed) and return if not log: pdf = np.exp(pdf) if abs(pdf) == 0: pdf = 0 # Avoid retrning -0.0 return pdf
[docs] def pdf_gradient( self, x, normalize=False, log=False, ): """ Evaluate the gradient of the density function at the passed state/vector. :param x: realization of the random variable :param bool normalize: scale the PDF by the normalization factor :param bool log: if True, evaluate the gradient of the logarithm of the PDF :returns: the value of the density function evaluated at the passed state/vector. """ grad = self.log_density_gradient( x=x, ) if not log: grad *= self.pdf( x=x, log=False, normalize=False, ) if normalize: scl = -self.active_size / 2.0 * np.log(2 * np.pi) scl -= 0.5 * self.covariance_logdet() grad *= scl return grad
[docs] def log_density( self, x, normalize=False, ): """ Evaluate the logarithm of the density function at the passed state `x`. :param x: realization of the random variable :param bool normalize: scale the PDF by the normalization factor :returns: the logarithm of the density function evaluated at the passed state. """ # Check passed state vs. the current distribution configurations # Check type of the passed vector if isinstance(x, dl.Vector): x = x.get_local() else: try: x = utility.asarray(x).flatten() except: print("Unspoorted type '{0}' of passed vector x".formt(type(x))) raise TypeError x_size = x.size size = self.active_size # Check size of the passed vector if size != x_size: raise TypeError( f"Size mismatch; expected vector of size {size};" f"received vector of size {x_size}" ) # Get the mean mean = self._MEAN.get_local()[self._ACTIVE] # Evaluate the exponent innov = x - mean scaled_innov = self.covariance_inv_matvec(innov) pdf = -0.5 * np.dot(innov, scaled_innov) # Normalization constant (log-scale) if normalize: scl = -size / 2.0 * np.log(2 * np.pi) scl -= 0.5 * utility.matrix_logdet( self.covariance_matvec, size=size, method="exact", ) pdf += scl return pdf
[docs] def log_density_gradient( self, x, normalize=False, ): """ Evaluate the gradient of the logarithm of the density function at the passed state `x`. :param x: realization of the random variable :param bool normalize: scale the PDF by the normalization factor :returns: the gradient of the logarithm of the density function evaluated at the passed state. """ # Check passed state vs. the current distribution configurations # Check type of the passed vector if isinstance(x, dl.Vector): x = x.get_local() else: try: x = utility.asarray(x).flatten() except: print("Unspoorted type '{0}' of passed vector x".formt(type(x))) raise TypeError x_size = x.size size = self.active_size # Check size of the passed vector if size != x_size: raise TypeError( f"Size mismatch; expected vector of size {size};" f"received vector of size {x_size}" ) # Get the mean mean = self._MEAN.get_local()[self._ACTIVE] # Evaluate the exponent innov = x - mean grad = -self.covariance_inv_matvec(innov) return grad
[docs] def covariance_sqrt_matvec( self, x, lower=True, return_np=False, in_place=False, ignore_design=False ): """ Evaluate and return the product of Square root (Lower Cholesky) covariance matrix by a vector `x`. The product is determined by applying the cholesky decomposition of the covariance matrix once (dot product) to the passed state `x` :param x: vector in a space of dimension equal to the size of the underlying error model. x` could be scalar or number array, however, it must match the distribution size. :param bool lower: use the lower Cholesky as the square root :param bool in_place: overwrite the passed vector `x` (if `True`). If set to `False`, a new instance (based on the distribution size) is created and returned. This is ignored of course if `x` is a non-iterable, i.e., a scalar. :returns: the product of the sqrt-covariance matrix by `x` """ if ignore_design: in_size = out_size = self._SIZE elif lower: in_size = self._SIZE out_size = self._ACTIVE_SIZE else: in_size = self._ACTIVE_SIZE out_size = self._SIZE if out_size != in_size: # print("Can't overwrite passed vector; size mismatch given the effect of the design!") in_place = False if isinstance(x, np.ndarray): x_size = x.size elif isinstance(x, dl.Vector): x_size = x.size() else: print("Unspoorted type '{0}' of passed vector x".formt(type(x))) raise TypeError if not ignore_design: if lower and (x_size == self._ACTIVE_SIZE < self._SIZE): _x = self.create_vector(return_np=False, ignore_design=True) _x.zero() active = np.where(self.design)[0] for i in range(self._ACTIVE_SIZE): _x[active[i]] = x[i] x = _x x_size = x.size() in_place = True if x_size != in_size: print( "The passed vector has wrong size; expected {0}; found {1}".format( in_size, x_size ) ) raise TypeError # Check type of the passed vector if isinstance(x, np.ndarray): in_vec = self.create_vector(return_np=False, ignore_design=ignore_design) in_vec.set_local(x) else: in_vec = x.copy() if lower: tvec = self.create_vector(return_np=False, ignore_design=True) self._STDEV.mult(in_vec, tvec) if not ignore_design: result = self.create_vector(return_np=False, ignore_design=False) self._DESIGN_MAT.transpmult(tvec, result) else: result = tvec else: tvec = self.create_vector(return_np=False, ignore_design=True) if ignore_design: self._STDEV.transpmult(in_vec, tvec) result = tvec else: result = self.create_vector(return_np=False, ignore_design=True) # self._DESIGN_MAT.mult(in_vec, tvec) self._STDEV.transpmult(tvec, result) # Multiply x by the covariance matrix if out_size != in_size: out_vec = self.create_vector(return_np=return_np, ignore_design=not lower) else: out_vec = x if in_place else x.copy() if isinstance(out_vec, np.ndarray): out_vec[:] = result.get_local() else: out_vec.set_local(result.get_local()) return out_vec
[docs] def covariance_matvec( self, x, return_np=False, in_place=False, ignore_design=False ): """ Evaluate and return the product of covariance matrix by a vector `x`. The product is determined by applying the cholesky decomposition of the covariance matrix twice (dot product) to the passed state `x` Specifically, we assume the covariance matrix is on the form :math:`\\mathbf{L} \\mathbf{S} \\mathbf{S}^T \\mathbf{L}^T`, where :math:`\\mathbf{L}` is the design matrix (fat-short matrix), and :math:`\\mathbf{S}` is the lower Cholesky factor of the covariance matrix. :param x: vector in a space of dimension equal to the size of the underlying error model. `x` could be scalar or number array, however, it must match the distribution size. :param bool in_place: overwrite the passed vector `x` (if `True`). If set to `False`, a new instance (based on the distribution size) is created and returned. This is ignored of course if `x` is a non-iterable, i.e., a scalar. :returns: the product of the covariance matrix by `x` """ out_vec = self.covariance_sqrt_matvec( x, lower=False, in_place=False, ignore_design=ignore_design, return_np=False ) out_vec = self.covariance_sqrt_matvec( out_vec, lower=True, in_place=True, ignore_design=ignore_design, return_np=return_np, ) # TODO: Compare to assembling the full covariances matrix, and applying it! return out_vec
[docs] def covariance_diagonal(self, ignore_design=False): """ Return a copy the diagonal of the covariance matrix, i.e., the variances :returns: 1D numpy array holding the diagonal of the underlying covariance matrix. If the size of the underlying distribution is 1, this returns a scalar (variance) """ size = self._SIZE if ignore_design else self._ACTIVE_SIZE diag = np.empty(size) e_vec = self.create_vector(return_np=False, ignore_design=ignore_design) for i in range(size): e_vec.zero() e_vec[i] = 1.0 diag[i] = self.covariance_matvec(e_vec, ignore_design=ignore_design)[i] return diag
[docs] def variance(self, ignore_design=False): """Same as :py:meth:`covariance_diagonal`""" return self.covariance_diagonal(ignore_design=ignore_design)
[docs] def covariance_inv_matvec( self, x, in_place=False, return_np=False, ignore_design=False ): """ Evaluate and return the product of inverse of the error covariance matrix by a vector `x`. A linear system is solved using the cholesky decomposition of the covariance matrix :param x: vector in a space of dimension equal to the size of the underlying error model. `x` could be scalar or number array, however, it must match the distribution size. :param bool in_place: overwrite the passed vector `x` (if `True`). If set to `False`, a new instance (based on the distribution size) is created and returned. This is ignored of course if `x` is a non-iterable, i.e., a scalar. :returns: the product of inverse of the error covariance matrix by `x` :raises: ValueError if solving the linear system using LGMRES fails """ if isinstance(x, np.ndarray): is_dl = False in_vec = x.copy() pass elif isinstance(x, dl.Vector): is_dl = True in_vec = x.get_local() else: print("Unspoorted type '{0}' of passed vector x".formt(type(x))) raise TypeError # Define an operator out of self.covariance_matvec, then solve a linear system # solve a linear system to find covariance-inverse vector-product matvec = lambda x: self.covariance_matvec( x, in_place=False, return_np=True, ignore_design=ignore_design ) size = self._SIZE if ignore_design else self._ACTIVE_SIZE A = splinalg.LinearOperator((size, size), matvec=matvec) _x = x[:] # print("DEBUG: Gaussian covariance_inv_matvec: x: ", _x) if _x.size == 0: # When all entries are deactivated _out = _x.copy() else: # TODO: Add other solvers and preconditioners as needed! _out = splinalg.gmres(A=A, b=_x) if not _out[1]: # Solution is successful _out = _out[0] else: print( "Failed to apply LGMRES to solve the linear system involving the covariance matrix" ) raise ValueError if in_place: out = x if is_dl: out.set_local(_out) else: out[:] = _out else: if not return_np: out = self.create_vector(return_np=False, ignore_design=ignore_design) out.set_local(_out) else: out = _out return out
[docs] def covariance_sqrt_inv_matvec( self, x, lower=True, return_np=False, in_place=False, ignore_design=False ): """ Evaluate and return the product of inverse of the square root of the error covariance matrix by a vector `x`. A linear system is solved using the cholesky decomposition of the covariance matrix :param x: vector in a space of dimension equal to the size of the underlying error model. `x` could be scalar or number array, however, it must match the distribution size. :param bool lower: use the lower Cholesky as the square root :param bool in_place: overwrite the passed vector `x` (if `True`). If set to `False`, a new instance (based on the distribution size) is created and returned. This is ignored of course if `x` is a non-iterable, i.e., a scalar. :returns: the product of inverse of the square root of the error covariance matrix by `x` :raises: ValueError if solving the linear system using LGMRES fails """ if isinstance(x, np.ndarray): is_dl = False in_vec = x.copy() pass elif isinstance(x, dl.Vector): is_dl = True in_vec = x.get_local() else: print("Unspoorted type '{0}' of passed vector x".formt(type(x))) raise TypeError # Define an operator out of self.covariance_matvec, then solve a linear system # solve a linear system to find covariance-inverse vector-product matvec = lambda x: self.covariance_sqrt_matvec( x, lower=lower, in_place=False, return_np=True, ignore_design=ignore_design ) size = self._SIZE if ignore_design else self._ACTIVE_SIZE A = splinalg.LinearOperator((size, size), matvec=matvec) _x = x[:] # print("DEBUG: Gaussian covariance_inv_matvec: x: ", _x) if _x.size == 0: # When all entries are deactivated _out = _x.copy() else: # TODO: Add other solvers and preconditioners as needed! _out = splinalg.gmres(A=A, b=_x) if not _out[1]: # Solution is successful _out = _out[0] else: print( "Failed to apply LGMRES to solve the linear system involving the covariance matrix" ) raise ValueError if in_place: out = x if is_dl: out.set_local(_out) else: out[:] = _out else: if not return_np: out = self.create_vector(return_np=False, ignore_design=ignore_design) out.set_local(_out) else: out = _out return out
[docs] def covariance_matrix(self, ignore_design=False): """ Construct a numpy array representation (if the dimension is more than 1) of the underlying covariance matrix. (projected onto the observation space) """ e_vec = self.create_vector(return_np=False, ignore_design=ignore_design) size = e_vec.size() cov = np.empty((size, size)) for j in range(size): e_vec.zero() e_vec[j] = 1.0 cov[:, j] = self.covariance_matvec( e_vec, in_place=True, return_np=False, ignore_design=ignore_design ).get_local() return cov
[docs] def precision_matrix(self, ignore_design=False): """ Construct a numpy array representation (if the dimension is more than 1) of the precision matrix (inverse of the underlying covariance matrix). """ e_vec = self.create_vector(return_np=False, ignore_design=ignore_design) size = e_vec.size() precisions = np.empty((size, size)) for j in range(size): e_vec.zero() e_vec[j] = 1.0 precisions[:, j] = self.covariance_inv_matvec( e_vec, in_place=True, return_np=False, ignore_design=ignore_design ).get_local() return precisions
[docs] def covariance_trace( self, ignore_design=False, method="exact", sample_size: int = _RANDOMIZATION_SAMPLE_SIZE, optimize_sample_size: bool = False, min_sample_size: int = 10, max_sample_size: int = 100, distribution: str = "Rademacher", accuracy: float = 1e-2, ) -> float: """ Evaluate/Approximate the trace of the covariance matrix .. note:: This is a wrapper around the utility function `pyoed.utility.math.matrix_trace` that is called with the method :py:meth:`covariance_matvec` as the operator to compute the trace of. :param bool ignore_design: the size of the white noise vector is in the full (ignore the design) if `True`, otherwise the size matches the dimension of the active subspace. :param method: Method of evaluation. If "exact", we simply sum the diagonal of A. If "randomized", a Hutchinson style trace estimator, as described in :py:meth:`pyoed.utility.trace_estimator`, is employed. All of the futher keyword arguments are in the case of method="randomized" and are otherwise ignored if method="exact". :param randomization_vectors: The randomization vectors drawn to perform the computation. If not passed, they will be sampled. :param sample_size: The number of samples to use. :param optimize_sample_size: if `True`, `sample_size` is replaced with an optimized version, where the optimal sample size minimizes the variance of the estimator and is between `min_sample_size`, and `max_sample_size`. :param min_sample_size: Minimum number of random samples. :param max_sample_size: Maximum number of random samples. :param distribution: The probability distribution used for random sampling. Both 'Rademacher' and 'Gaussian' are supported. :param random_seed: The random seed to use for the distribution samples. :param accuracy: convergence criterion. :returns: exact/approximate value of the trace of the covariance matrix. of the underlying model. """ if re.match(r"\Aexact\Z", method, re.IGNORECASE): variance = self.covariance_diagonal(ignore_design=ignore_design) tr = np.sum(variance) elif re.match(r"\Arandom(ized)*\Z", method, re.IGNORECASE): size = self._SIZE if ignore_design else self._ACTIVE_SIZE matvec = lambda x: self.covariance_matvec( x, ignore_design=ignore_design, return_np=True, in_place=False ) out = utility.trace_estimator( matvec_fun=matvec, vector_size=size, sample_size=sample_size, optimize_sample_size=optimize_sample_size, min_sample_size=min_sample_size, max_sample_size=max_sample_size, distribution=distribution, accuracy=accuracy, ) if out[0]: # success tr = out[1] else: print( "The randomized approximation of the trace has not converged, hence is untrusted!" ) tr = out[1] else: print("Unsupported evaluation method '{0}'".format(method)) raise ValueError return tr
[docs] def covariance_logdet( self, ignore_design=False, method="exact", sample_size: int = _RANDOMIZATION_SAMPLE_SIZE, optimize_sample_size: bool = False, min_sample_size: int = 10, max_sample_size: int = 100, distribution: str = "Rademacher", accuracy: float = 1e-2, ) -> float: """ Evaluate/approximate the logarithm of the determinant of the covariance matrix of the passed error model .. note:: This is a wrapper around the utility function `pyoed.utility.math.matrix_logdet` that is called with the method :py:meth:`covariance_matvec` as the operator to compute the trace of. :param bool ignore_design: the size of the white noise vector is in the full (ignore the design) if `True`, otherwise the size matches the dimension of the active subspace. :param method: Method of evaluation. If "exact", we simply sum the diagonal of A. If "randomized", a chebyshev-approximation type-method is employed from :py:meth:`pyoed.utility.logdet_estimator`. All of the futher keyword arguments are in the case of method="randomized" and are otherwise ignored if method="exact". :param randomization_vectors: The randomization vectors drawn to perform the computation. If not passed, they will be sampled. :param sample_size: The number of samples to use. :param optimize_sample_size: if `True`, `sample_size` is replaced with an optimized version, where the optimal sample size minimizes the variance of the estimator and is between `min_sample_size`, and `max_sample_size`. :param min_sample_size: Minimum number of random samples. :param max_sample_size: Maximum number of random samples. :param distribution: The probability distribution used for random sampling. Both 'Rademacher' and 'Gaussian' are supported. :param random_seed: The random seed to use for the distribution samples. :param accuracy: convergence criterion. :returns: exact/approximate value of the logdet of the covariance matrix of the underlying model. """ if re.match(r"\Aexact\Z", method, re.IGNORECASE): # Determinant of a triangular matrix L is the product of it's diagonal # The idea below # det (L L^T) = det(L) det (L^T) --> log(det(L L^T)) = 2 sum log(diagonal(L)) # Note taht this is valid ONLY when all entries are active; # Otherwise, refactoring the projected covariance matrix is needed; if ignore_design: ldiag = self.create_vector(ignore_design=ignore_design, return_np=False) self._STDEV.get_diagonal(ldiag) ldiag = ldiag.get_local() logdet = 2 * np.sum(np.log(ldiag)) elif self._SIZE == self._ACTIVE_SIZE: # Do not ignore design; it only scales, but don't project # extract diagonal of self._STDEV ldiag = self.create_vector(ignore_design=ignore_design, return_np=False) self._STDEV.get_diagonal(ldiag) self._DESIGN_MAT.transpmult(ldiag.copy(), ldiag) ldiag = ldiag.get_local() logdet = 2 * np.sum(np.log(ldiag)) else: # Design has effect; get covariance matrix, and refactor # Do not ignore design; it both scales and projects covmat = self.covariance_matrix(ignore_design=False) L = utility.factorize_spsd_matrix(covmat) ldiag = np.diagonal(L).copy() logdet = 2 * np.sum(np.log(ldiag)) elif re.match(r"\Arandom(ized)*\Z", method, re.IGNORECASE): size = self._SIZE if ignore_design else self._ACTIVE_SIZE matvec = lambda x: self.covariance_matvec( x, ignore_design=ignore_design, return_np=True, in_place=False ) logdet = utility.logdet_estimator( matvec, vector_size=size, sample_size=sample_size, optimize_sample_size=optimize_sample_size, min_sample_size=min_sample_size, max_sample_size=max_sample_size, distribution=distribution, accuracy=accuracy, ) else: print("Unsupported evaluation method '{0}'".format(method)) raise ValueError return logdet
[docs] def kl_divergence(self, other): return kl_divergence(self, other)
@property def size(self): """Dimension of the underlying probability distribution""" return self._SIZE # This property will be confusing; removing. Will build another function if needed to retrieve the Cholesky factor in the full space # @property # def stdv(self): # return utility.asarray(self._STDEV) @property def design(self): """Get a copy of the design vector""" return self._DESIGN.copy() @design.setter def design(self, design): """ Update the design vector; If the design is non-binary; a design weight matrix is constructed as the square root of the passed design; this pre- and post-multiply the observation error covariance matrix. Othe behaviour can be define if needed! """ self.update_design(design) @property def extended_design(self): """ Return a vector indicating active/inactive entries (with proper replication for multiple prognostic variables) of the observation vector/range """ raise NotImplementedError("TODO") @property def active(self): """flags corresponding to active/inactive dimensions""" return np.where(self._ACTIVE)[0] # copy()
[docs] def active_size(self): """Dimension of the probability distribution projected onto the design space (only active/nonzero entries of the design)""" return np.count_nonzero(self.active)
@property def mean(self): """Get a copy of the distribution mean""" return self._MEAN.get_local()[self._ACTIVE].copy() @mean.setter def mean(self, new_mean): """Update distribution mean (in the full space; regardless the design)""" self.update_mean(new_mean)