Source code for pyoed.models.error_models.Gaussian._fenics_Gaussian

# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved

"""
Gaussian error model for FEniCSx-based forward problems.

This module has been migrated from the legacy FEniCS (dolfin 2019) API to
**FEniCSx (dolfinx 0.10)**.  Because :class:`DolfinGaussianErrorModel` uses a
*Real* (constant) finite-element space, the FEM assembly machinery in the
original code reduced to plain linear-algebra operations on constant-coefficient
matrices.  The migrated implementation stores all covariance operators as
:mod:`numpy`/:mod:`scipy.sparse` arrays, removing the runtime dependency on
``dolfin``/PETSc for the Gaussian model specifically.

A ``dolfinx`` function space can still be passed as ``Vh`` to extract the DOF
count automatically; a plain ``int`` is also accepted for convenience.
"""

import re
from dataclasses import dataclass
from typing import Any

import numpy as np
import scipy.sparse as sp
import scipy.sparse.linalg as splinalg

import pyoed
from pyoed.models.core import (
    ErrorModel,
    ErrorModelConfigs,
)
from pyoed import utility
from pyoed.utility.mixins import (
    RandomNumberGenerationMixin,
)
from pyoed.configs import (
    validate_key,
    set_configurations,
    PyOEDConfigsValidationError,
)

try:
    import dolfinx
    import dolfinx.fem as fem
    _DOLFINX_AVAILABLE = True
except ImportError:
    dolfinx = fem = None
    _DOLFINX_AVAILABLE = False


# Module-level constants
_LINSOLVE_TOL = 1e-12          # Base tolerance for iterative linear solves
_RANDOMIZATION_SAMPLE_SIZE = 50


def _extract_size(Vh) -> int:
    """
    Extract the DOF count from *Vh*.

    :param Vh: A :class:`dolfinx.fem.FunctionSpace`, a legacy ``dolfin``
        function space (duck-typed via ``dim()``), or a plain :class:`int`.
    :returns: Number of degrees of freedom.
    :raises TypeError: If *Vh* is not a recognised type.
    :raises ValueError: If the extracted size is not positive.
    """
    if isinstance(Vh, int):
        size = Vh
    elif _DOLFINX_AVAILABLE and isinstance(Vh, fem.FunctionSpace):
        size = Vh.dofmap.index_map.size_global
    else:
        # Duck-type: support legacy dolfin FunctionSpace if still importable
        try:
            size = Vh.dofmap.index_map.size_global
        except AttributeError:
            try:
                size = Vh.dim()
            except AttributeError:
                raise TypeError(
                    "Vh must be a dolfinx.fem.FunctionSpace or a positive integer; "
                    f"got {type(Vh)!r}"
                )
    if size <= 0:
        raise ValueError(f"Vh must yield a positive DOF count; got {size}")
    return size


[docs] @dataclass(kw_only=True, slots=True) class DolfinGaussianErrorModelConfigs(ErrorModelConfigs): """ Configuration settings for :py:class:`DolfinGaussianErrorModel`. :param Vh: Finite-element function space that defines the dimension of the distribution. Accepts a :class:`dolfinx.fem.FunctionSpace` *or* a plain :class:`int` (the DOF count). :param mean: Mean of the distribution. Scalar or numpy array; scalar is broadcast to the full DOF count. :param variance: Variance / covariance of the distribution. Accepts a scalar (isotropic), a 1-D numpy array (diagonal), or a 2-D array / scipy sparse matrix (full covariance). :param random_seed: Seed for the internal random-number generator. ``None`` defers to global numpy rules. """ Vh: None | Any = None mean: float | np.ndarray = 0.0 variance: float | np.ndarray | sp.spmatrix = 1.0 random_seed: int | None = None
[docs] @set_configurations(DolfinGaussianErrorModelConfigs) class DolfinGaussianErrorModel(ErrorModel, RandomNumberGenerationMixin): """ Gaussian error model for FEniCSx-based forward problems. The distribution is :math:`\\mathcal{N}(\\boldsymbol{\\mu}, \\mathbf{B})` where :math:`\\boldsymbol{\\mu}` is the mean and :math:`\\mathbf{B}` is the covariance matrix. Internally the covariance is stored as its lower Cholesky factor :math:`\\mathbf{L}` (so :math:`\\mathbf{B} = \\mathbf{L} \\mathbf{L}^{\\top}`); all matvec operations are performed in :mod:`numpy`/:mod:`scipy`. A finite-element function space *Vh* (or its DOF count as a plain :class:`int`) defines the dimension of the distribution. The mesh geometry is **not** required; only the DOF count is used. :param configs: Configuration object or dictionary. See :class:`DolfinGaussianErrorModelConfigs` for accepted keys. :raises TypeError: If *mean* or *variance* are of unsupported types, or if *Vh* is not a recognised type. :raises ValueError: If *Vh* is ``None``, or if the mean / covariance sizes are inconsistent with the DOF count. :raises AssertionError: If the design vector has the wrong size or contains invalid values. """
[docs] def __init__( self, configs: DolfinGaussianErrorModelConfigs | dict | None = None ): configs = self.configurations_class.data_to_dataclass(configs) super().__init__(configs) Vh = self.configurations.Vh mean = self.configurations.mean variance = self.configurations.variance random_seed = self.configurations.random_seed if Vh is None: raise ValueError( "You MUST pass a valid FEM space (or integer DOF count) as 'Vh'; " "found None." ) self._Vh = Vh self._SIZE: int = _extract_size(Vh) # Set mean and covariance (populates _MEAN, _STDEV, _STDEV_INV, _COV, _PREC) self.update_mean(mean) self.update_covariance_matrix(variance) # Initialise design to "all sensors active" self.update_design(np.ones(self._SIZE, dtype=bool)) # Seed the random number generator self.update_random_number_generator(random_seed=random_seed)
# ── Mean ────────────────────────────────────────────────────────────────
[docs] def update_mean(self, mean) -> None: """ Update the distribution mean. :param mean: Scalar, 1-D numpy array, or scipy sparse vector. Scalars are broadcast to the full DOF count. :raises TypeError: If *mean* is of an unsupported type. :raises AssertionError: If the mean size mismatches the DOF count. """ if utility.isnumber(mean) or isinstance(mean, np.ndarray): mean = np.array(mean, dtype=float).flatten() elif sp.issparse(mean): mean = mean.toarray().flatten() else: raise TypeError(f"`mean` is of unsupported type: {type(mean)!r}") if mean.size == self._SIZE: pass elif mean.size == 1: mean = np.ones(self._SIZE) * mean[0] else: raise AssertionError( f"Mean size {mean.size} does not match the DOF count {self._SIZE}." ) # Store as plain numpy array (compatible with the assimilation layer) self._MEAN: np.ndarray = mean.copy()
# ── Covariance ──────────────────────────────────────────────────────────
[docs] def update_covariance_matrix(self, cov) -> None: """ Define the covariance operator and its Cholesky factorisation. Accepted forms for *cov*: * **Scalar** – isotropic covariance :math:`\\sigma^2 \\mathbf{I}`. * **1-D array** of length *n* – diagonal covariance. * **2-D array** or **scipy sparse matrix** of shape (*n*, *n*) – full covariance (must be symmetric positive semi-definite). :raises TypeError: If *cov* is of an unsupported type. :raises AssertionError: If the covariance shape is incompatible with the DOF count. """ if isinstance(cov, np.ndarray) or utility.isnumber(cov): cov = np.array(cov) elif sp.issparse(cov): pass else: raise TypeError(f"`variance` is of unsupported type: {type(cov)!r}") # Normalise to a sparse square matrix if cov.size == 1: if sp.issparse(cov): cov = cov.toarray() scalar = float(cov.flatten()[0]) cov = sp.diags( np.ones(self._SIZE) * scalar, offsets=0, shape=(self._SIZE, self._SIZE), format="csc", ) elif cov.size == self._SIZE: if sp.issparse(cov): cov = cov.toarray() cov = sp.diags( cov.flatten(), offsets=0, shape=(self._SIZE, self._SIZE), format="csc" ) elif cov.shape == (self._SIZE, self._SIZE): pass else: raise AssertionError( f"Covariance shape {cov.shape} is incompatible with DOF count {self._SIZE}." ) # Lower Cholesky factor L such that cov = L L^T L = utility.factorize_spsd_matrix(cov, lower=True) if sp.issparse(L): L_dense = L.toarray() else: L_dense = np.asarray(L, dtype=float) L_inv = splinalg.spsolve_triangular( sp.csc_matrix(L_dense), np.eye(self._SIZE), lower=True ) # Store as dense numpy arrays for efficient matvec operations self._STDEV: np.ndarray = L_dense # L (lower Cholesky) self._STDEV_INV: np.ndarray = L_inv # L^{-1} self._COV: np.ndarray = L_dense @ L_dense.T # B = L L^T self._PREC: np.ndarray = L_inv.T @ L_inv # B^{-1} = L^{-T} L^{-1}
# ── Design ──────────────────────────────────────────────────────────────
[docs] def update_design(self, design) -> None: """ Update the experimental design (which sensors are active). :param design: 1-D numpy array of length *n*. * **Boolean** – each entry flags whether the corresponding sensor is active. * **Integer** – treated as boolean after a non-negativity check. * **Float** – non-negative weights; zero entries are treated as inactive. :raises TypeError: If *design* is not a numpy array or has an unsupported dtype. :raises AssertionError: If *design* has the wrong size or contains negative entries. :raises NotImplementedError: If all sensors are deactivated. """ if not isinstance(design, np.ndarray): raise TypeError( f"Unsupported type '{type(design)!r}' for the design vector; " "expected a numpy array." ) design = np.asarray(design).flatten() if design.size != self.size: raise AssertionError( f"Design vector size {design.size} does not match DOF count {self.size}." ) if np.issubdtype(design.dtype, np.integer): assert design.min() >= 0, "All design variables must be non-negative." design = design.astype(bool) elif np.issubdtype(design.dtype, np.floating): assert design.min() >= 0, "All design variables must be non-negative." elif np.issubdtype(design.dtype, np.bool_): pass else: raise TypeError( f"Unsupported dtype '{design.dtype}' in the design vector." ) if np.count_nonzero(design) == 0: raise NotImplementedError( "All sensors are inactive (zero active entries in the design). " "This case is not yet supported." ) self.configurations.design = self._DESIGN = design.copy() self._DESIGN_WEIGHTS = np.power(design, 2) # Build the sparse design (selection/weighting) matrix D of shape # (active_size, size) such that D @ x extracts/weights active entries. active = np.where(design)[0] if np.issubdtype(design.dtype, np.bool_): data = np.ones(active.size) else: # floating weights data = 1.0 / np.sqrt(design[active]) row = np.arange(active.size) self._DESIGN_MAT: sp.csc_matrix = sp.csc_matrix( (data, (row, active)), shape=(active.size, self.size) )
# ── Vector helpers ──────────────────────────────────────────────────────
[docs] def create_vector( self, return_np: bool = True, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Create a zero vector compatible with the covariance domain. :param bool return_np: Always ``True``; kept for API compatibility. :param bool ignore_design: If ``True`` the full DOF space is used; otherwise the active subspace is used. :returns: 1-D zero numpy array. """ size = self._SIZE if ignore_design else self.active_size return np.zeros(size)
# ── Noise generation ────────────────────────────────────────────────────
[docs] def generate_white_noise( self, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, truncate: bool = False, return_np: bool = True, ) -> np.ndarray: """ Generate a standard normal random vector. :param bool ignore_design: Use the full space (``True``) or the active subspace (``False``). :param bool truncate: If ``True``, clamp samples to :math:`[-3, 3]`. :param bool return_np: Always ``True``; kept for API compatibility. :returns: 1-D numpy array of standard-normal samples. """ size = self.size if ignore_design else self.active_size white_noise = self.random_number_generator.standard_normal(size) if truncate: white_noise = np.clip(white_noise, -3.0, 3.0) return white_noise
[docs] def add_noise(self, x, in_place: bool = False) -> np.ndarray: """ Add a random noise sample to the vector *x*. :param x: 1-D numpy array of size ``active_size``. :param bool in_place: If ``True``, overwrite *x*; otherwise return a new array. :returns: Noisy copy of *x* (or *x* itself if *in_place*). :raises TypeError: If *x* is not a numpy array or has the wrong size. """ if not isinstance(x, np.ndarray): raise TypeError( f"Unsupported type '{type(x)!r}' for vector x; expected numpy array." ) if self.active_size != x.size: raise TypeError( f"Size mismatch: expected {self.active_size}, got {x.size}." ) out_vec = x if in_place else x.copy() out_vec += self.generate_noise(return_np=True) return out_vec
[docs] def generate_noise( self, return_np: bool = True, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Sample a random noise vector from :math:`\\mathcal{N}(\\boldsymbol{\\mu}, \\mathbf{B})`. :param bool return_np: Always ``True``; kept for API compatibility. :param bool ignore_design: If ``True``, return a vector in the full space; otherwise return a vector in the active subspace. :returns: 1-D numpy array containing the noise sample (including mean). """ white_noise = self.generate_white_noise(ignore_design=True, return_np=True) noise = self.covariance_sqrt_matvec( white_noise, lower=True, ignore_design=True, return_np=True ) if ignore_design: return noise + self._MEAN return noise[self.active_indexes] + self._MEAN[self.active_indexes]
[docs] def sample( self, return_np: bool = True, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Sample a random vector from :math:`\\mathcal{N}(\\boldsymbol{\\mu}, \\mathbf{B})`. :param bool return_np: Always ``True``; kept for API compatibility. :param bool ignore_design: If ``True``, return a vector in the full space; otherwise return a vector in the active subspace. :returns: 1-D numpy array containing the sample. """ return self.generate_noise(return_np=True, ignore_design=ignore_design)
# ── Density ─────────────────────────────────────────────────────────────
[docs] def pdf( self, x, normalize: bool = False, log: bool = False, ) -> float: """ Evaluate the Gaussian density at *x*. :param x: Realization of the random variable (1-D array-like). :param bool normalize: Include the normalization constant. :param bool log: Return the log-density instead of the density. :returns: Scalar density (or log-density) value. """ val = self.log_density(x=x, normalize=normalize) if not log: val = np.exp(val) if val == 0.0: val = 0.0 # Avoid -0.0 return float(val)
[docs] def pdf_gradient( self, x, normalize: bool = False, log: bool = False, ) -> np.ndarray: """ Evaluate the gradient of the Gaussian density at *x*. :param x: Realization of the random variable. :param bool normalize: Include the normalization factor. :param bool log: Return the gradient of the log-density. :returns: 1-D numpy array (gradient vector). """ grad = self.log_density_gradient(x=x) if not log: grad = grad * self.pdf(x=x, log=False, normalize=False) if normalize: size = self.active_size if self.project_onto_active_design_space else self.size scl = -size / 2.0 * np.log(2 * np.pi) scl -= 0.5 * self.covariance_logdet() grad = grad * scl return grad
[docs] def log_density( self, x, normalize: bool = False, ) -> float: """ Evaluate the log-density at *x*. :param x: Realization of the random variable. :param bool normalize: Include the log-normalization constant. :returns: Scalar log-density value. :raises TypeError: If *x* has the wrong size. """ x_np = np.asarray(x, dtype=float).flatten() size = self.active_size if self.project_onto_active_design_space else self.size if x_np.size != size: raise TypeError( f"Size mismatch: expected {size}, got {x_np.size}." ) mean = self._MEAN[self.active_indexes] if self.project_onto_active_design_space else self._MEAN innov = x_np - mean scaled_innov = self.covariance_inv_matvec(innov, return_np=True) val = -0.5 * float(np.dot(innov, scaled_innov)) if normalize: scl = -size / 2.0 * np.log(2 * np.pi) scl -= 0.5 * utility.matrix_logdet( self.covariance_matvec, size=size, method="exact" ) val += scl return val
[docs] def log_density_gradient( self, x, normalize: bool = False, ) -> np.ndarray: """ Evaluate the gradient of the log-density at *x*. :param x: Realization of the random variable. :param bool normalize: Unused (included for API symmetry). :returns: 1-D numpy array (gradient vector). :raises TypeError: If *x* has the wrong size. """ x_np = np.asarray(x, dtype=float).flatten() size = self.active_size if self.project_onto_active_design_space else self.size if x_np.size != size: raise TypeError( f"Size mismatch: expected {size}, got {x_np.size}." ) mean = self._MEAN[self.active_indexes] if self.project_onto_active_design_space else self._MEAN innov = x_np - mean return -self.covariance_inv_matvec(innov, return_np=True)
# ── Covariance matvecs ───────────────────────────────────────────────────
[docs] def covariance_sqrt_matvec( self, x, lower: bool = True, return_np: bool = True, in_place: bool = False, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Apply the square root of the covariance matrix to *x*. The covariance square root is the lower Cholesky factor :math:`\\mathbf{L}` (so :math:`\\mathbf{B} = \\mathbf{L}\\mathbf{L}^{\\top}`). When *ignore_design* is ``False``, the design matrix :math:`\\mathbf{D}` is incorporated: * ``lower=True``: :math:`\\mathbf{D} \\mathbf{L} \\, x` * ``lower=False``: :math:`\\mathbf{L}^{\\top} \\mathbf{D}^{\\top} x` :param x: Input vector (1-D array-like). :param bool lower: Apply :math:`\\mathbf{L}` (``True``) or :math:`\\mathbf{L}^{\\top}` (``False``). :param bool return_np: Always ``True``; kept for API compatibility. :param bool in_place: If ``True`` and *x* is a numpy array, overwrite *x* with the result. :param bool ignore_design: Use the full space (``True``) or the active subspace (``False``). :returns: 1-D numpy result array. :raises TypeError: If *x* has the wrong size. """ x_np = np.asarray(x, dtype=float).flatten() if ignore_design: if x_np.size != self._SIZE: raise TypeError( f"Expected size {self._SIZE}, got {x_np.size}." ) out = self._STDEV @ x_np if lower else self._STDEV.T @ x_np else: if lower: if x_np.size != self._SIZE: raise TypeError( f"Expected full size {self._SIZE}, got {x_np.size}." ) tmp = self._STDEV @ x_np out = self._DESIGN_MAT.dot(tmp) else: if x_np.size != self.active_size: raise TypeError( f"Expected active size {self.active_size}, got {x_np.size}." ) tmp = self._DESIGN_MAT.T.dot(x_np) out = self._STDEV.T @ tmp if in_place and isinstance(x, np.ndarray) and x.size == out.size: x[:] = out return x return out
[docs] def covariance_matvec( self, x, return_np: bool = True, in_place: bool = False, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Apply the covariance matrix :math:`\\mathbf{B} = \\mathbf{L}\\mathbf{L}^{\\top}` to *x*. Computed as two triangular matvecs: :math:`\\mathbf{B}\\,x = \\mathbf{L}(\\mathbf{L}^{\\top} x)`. :param x: Input vector (1-D array-like) of size ``size`` (or ``active_size`` when *ignore_design* is ``False``). :returns: 1-D numpy result array. """ tmp = self.covariance_sqrt_matvec( x, lower=False, ignore_design=ignore_design, return_np=True ) out = self.covariance_sqrt_matvec( tmp, lower=True, ignore_design=ignore_design, return_np=True ) if in_place and isinstance(x, np.ndarray) and x.size == out.size: x[:] = out return x return out
[docs] def covariance_diagonal( self, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Return the diagonal of the covariance matrix (the per-DOF variances). :param bool ignore_design: Use the full space (``True``) or the active subspace (``False``). :returns: 1-D numpy array of variances. """ size = self._SIZE if ignore_design else self.active_size diag = np.empty(size) e_vec = np.zeros(size) for i in range(size): e_vec[:] = 0.0 e_vec[i] = 1.0 diag[i] = self.covariance_matvec(e_vec, ignore_design=ignore_design)[i] return diag
[docs] def variance( self, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """Alias for :py:meth:`covariance_diagonal`.""" return self.covariance_diagonal(ignore_design=ignore_design)
[docs] def covariance_inv_matvec( self, x, in_place: bool = False, return_np: bool = True, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Apply the precision matrix (inverse covariance) to *x* via GMRES. :param x: Input vector (1-D array-like). :param bool in_place: Overwrite *x* if ``True``. :param bool return_np: Always ``True``; kept for API compatibility. :param bool ignore_design: Use the full space (``True``) or the active subspace (``False``). :returns: 1-D numpy result array. :raises ValueError: If GMRES does not converge. """ x_np = np.asarray(x, dtype=float).flatten() if x_np.size == 0: return x_np.copy() matvec = lambda v: self.covariance_matvec( v, in_place=False, return_np=True, ignore_design=ignore_design ) size = self._SIZE if ignore_design else self.active_size A = splinalg.LinearOperator((size, size), matvec=matvec) _out, info = splinalg.gmres(A=A, b=x_np) if info != 0: raise ValueError( "GMRES failed to solve the covariance-inverse linear system " f"(info={info})." ) if in_place and isinstance(x, np.ndarray): x[:] = _out return x return _out
[docs] def covariance_sqrt_inv_matvec( self, x, lower: bool = True, return_np: bool = True, in_place: bool = False, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Apply the inverse of the covariance square root to *x* via GMRES. :param x: Input vector (1-D array-like). :param bool lower: Use :math:`\\mathbf{L}` (``True``) or :math:`\\mathbf{L}^{\\top}` (``False``) as the "sqrt". :param bool return_np: Always ``True``; kept for API compatibility. :param bool in_place: Overwrite *x* if ``True``. :param bool ignore_design: Use the full space (``True``) or the active subspace (``False``). :returns: 1-D numpy result array. :raises ValueError: If GMRES does not converge. """ x_np = np.asarray(x, dtype=float).flatten() if x_np.size == 0: return x_np.copy() matvec = lambda v: self.covariance_sqrt_matvec( v, lower=lower, in_place=False, return_np=True, ignore_design=ignore_design ) size = self._SIZE if ignore_design else self.active_size A = splinalg.LinearOperator((size, size), matvec=matvec) _out, info = splinalg.gmres(A=A, b=x_np) if info != 0: raise ValueError( "GMRES failed to solve the covariance-sqrt-inverse linear system " f"(info={info})." ) if in_place and isinstance(x, np.ndarray): x[:] = _out return x return _out
[docs] def covariance_matrix( self, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Construct the full covariance matrix as a dense numpy array. :param bool ignore_design: Use the full space (``True``) or the active subspace (``False``). :returns: 2-D numpy array of shape *(n, n)*. """ size = self._SIZE if ignore_design else self.active_size cov = np.empty((size, size)) e_vec = np.zeros(size) for j in range(size): e_vec[:] = 0.0 e_vec[j] = 1.0 cov[:, j] = self.covariance_matvec( e_vec, ignore_design=ignore_design, return_np=True ) return cov
[docs] def precision_matrix( self, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, ) -> np.ndarray: """ Construct the full precision matrix (inverse covariance) as a dense numpy array. :param bool ignore_design: Use the full space (``True``) or the active subspace (``False``). :returns: 2-D numpy array of shape *(n, n)*. """ size = self._SIZE if ignore_design else self.active_size prec = np.empty((size, size)) e_vec = np.zeros(size) for j in range(size): e_vec[:] = 0.0 e_vec[j] = 1.0 prec[:, j] = self.covariance_inv_matvec( e_vec, ignore_design=ignore_design, return_np=True ) return prec
# ── Trace / log-det ─────────────────────────────────────────────────────
[docs] def covariance_trace( self, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, method: str = "exact", sample_size: int = _RANDOMIZATION_SAMPLE_SIZE, optimize_sample_size: bool = False, min_sample_size: int = 10, max_sample_size: int = 100, distribution: str = "Rademacher", accuracy: float = 1e-2, ) -> float: """ Evaluate or approximate the trace of the covariance matrix. :param bool ignore_design: Use the full space (``True``) or active subspace (``False``). :param str method: ``"exact"`` (sum of diagonal) or ``"randomized"`` (Hutchinson estimator). :returns: Trace value. :raises ValueError: If *method* is unrecognised. """ if re.match(r"\Aexact\Z", method, re.IGNORECASE): return float(np.sum(self.covariance_diagonal(ignore_design=ignore_design))) if re.match(r"\Arandom(ized)?\Z", method, re.IGNORECASE): size = self._SIZE if ignore_design else self.active_size matvec = lambda v: self.covariance_matvec( v, ignore_design=ignore_design, return_np=True, in_place=False ) out = utility.trace_estimator( matvec_fun=matvec, vector_size=size, sample_size=sample_size, optimize_sample_size=optimize_sample_size, min_sample_size=min_sample_size, max_sample_size=max_sample_size, distribution=distribution, accuracy=accuracy, ) if not out[0]: print( "Warning: the randomized trace estimator did not converge; " "the result may be inaccurate." ) return float(out[1]) raise ValueError(f"Unsupported trace method '{method}'.")
[docs] def covariance_logdet( self, ignore_design: bool = not pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE, method: str = "exact", sample_size: int = _RANDOMIZATION_SAMPLE_SIZE, optimize_sample_size: bool = False, min_sample_size: int = 10, max_sample_size: int = 100, distribution: str = "Rademacher", accuracy: float = 1e-2, ) -> float: """ Evaluate or approximate the log-determinant of the covariance matrix. :param bool ignore_design: Use the full space (``True``) or active subspace (``False``). :param str method: ``"exact"`` or ``"randomized"``. :returns: Log-determinant value. :raises ValueError: If *method* is unrecognised or the covariance is singular. """ if re.match(r"\Aexact\Z", method, re.IGNORECASE): if ignore_design: # log det(L L^T) = 2 * sum(log(diag(L))) ldiag = np.diagonal(self._STDEV).copy() return float(2.0 * np.sum(np.log(ldiag))) else: # Project covariance onto the active subspace, re-factorise cov_proj = self.covariance_matrix(ignore_design=False) s, logdet = np.linalg.slogdet(cov_proj) if s <= 0: raise ValueError( "The projected covariance matrix is singular or negative definite." ) return float(logdet) if re.match(r"\Arandom(ized)?\Z", method, re.IGNORECASE): size = self._SIZE if ignore_design else self.active_size matvec = lambda v: self.covariance_matvec( v, ignore_design=ignore_design, return_np=True, in_place=False ) return float(utility.logdet_estimator( matvec, vector_size=size, sample_size=sample_size, optimize_sample_size=optimize_sample_size, min_sample_size=min_sample_size, max_sample_size=max_sample_size, distribution=distribution, accuracy=accuracy, )) raise ValueError(f"Unsupported log-det method '{method}'.")
[docs] def kl_divergence(self, other): """KL divergence between this Gaussian and *other*. .. note:: Not yet implemented for :class:`DolfinGaussianErrorModel`. """ raise NotImplementedError( "kl_divergence is not yet implemented for DolfinGaussianErrorModel." )
# ── Properties ────────────────────────────────────────────────────────── @property def size(self) -> int: """Dimension of the underlying probability distribution.""" return self._SIZE @property def design(self) -> np.ndarray: """Get a copy of the current design vector.""" return self.configurations.design.copy() @design.setter def design(self, design) -> None: """Update the design vector.""" self.update_design(design) @property def active(self) -> np.ndarray: """ Boolean array of length *n* flagging active sensors. This is the same as ``design.astype(bool)``. """ return np.asarray(self.design, dtype=bool).copy() @property def active_indexes(self) -> np.ndarray: """ Indices of the active (non-zero design) sensors. :returns: 1-D integer numpy array. """ return np.where(self.design)[0] @property def active_size(self) -> int: """Number of active sensors (dimension of the projected subspace).""" return int(self.active_indexes.size) @property def mean(self) -> np.ndarray: """ Get the distribution mean. Returns the full mean when :attr:`~pyoed.SETTINGS.PROJECT_ONTO_ACTIVE_DESIGN_SPACE` is ``False``, or the active-subspace mean otherwise. """ if self.project_onto_active_design_space: return self._MEAN[self.active_indexes].copy() return self._MEAN.copy() @mean.setter def mean(self, new_mean) -> None: """Update the distribution mean (always in the full space).""" self.update_mean(new_mean)