# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved
"""
This module implements a Gaussian error model for support of fenics (dolfin)
models.
"""
import sys
import inspect
import re
from dataclasses import dataclass, replace
from typing import (
Callable,
Any,
)
import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as splinalg
from pyoed.models.core import (
ErrorModel,
ErrorModelConfigs,
)
from pyoed import utility
from pyoed.utility.mixins import (
RandomNumberGenerationMixin,
)
from pyoed.configs import (
validate_key,
set_configurations,
PyOEDConfigsValidationError,
)
try:
import dolfin as dl
import ufl
except:
dl = ufl = None
# Global (ish) variables
_LINSOLVE_TOL = 1e-12 # Base tolerance of linear system solves
_RANDOMIZATION_SAMPLE_SIZE = 50 # 250
[docs]
@dataclass(kw_only=True, slots=True)
class DolfinGaussianErrorModelConfigs(ErrorModelConfigs):
"""
Configuration settings for :py:class:`DolfinGaussianErrorModel`.
:param Vh: finite element space to which the distribution is assocciated
:param mean: mean of the error model
:param variance: variance/covariance of the error model
:param random_seed: random seed used when the Gaussian model is initiated. This is
useful for reproductivity.
"""
Vh: None | Any = None
mean: float | np.ndarray = 0.0
variance: float | np.ndarray | sp.spmatrix = 1.0
random_seed: int | None = None
[docs]
@set_configurations(DolfinGaussianErrorModelConfigs)
class DolfinGaussianErrorModel(ErrorModel, RandomNumberGenerationMixin):
"""
This class provides an implementation of Gaussian Models suitable for
Dolfin-based formulation of forward and inverse problems.
The Gaussian distribution is on the form
:math:`\\mathcal{N}(\\mathbf{x}, \\mathbf{B})`,
with :math:`\\mathbf{x}` being the mean, and :math:`\\mathbf{B}` being
the variance/covariance matrix/operator :math:`\\mathbf{B}`.
The underlying finite element space is assumed to be :math:`\\mathtt{R}`.
:param configs: the configurations object:
:raises:
- `ImportError`: if dolfin or ufl are not available/installed
- `TypeError`: if the passed mean or variance types are not supported
- `ValueError`: if no valid FEM space (DOF) `Vh` is found in `configs`
"""
# TODO: Add PDF functionality to this class following implementation in `GaussianErrorModel` above.
# TODO: This should be very straigh forward except for handling input dtype if needed.
[docs]
def __init__(self, configs: DolfinGaussianErrorModelConfigs | dict | None = None):
if None in (dl, ufl):
raise ImportError(
f"This class can be used only with dolfin (Python's "
f"Fenics interface) and UFL installed."
f"Failed to load/import these modules."
)
configs = self.configurations_class.data_to_dataclass(configs)
super().__init__(configs)
# Extract configurations
Vh = self.configurations.Vh
mean = self.configurations.mean
variance = self.configurations.variance
random_seed = self.configurations.random_seed
# Check the finite element space (Vh)
if Vh is None:
print(
"You MUST pass a vaild FEM space (DOF) in the configurations dictionary; Found None!"
)
raise ValueError
else:
# Validate the passed DOF as needed!
element_family = Vh.ufl_element().family().lower()
if element_family != "real":
print("Only meshes with finite elements of family 'Real' are supported")
print("The family associated with the passed DOF is '{0}'".format())
raise TypeError
else:
# all good
pass
# FEM space defines the dimension of the Gaussian distribution space
size = Vh.dim()
# Associate attributes; allow changing later by properties
self._Vh = Vh
self._SIZE = size
# Define a domain measure
self._DOM_MEASURE = dl.Constant(
1.0 / dl.assemble(dl.Constant(1.0) * ufl.dx(Vh.mesh()))
)
# DEfine the Mass matrix (Identity)
trial = dl.TrialFunction(Vh)
test = dl.TestFunction(Vh)
self._M = dl.assemble(self._DOM_MEASURE * ufl.inner(trial, test) * ufl.dx)
# Set mean and covariances
self.update_mean(mean)
self.update_covariance_matrix(variance)
# Mean is good; set the mean (convert properly to proper dl vector)
# Check mean (type and size) and variance data types and sizes/shapes
# 2- Covariances (and related factors (Cholesky, etc.)
self.update_covariance_matrix(variance)
#
# Initiate a design
self.update_design(np.ones(self._SIZE, dtype=bool))
# From variances, construct the lower cholesky decomposition (R) of the covariance matrix
# Update/reset the associated random number generator/seed
self.update_random_number_generator(random_seed=random_seed)
[docs]
def update_mean(self, mean):
"""
Update the mean of the distribution
"""
# Check mean (type and size) and variance data types and sizes/shapes
if utility.isnumber(mean) or isinstance(mean, np.ndarray):
mean = np.array(mean).flatten() # local copy
elif sp.issparse(mean):
mean = mean.toarray().flatten() # local copy
elif isinstance(mean, dl.Vector):
mean = mean.get_local() # this might be redundant, but let's be consistent
else:
print("`mean` is of unsupported type: {0}".format(type(mean)))
raise TypeError
# Assert/validate mean size
if self._SIZE == mean.size:
# all good (mean has the right size)
pass
elif mean.size == 1:
# repeat scalar mean (mean is passed as a scalar)
mean = np.ones(self._SIZE) * mean[0]
else:
# vector mean size mismatches passed size
print(
"size of '{0}' mismatches the size of the passed mean of '{1}'".format(
self._SIZE, mean.size
)
)
raise AssertionError
# Associate Mean attributes; allow changing later by properties
# 1- Mean (convert from np array to dl.Vector and set entries)
_mean = dl.Vector()
self._M.init_vector(_mean, 0)
_mean.set_local(mean)
self._MEAN = _mean
[docs]
def update_covariance_matrix(self, cov):
"""
Define the Variational forms of the covariance, precision,
and factors of the covariance matrix, then associate the passed covariance matrix
"""
# Check variance/covariance matrix shape
if isinstance(cov, np.ndarray) or utility.isnumber(cov):
cov = np.array(cov) # local copy
elif sp.issparse(cov):
pass
# cov = cov.copy() # local copy
else:
print("`variance` is of unsupported type: {0}".format(type(cov)))
raise TypeError
if cov.size == 1:
if sp.issparse(cov):
cov = cov.toarray()
cov = cov.flatten()[0]
# 1d variance
cov = sp.diags(
np.ones(self._SIZE) * cov,
offsets=0,
shape=(self._SIZE, self._SIZE),
format="csc",
)
elif self._SIZE == cov.size:
if sp.issparse(cov):
cov = cov.toarray()
# scalar/1d variance
cov = sp.diags(cov, offsets=0, shape=(self._SIZE, self._SIZE), format="csc")
elif (self._SIZE, self._SIZE) == cov.shape:
# Passed square matrix of the right shape
pass
else:
print("The passed covariances have incorrect shape {0}".format(cov.shape))
raise AssertionError
# Factorize (Lower Cholesky decomposition)
L = utility.factorize_spsd_matrix(cov, lower=True)
L_inv = splinalg.spsolve_triangular(L, np.eye(self._SIZE), lower=True)
# Cast covariances and precisions as ufl matrices/(2d)tensors
if sp.issparse(L):
stdev_op = ufl.as_matrix(L.toarray())
cov_op = ufl.as_matrix(L.dot(L.T).toarray())
else:
stdev_op = ufl.as_matrix(L)
cov_op = ufl.as_matrix(L.dot(L.T))
#
if sp.issparse(L_inv):
stdev_inv_op = ufl.as_matrix(L_inv.toarray())
cov_inv_op = ufl.as_matrix(Linv.T.dot(L_inv).toarray())
else:
stdev_inv_op = ufl.as_matrix(L_inv)
cov_inv_op = ufl.as_matrix(L_inv.T.dot(L_inv))
# Assemble matrices and associate attributes
if self._SIZE == 1:
trial = ufl.as_matrix([[dl.TrialFunction(self._Vh)]])
test = ufl.as_matrix([[dl.TestFunction(self._Vh)]])
else:
trial = dl.TrialFunction(self._Vh)
test = dl.TestFunction(self._Vh)
# i) Covariance matrix Cholesky Factor (STDEV)
varform = self._DOM_MEASURE * ufl.inner(test, ufl.dot(stdev_op, trial)) * ufl.dx
self._STDEV = dl.assemble(varform)
# ii) Cholesky Factor Inverse (STDEV_INV)
varform = (
self._DOM_MEASURE * ufl.inner(test, ufl.dot(stdev_inv_op, trial)) * ufl.dx
)
self._STDEV_INV = dl.assemble(varform)
# iii) Covariances are added for consistency;
# TODO: We will use factors for covariance operations
# TODO: This will be removed once we have full implementation
varform = self._DOM_MEASURE * ufl.inner(test, ufl.dot(cov_op, trial)) * ufl.dx
self._COV = dl.assemble(varform)
#
varform = (
self._DOM_MEASURE * ufl.inner(test, ufl.dot(cov_inv_op, trial)) * ufl.dx
)
self._PREC = dl.assemble(varform)
[docs]
def update_design(self, design):
"""
Update the design vector (which entries are active)
"""
if isinstance(design, np.ndarray):
design = np.asarray(design).flatten()
elif isinstance(design, dl.Vector):
design = design.get_local()
else:
print(
"Unsupported data type '{0}' was found in the passed design".format(
design.dtype
)
)
raise TypeError
if design.size != self.size:
print(
"The passed design vector has the wrong size of {0}; expected {1}".format(
design.size, self.size
)
)
raise AssertionError
if np.issubdtype(design.dtype, np.integer):
assert design.min() >= 0, "All design variables must be nonnegagive! "
design = design.astype(
bool
) # Should we convert to bool or to float allowing higher weights?
elif np.issubdtype(design.dtype, np.floating):
assert design.min() >= 0, "All design variables must be nonnegagive! "
elif np.issubdtype(design.dtype, np.bool_):
pass
else:
print(
"Unsupported data type '{0}' was found in the passed design".format(
design.dtype
)
)
raise TypeError
active_size = np.count_nonzero(design)
if active_size == 0:
print("Need to figure out how to handle the case of zero active entries!")
raise NotImplementedError("TODO...")
# Active sensors, and design variables
self._ACTIVE_SIZE = active_size
self._ACTIVE = design.astype(bool)
self._DESIGN = design
# self._DESIGN_WEIGHTS = 1.0 / np.power(design, 2)
self._DESIGN_WEIGHTS = np.power(design, 2)
if np.issubdtype(design.dtype, np.bool_):
active = np.where(design)[0]
data = np.ones(active.size)
row = np.arange(active.size)
col = active
design_mat = sp.csc_matrix(
(data, (row, col)), shape=(active.size, self.size)
)
elif np.issubdtype(design.dtype, np.floating):
active = np.where(design)[0]
data = 1.0 / np.sqrt(design[active])
row = np.arange(active.size)
col = active
design_mat = sp.csc_matrix(
(data, (row, col)), shape=(active.size, self.size)
)
else:
print("This should never happen!")
print(
"Unsupported data type '{0}' was found in the passed design".format(
design.dtype
)
)
raise TypeError
#
design_mat_op = ufl.as_matrix(design_mat.toarray())
out_Vh = dl.VectorFunctionSpace(
self._Vh.mesh(), family="Real", degree=0, dim=self._ACTIVE_SIZE
)
test = dl.TrialFunction(out_Vh)
trial = dl.TestFunction(self._Vh)
varform = (
self._DOM_MEASURE * ufl.inner(test, ufl.dot(design_mat_op, trial)) * ufl.dx
)
self._DESIGN_MAT = dl.assemble(varform)
[docs]
def create_vector(self, return_np=False, ignore_design=False):
"""
Create a vector compatible with the range/domain of the underlying covariance matrix
"""
vec = dl.Vector()
if ignore_design:
self._M.init_vector(vec, 0)
else:
self._DESIGN_MAT.init_vector(vec, 1)
vec.zero()
if return_np:
vec = vec.get_local()
return vec
[docs]
def generate_white_noise(
self, ignore_design=False, truncate=False, return_np=False
):
"""
Generate a standard normal random vector of size `size` with values truncated
at -/+3 if `truncate` is set to `True`
:param bool ignore_design: the size of the white noise vector is in the full
(ignore the design) if `True`, otherwise the size matches the dimension
of the active subspace.
:param bool truncate: if `True`, truncate the samples at -/+3, that is any sample
point/entry above 3 is set to 3, and any value below -3 is set to -3.
:param bool return_np: if `True`, convert the result into a numpy array (from fenics vector)
:returns: numpy array (even if the dimension is 1) containing white noise
"""
size = self.size if ignore_design else self.active_size
# Sample
white_noise = self.random_number_generator.standard_normal(size)
# Truncate if requested
if truncate:
white_noise[white_noise > 3] = 3
white_noise[white_noise < -3] = -3
# Convert to Fenics vector (unless `return_np`==`True`)
if not return_np:
_white_noise = self.create_vector(
return_np=False, ignore_design=ignore_design
)
_white_noise.set_local(white_noise)
white_noise = _white_noise
return white_noise
[docs]
def add_noise(self, x, in_place=False):
"""
Add random noise to the passed vector `x`
:param x: vector in a space of dimension equal to size of the underlying error model
:param bool in_place: overwrite the passed vector `x` (if `True`).
If set to `False`, a new instance (based on the distribution size) is created and returned.
This is ignored of course if `x` is a non-iterable, i.e., a scalar.
:raises: TypeError if x is of the wrong shape or type
"""
# Check type of the passed vector
if isinstance(x, np.ndarray):
x_size = x.size
elif isinstance(x, dl.Vector):
x_size = x.size()
else:
print("Unspoorted type '{0}' of passed vector x".formt(type(x)))
raise TypeError
# Check size of the passed vector
if self._ACTIVE_SIZE != x_size:
print(
"Size mismatch; expected vector of size {0}; received vector of size {1}".format(
self._ACTIVE_SIZE, x_size
)
)
raise TypeError
out_vec = x if in_place else x.copy()
# Scale and shift the white noise
if isinstance(x, np.ndarray):
out_vec += self.generate_noise(return_np=True)
else:
noise = self.generate_noise(return_np=False)
out_vec.axpy(1.0, noise)
return out_vec
[docs]
def generate_noise(self, return_np=False, ignore_design=False):
"""
Generate a random noise vector sampled from the underlying Gaussian distribution
"""
# Generate white noise: normal random noise with zero mean and variance 1.
white_noise = self.generate_white_noise(
ignore_design=True,
return_np=False,
truncate=False,
)
noise = self.covariance_sqrt_matvec(
white_noise,
lower=True,
ignore_design=True,
in_place=False,
)
# Add mean
if ignore_design:
noise.axpy(1.0, self._MEAN)
if return_np:
noise = noise.get_local()
else:
_noise = noise.get_local()[self._ACTIVE] + self._MEAN[self._ACTIVE]
if return_np:
noise = _noise
else:
noise = self.create_vector(
return_np=return_np, ignore_design=ignore_design
)
noise.set_local(_noise)
return noise
[docs]
def sample(self, return_np=False, ignore_design=False):
"""
Sample a random vector from the underlying Gaussian distribution
"""
# Add a scaled random noise (with the underlying covariance matrix) to the underlying mean
out_vec = self._MEAN.get_local()
if not ignore_design:
out_vec = out_vec[self._ACTIVE]
out_vec += self.generate_noise(return_np=True, ignore_design=ignore_design)
if not return_np:
_out_vec = self.create_vector(return_np=False, ignore_design=ignore_design)
_out_vec.set_local(out_vec)
out_vec = _out_vec
return out_vec
[docs]
def pdf(
self,
x,
normalize=False,
log=False,
):
"""
Evaluate the value of the density function (normalized or upto a *fixed* scaling constant)
at the passed state/vector.
:param x: realization of the random variable
:param bool normalize: scale the PDF by the normalization factor
:param bool log: if True, evaluate the gradient of the logarithm of the PDF
:returns: the value of the density function evaluated at the passed state/vector.
"""
pdf = self.log_density(
x=x,
normalize=normalize,
)
# scale (if needed) and return
if not log:
pdf = np.exp(pdf)
if abs(pdf) == 0:
pdf = 0 # Avoid retrning -0.0
return pdf
[docs]
def pdf_gradient(
self,
x,
normalize=False,
log=False,
):
"""
Evaluate the gradient of the density function at the passed state/vector.
:param x: realization of the random variable
:param bool normalize: scale the PDF by the normalization factor
:param bool log: if True, evaluate the gradient of the logarithm of the PDF
:returns: the value of the density function evaluated at the passed state/vector.
"""
grad = self.log_density_gradient(
x=x,
)
if not log:
grad *= self.pdf(
x=x,
log=False,
normalize=False,
)
if normalize:
scl = -self.active_size / 2.0 * np.log(2 * np.pi)
scl -= 0.5 * self.covariance_logdet()
grad *= scl
return grad
[docs]
def log_density(
self,
x,
normalize=False,
):
"""
Evaluate the logarithm of the density function at the passed state `x`.
:param x: realization of the random variable
:param bool normalize: scale the PDF by the normalization factor
:returns: the logarithm of the density function evaluated at the passed state.
"""
# Check passed state vs. the current distribution configurations
# Check type of the passed vector
if isinstance(x, dl.Vector):
x = x.get_local()
else:
try:
x = utility.asarray(x).flatten()
except:
print("Unspoorted type '{0}' of passed vector x".formt(type(x)))
raise TypeError
x_size = x.size
size = self.active_size
# Check size of the passed vector
if size != x_size:
raise TypeError(
f"Size mismatch; expected vector of size {size};"
f"received vector of size {x_size}"
)
# Get the mean
mean = self._MEAN.get_local()[self._ACTIVE]
# Evaluate the exponent
innov = x - mean
scaled_innov = self.covariance_inv_matvec(innov)
pdf = -0.5 * np.dot(innov, scaled_innov)
# Normalization constant (log-scale)
if normalize:
scl = -size / 2.0 * np.log(2 * np.pi)
scl -= 0.5 * utility.matrix_logdet(
self.covariance_matvec,
size=size,
method="exact",
)
pdf += scl
return pdf
[docs]
def log_density_gradient(
self,
x,
normalize=False,
):
"""
Evaluate the gradient of the logarithm of the density function at the passed state `x`.
:param x: realization of the random variable
:param bool normalize: scale the PDF by the normalization factor
:returns: the gradient of the logarithm of the density function evaluated at the passed state.
"""
# Check passed state vs. the current distribution configurations
# Check type of the passed vector
if isinstance(x, dl.Vector):
x = x.get_local()
else:
try:
x = utility.asarray(x).flatten()
except:
print("Unspoorted type '{0}' of passed vector x".formt(type(x)))
raise TypeError
x_size = x.size
size = self.active_size
# Check size of the passed vector
if size != x_size:
raise TypeError(
f"Size mismatch; expected vector of size {size};"
f"received vector of size {x_size}"
)
# Get the mean
mean = self._MEAN.get_local()[self._ACTIVE]
# Evaluate the exponent
innov = x - mean
grad = -self.covariance_inv_matvec(innov)
return grad
[docs]
def covariance_sqrt_matvec(
self, x, lower=True, return_np=False, in_place=False, ignore_design=False
):
"""
Evaluate and return the product of Square root (Lower Cholesky) covariance matrix
by a vector `x`. The product is determined by applying the cholesky decomposition
of the covariance matrix once (dot product) to the passed state `x`
:param x: vector in a space of dimension equal to the size of the
underlying error model. x` could be scalar or number array,
however, it must match the distribution size.
:param bool lower: use the lower Cholesky as the square root
:param bool in_place: overwrite the passed vector `x` (if `True`).
If set to `False`, a new instance (based on the distribution size)
is created and returned.
This is ignored of course if `x` is a non-iterable, i.e., a scalar.
:returns: the product of the sqrt-covariance matrix by `x`
"""
if ignore_design:
in_size = out_size = self._SIZE
elif lower:
in_size = self._SIZE
out_size = self._ACTIVE_SIZE
else:
in_size = self._ACTIVE_SIZE
out_size = self._SIZE
if out_size != in_size:
# print("Can't overwrite passed vector; size mismatch given the effect of the design!")
in_place = False
if isinstance(x, np.ndarray):
x_size = x.size
elif isinstance(x, dl.Vector):
x_size = x.size()
else:
print("Unspoorted type '{0}' of passed vector x".formt(type(x)))
raise TypeError
if not ignore_design:
if lower and (x_size == self._ACTIVE_SIZE < self._SIZE):
_x = self.create_vector(return_np=False, ignore_design=True)
_x.zero()
active = np.where(self.design)[0]
for i in range(self._ACTIVE_SIZE):
_x[active[i]] = x[i]
x = _x
x_size = x.size()
in_place = True
if x_size != in_size:
print(
"The passed vector has wrong size; expected {0}; found {1}".format(
in_size, x_size
)
)
raise TypeError
# Check type of the passed vector
if isinstance(x, np.ndarray):
in_vec = self.create_vector(return_np=False, ignore_design=ignore_design)
in_vec.set_local(x)
else:
in_vec = x.copy()
if lower:
tvec = self.create_vector(return_np=False, ignore_design=True)
self._STDEV.mult(in_vec, tvec)
if not ignore_design:
result = self.create_vector(return_np=False, ignore_design=False)
self._DESIGN_MAT.transpmult(tvec, result)
else:
result = tvec
else:
tvec = self.create_vector(return_np=False, ignore_design=True)
if ignore_design:
self._STDEV.transpmult(in_vec, tvec)
result = tvec
else:
result = self.create_vector(return_np=False, ignore_design=True)
#
self._DESIGN_MAT.mult(in_vec, tvec)
self._STDEV.transpmult(tvec, result)
# Multiply x by the covariance matrix
if out_size != in_size:
out_vec = self.create_vector(return_np=return_np, ignore_design=not lower)
else:
out_vec = x if in_place else x.copy()
if isinstance(out_vec, np.ndarray):
out_vec[:] = result.get_local()
else:
out_vec.set_local(result.get_local())
return out_vec
[docs]
def covariance_matvec(
self, x, return_np=False, in_place=False, ignore_design=False
):
"""
Evaluate and return the product of covariance matrix by a vector `x`.
The product is determined by applying the cholesky decomposition of the covariance matrix
twice (dot product) to the passed state `x`
Specifically, we assume the covariance matrix is on the form :math:`\\mathbf{L} \\mathbf{S} \\mathbf{S}^T \\mathbf{L}^T`, where :math:`\\mathbf{L}` is the design matrix (fat-short matrix), and :math:`\\mathbf{S}` is the lower Cholesky factor of the covariance matrix.
:param x: vector in a space of dimension equal to the size of the underlying error model.
`x` could be scalar or number array, however, it must match the distribution size.
:param bool in_place: overwrite the passed vector `x` (if `True`).
If set to `False`, a new instance (based on the distribution size) is created and returned.
This is ignored of course if `x` is a non-iterable, i.e., a scalar.
:returns: the product of the covariance matrix by `x`
"""
out_vec = self.covariance_sqrt_matvec(
x, lower=False, in_place=False, ignore_design=ignore_design, return_np=False
)
out_vec = self.covariance_sqrt_matvec(
out_vec,
lower=True,
in_place=True,
ignore_design=ignore_design,
return_np=return_np,
)
# TODO: Compare to assembling the full covariances matrix, and applying it!
return out_vec
[docs]
def covariance_diagonal(self, ignore_design=False):
"""
Return a copy the diagonal of the covariance matrix, i.e., the variances
:returns: 1D numpy array holding the diagonal of the underlying covariance matrix.
If the size of the underlying distribution is 1, this returns a scalar (variance)
"""
size = self._SIZE if ignore_design else self._ACTIVE_SIZE
diag = np.empty(size)
e_vec = self.create_vector(return_np=False, ignore_design=ignore_design)
for i in range(size):
e_vec.zero()
e_vec[i] = 1.0
diag[i] = self.covariance_matvec(e_vec, ignore_design=ignore_design)[i]
return diag
[docs]
def variance(self, ignore_design=False):
"""Same as :py:meth:`covariance_diagonal`"""
return self.covariance_diagonal(ignore_design=ignore_design)
[docs]
def covariance_inv_matvec(
self, x, in_place=False, return_np=False, ignore_design=False
):
"""
Evaluate and return the product of inverse of the error covariance matrix by a vector `x`.
A linear system is solved using the cholesky decomposition of the covariance matrix
:param x: vector in a space of dimension equal to the size of the underlying error model.
`x` could be scalar or number array, however, it must match the distribution size.
:param bool in_place: overwrite the passed vector `x` (if `True`).
If set to `False`, a new instance (based on the distribution size) is created and returned.
This is ignored of course if `x` is a non-iterable, i.e., a scalar.
:returns: the product of inverse of the error covariance matrix by `x`
:raises: ValueError if solving the linear system using LGMRES fails
"""
if isinstance(x, np.ndarray):
is_dl = False
in_vec = x.copy()
pass
elif isinstance(x, dl.Vector):
is_dl = True
in_vec = x.get_local()
else:
print("Unspoorted type '{0}' of passed vector x".formt(type(x)))
raise TypeError
# Define an operator out of self.covariance_matvec, then solve a linear system
# solve a linear system to find covariance-inverse vector-product
matvec = lambda x: self.covariance_matvec(
x, in_place=False, return_np=True, ignore_design=ignore_design
)
size = self._SIZE if ignore_design else self._ACTIVE_SIZE
A = splinalg.LinearOperator((size, size), matvec=matvec)
_x = x[:]
# print("DEBUG: Gaussian covariance_inv_matvec: x: ", _x)
if _x.size == 0:
# When all entries are deactivated
_out = _x.copy()
else:
# TODO: Add other solvers and preconditioners as needed!
_out = splinalg.gmres(A=A, b=_x)
if not _out[1]:
# Solution is successful
_out = _out[0]
else:
print(
"Failed to apply LGMRES to solve the linear system involving the covariance matrix"
)
raise ValueError
if in_place:
out = x
if is_dl:
out.set_local(_out)
else:
out[:] = _out
else:
if not return_np:
out = self.create_vector(return_np=False, ignore_design=ignore_design)
out.set_local(_out)
else:
out = _out
return out
[docs]
def covariance_sqrt_inv_matvec(
self, x, lower=True, return_np=False, in_place=False, ignore_design=False
):
"""
Evaluate and return the product of inverse of the square root of the
error covariance matrix by a vector `x`.
A linear system is solved using the cholesky decomposition of the covariance matrix
:param x: vector in a space of dimension equal to the size of the underlying error model.
`x` could be scalar or number array, however, it must match the distribution size.
:param bool lower: use the lower Cholesky as the square root
:param bool in_place: overwrite the passed vector `x` (if `True`).
If set to `False`, a new instance (based on the distribution size) is created and returned.
This is ignored of course if `x` is a non-iterable, i.e., a scalar.
:returns: the product of inverse of the square root of the error covariance matrix
by `x`
:raises: ValueError if solving the linear system using LGMRES fails
"""
if isinstance(x, np.ndarray):
is_dl = False
in_vec = x.copy()
pass
elif isinstance(x, dl.Vector):
is_dl = True
in_vec = x.get_local()
else:
print("Unspoorted type '{0}' of passed vector x".formt(type(x)))
raise TypeError
# Define an operator out of self.covariance_matvec, then solve a linear system
# solve a linear system to find covariance-inverse vector-product
matvec = lambda x: self.covariance_sqrt_matvec(
x, lower=lower, in_place=False, return_np=True, ignore_design=ignore_design
)
size = self._SIZE if ignore_design else self._ACTIVE_SIZE
A = splinalg.LinearOperator((size, size), matvec=matvec)
_x = x[:]
# print("DEBUG: Gaussian covariance_inv_matvec: x: ", _x)
if _x.size == 0:
# When all entries are deactivated
_out = _x.copy()
else:
# TODO: Add other solvers and preconditioners as needed!
_out = splinalg.gmres(A=A, b=_x)
if not _out[1]:
# Solution is successful
_out = _out[0]
else:
print(
"Failed to apply LGMRES to solve the linear system involving the covariance matrix"
)
raise ValueError
if in_place:
out = x
if is_dl:
out.set_local(_out)
else:
out[:] = _out
else:
if not return_np:
out = self.create_vector(return_np=False, ignore_design=ignore_design)
out.set_local(_out)
else:
out = _out
return out
[docs]
def covariance_matrix(self, ignore_design=False):
"""
Construct a numpy array representation (if the dimension is more than 1)
of the underlying covariance matrix. (projected onto the observation space)
"""
e_vec = self.create_vector(return_np=False, ignore_design=ignore_design)
size = e_vec.size()
cov = np.empty((size, size))
for j in range(size):
e_vec.zero()
e_vec[j] = 1.0
cov[:, j] = self.covariance_matvec(
e_vec, in_place=True, return_np=False, ignore_design=ignore_design
).get_local()
return cov
[docs]
def precision_matrix(self, ignore_design=False):
"""
Construct a numpy array representation (if the dimension is more than 1)
of the precision matrix (inverse of the underlying covariance matrix).
"""
e_vec = self.create_vector(return_np=False, ignore_design=ignore_design)
size = e_vec.size()
precisions = np.empty((size, size))
for j in range(size):
e_vec.zero()
e_vec[j] = 1.0
precisions[:, j] = self.covariance_inv_matvec(
e_vec, in_place=True, return_np=False, ignore_design=ignore_design
).get_local()
return precisions
[docs]
def covariance_trace(
self,
ignore_design=False,
method="exact",
sample_size: int = _RANDOMIZATION_SAMPLE_SIZE,
optimize_sample_size: bool = False,
min_sample_size: int = 10,
max_sample_size: int = 100,
distribution: str = "Rademacher",
accuracy: float = 1e-2,
) -> float:
"""
Evaluate/Approximate the trace of the covariance matrix
.. note::
This is a wrapper around the utility function `pyoed.utility.math.matrix_trace`
that is called with the method :py:meth:`covariance_matvec` as the operator
to compute the trace of.
:param bool ignore_design: the size of the white noise vector is in the full
(ignore the design) if `True`, otherwise the size matches the dimension
of the active subspace.
:param method: Method of evaluation. If "exact", we simply sum the diagonal of A. If
"randomized", a Hutchinson style trace estimator, as described in
:py:meth:`pyoed.utility.trace_estimator`, is employed. All of the futher keyword
arguments are in the case of method="randomized" and are otherwise ignored if
method="exact".
:param randomization_vectors: The randomization vectors drawn to perform the
computation. If not passed, they will be sampled.
:param sample_size: The number of samples to use. :param optimize_sample_size: if
`True`, `sample_size` is replaced with an optimized version, where the optimal
sample size minimizes the variance of the estimator and is between
`min_sample_size`, and `max_sample_size`.
:param min_sample_size: Minimum number of random samples.
:param max_sample_size: Maximum number of random samples.
:param distribution: The probability distribution used for random sampling. Both
'Rademacher' and 'Gaussian' are supported.
:param random_seed: The random seed to use for the distribution samples.
:param accuracy: convergence criterion.
:returns: exact/approximate value of the trace of the covariance matrix.
of the underlying model.
"""
if re.match(r"\Aexact\Z", method, re.IGNORECASE):
variance = self.covariance_diagonal(ignore_design=ignore_design)
tr = np.sum(variance)
elif re.match(r"\Arandom(ized)*\Z", method, re.IGNORECASE):
size = self._SIZE if ignore_design else self._ACTIVE_SIZE
matvec = lambda x: self.covariance_matvec(
x, ignore_design=ignore_design, return_np=True, in_place=False
)
out = utility.trace_estimator(
matvec_fun=matvec,
vector_size=size,
sample_size=sample_size,
optimize_sample_size=optimize_sample_size,
min_sample_size=min_sample_size,
max_sample_size=max_sample_size,
distribution=distribution,
accuracy=accuracy,
)
if out[0]:
# success
tr = out[1]
else:
print(
"The randomized approximation of the trace has not converged, hence is untrusted!"
)
tr = out[1]
else:
print("Unsupported evaluation method '{0}'".format(method))
raise ValueError
return tr
[docs]
def covariance_logdet(
self,
ignore_design=False,
method="exact",
sample_size: int = _RANDOMIZATION_SAMPLE_SIZE,
optimize_sample_size: bool = False,
min_sample_size: int = 10,
max_sample_size: int = 100,
distribution: str = "Rademacher",
accuracy: float = 1e-2,
) -> float:
"""
Evaluate/approximate the logarithm of the determinant of the covariance
matrix of the passed error model
.. note::
This is a wrapper around the utility function `pyoed.utility.math.matrix_logdet`
that is called with the method :py:meth:`covariance_matvec` as the operator
to compute the trace of.
:param bool ignore_design: the size of the white noise vector is in the full
(ignore the design) if `True`, otherwise the size matches the dimension
of the active subspace.
:param method: Method of evaluation. If "exact", we simply sum the diagonal of A.
If "randomized", a chebyshev-approximation type-method is employed from
:py:meth:`pyoed.utility.logdet_estimator`. All of the futher keyword arguments
are in the case of method="randomized" and are otherwise ignored if
method="exact".
:param randomization_vectors: The randomization vectors drawn to perform the
computation. If not passed, they will be sampled.
:param sample_size: The number of samples to use. :param optimize_sample_size: if
`True`, `sample_size` is replaced with an optimized version, where the optimal
sample size minimizes the variance of the estimator and is between
`min_sample_size`, and `max_sample_size`.
:param min_sample_size: Minimum number of random samples.
:param max_sample_size: Maximum number of random samples.
:param distribution: The probability distribution used for random sampling. Both
'Rademacher' and 'Gaussian' are supported.
:param random_seed: The random seed to use for the distribution samples.
:param accuracy: convergence criterion.
:returns: exact/approximate value of the logdet of the covariance matrix
of the underlying model.
"""
if re.match(r"\Aexact\Z", method, re.IGNORECASE):
# Determinant of a triangular matrix L is the product of it's diagonal
# The idea below
# det (L L^T) = det(L) det (L^T) --> log(det(L L^T)) = 2 sum log(diagonal(L))
# Note taht this is valid ONLY when all entries are active;
# Otherwise, refactoring the projected covariance matrix is needed;
if ignore_design:
ldiag = self.create_vector(ignore_design=ignore_design, return_np=False)
self._STDEV.get_diagonal(ldiag)
ldiag = ldiag.get_local()
logdet = 2 * np.sum(np.log(ldiag))
elif self._SIZE == self._ACTIVE_SIZE:
# Do not ignore design; it only scales, but don't project
# extract diagonal of self._STDEV
ldiag = self.create_vector(ignore_design=ignore_design, return_np=False)
self._STDEV.get_diagonal(ldiag)
self._DESIGN_MAT.transpmult(ldiag.copy(), ldiag)
ldiag = ldiag.get_local()
logdet = 2 * np.sum(np.log(ldiag))
else:
# Design has effect; get covariance matrix, and refactor
# Do not ignore design; it both scales and projects
covmat = self.covariance_matrix(ignore_design=False)
L = utility.factorize_spsd_matrix(covmat)
ldiag = np.diagonal(L).copy()
logdet = 2 * np.sum(np.log(ldiag))
elif re.match(r"\Arandom(ized)*\Z", method, re.IGNORECASE):
size = self._SIZE if ignore_design else self._ACTIVE_SIZE
matvec = lambda x: self.covariance_matvec(
x, ignore_design=ignore_design, return_np=True, in_place=False
)
logdet = utility.logdet_estimator(
matvec,
vector_size=size,
sample_size=sample_size,
optimize_sample_size=optimize_sample_size,
min_sample_size=min_sample_size,
max_sample_size=max_sample_size,
distribution=distribution,
accuracy=accuracy,
)
else:
print("Unsupported evaluation method '{0}'".format(method))
raise ValueError
return logdet
[docs]
def kl_divergence(self, other):
return kl_divergence(self, other)
@property
def size(self):
"""Dimension of the underlying probability distribution"""
return self._SIZE
# This property will be confusing; removing. Will build another function if needed to retrieve the Cholesky factor in the full space
# @property
# def stdv(self):
# return utility.asarray(self._STDEV)
@property
def design(self):
"""Get a copy of the design vector"""
return self._DESIGN.copy()
@design.setter
def design(self, design):
"""
Update the design vector; If the design is non-binary; a design weight matrix is
constructed as the square root of the passed design; this pre- and post-multiply
the observation error covariance matrix. Othe behaviour can be define if needed!
"""
self.update_design(design)
@property
def extended_design(self):
"""
Return a vector indicating active/inactive entries (with proper replication for multiple prognostic variables)
of the observation vector/range
"""
raise NotImplementedError("TODO")
@property
def active(self):
"""flags corresponding to active/inactive dimensions"""
return np.where(self._ACTIVE)[0] # copy()
[docs]
def active_size(self):
"""Dimension of the probability distribution projected onto the design space (only active/nonzero entries of the design)"""
return np.count_nonzero(self.active)
@property
def mean(self):
"""Get a copy of the distribution mean"""
return self._MEAN.get_local()[self._ACTIVE].copy()
@mean.setter
def mean(self, new_mean):
"""Update distribution mean (in the full space; regardless the design)"""
self.update_mean(new_mean)