# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved
"""
This module provides access to further miscellaneous operations that didn't quite fit in the other utility modules. Functions here may be moved to new utility modules if they grow into their own category; a proper announcement will be made in that case.
"""
import os
import numbers
import copy
import numpy as np
import re
from warnings import warn
import tempfile
import errno
import shutil
__all__ = [
"isnumber",
"isstring",
"isiterable",
"aggregate",
"aggregate_configurations",
"print_configs",
"path_is_accessible",
"get_list_of_files",
"get_list_of_subdirectories",
"try_file_name",
"validate_Cartesian_grid",
"gridpoint_to_index",
]
# Types/Values:
# -------------
def isnumber(x, real_only=False):
    """
    Check if a given variable x is a number, defined as:
    - integer
    - float
    - complex number
    - boolean
    :param x: variable to be checked
    :param real_only: if True, limit to the set of integers and floating point numbers.
    :rtype: bool
    """
    # NOTE: bool is a subclass of int, so booleans pass even with real_only=True.
    if not isinstance(x, numbers.Number):
        return False
    if real_only:
        return isinstance(x, (int, float))
    return True
def isstring(s):
    """
    Check if s is a string-like variable, i.e. a string or a bytes object or
    encodable with ascii.
    :param s: variable to be checked
    :rtype: bool
    """
    if isinstance(s, (str, bytes)):
        return True
    # Duck typing: anything exposing an ascii-encode is string-like enough.
    # Catch Exception (not a bare except) so KeyboardInterrupt/SystemExit
    # are not swallowed; any failure means "not a string".
    try:
        s.encode("ascii")
        return True
    except Exception:
        return False
def isiterable(a):
    """
    Check if a is an iterable object, i.e. exposes an ``__iter__`` attribute.
    :param a: variable to be checked
    :rtype: bool
    """
    # hasattr swallows exactly AttributeError, matching the original
    # try/except probe of a.__iter__.
    return hasattr(a, "__iter__")
# Configurations/Dictionaries:
# ----------------------------
def aggregate(
    configs,
    def_configs,
    in_place=False,
    keep_None=True,
    deep_copy=False,
):
    """
    Add default configurations to the passed `configs` dictionary, i.e.,
    blindly (and recursively) combine the two dictionaries.
    This is a one-way merge from `def_configs` to `configs` only.
    :param dict configs: a dictionary containing configurations to update
    :param dict def_configs: a dictionary holding default configurations
    :param bool in_place: if `True` overwrite `configs` (in place),
        otherwise return a **COPY** of `configs` with keys/values aggregated with those in `def_configs`.
    :param bool deep_copy: if True deep copies of entries of `def_configs` are merged with entries of `configs`,
        otherwise only shallow copies are taken. Deep copy is more relevant to compound objects.
    :param bool keep_None: if `True` any keyed value in `configs` that is set to `None` will be kept,
        otherwise it will be overridden by the value associated with the corresponding key in `def_configs`
    :raises:
        - ValueError if both `configs` and `def_configs` are None
        - TypeError if `configs` or `def_configs` are neither None nor derived
          from Python dict
    :returns: an updated version of `configs` with keys/values from both `configs` and `def_configs`.
    """
    # Assertions and type checks
    if configs is None and def_configs is None:
        raise ValueError("both inputs are None")
    if def_configs is None:
        def_configs = dict()
    elif not isinstance(def_configs, dict):
        raise TypeError(
            "def_configs must be either None or be derived from Python's dict. "
            "Received '{0}' instance".format(type(def_configs))
        )
    if configs is None:
        configs = dict()
    elif not isinstance(configs, dict):
        raise TypeError(
            "configs must be either None or be derived from Python's dict. "
            "Received '{0}' instance".format(type(configs))
        )
    # Copier used for values imported from `def_configs`
    _copier = copy.deepcopy if deep_copy else copy.copy

    # Recursively aggregate configurations
    out_configs = configs if in_place else configs.copy()
    for key, def_val in def_configs.items():
        if key not in out_configs:
            out_configs[key] = _copier(def_val)
        elif out_configs[key] is None and not keep_None:
            out_configs[key] = _copier(def_val)
        elif isinstance(out_configs[key], dict) and isinstance(def_val, dict):
            # Recursively aggregate dictionary-valued keys.
            # Bugfixes vs the original:
            #   - `keep_None` is propagated into the recursion (it used to
            #     silently reset to the default at nested levels);
            #   - `in_place` is propagated instead of forcing in-place, so
            #     with in_place=False nested dicts shared with the caller's
            #     `configs` are no longer mutated.
            out_configs[key] = aggregate(
                out_configs[key],
                def_val,
                in_place=in_place,
                keep_None=keep_None,
                deep_copy=deep_copy,
            )
    return out_configs
# Add an alias
aggregate_configurations = aggregate
def print_configs(configs, header="", prefix=""):
    """
    Print (to screen) elements of a configurations dictionary `configs`.
    :param dict configs: configurations dictionary
    :param str header: printed before all configurations
    :param str prefix: string added before all printed lines. Useful for nested
        configs
    :raises: AssertionError if parameters are not exactly the correct type, i.e.
        cast your header and prefix to strings beforehand.
    """
    # Assertion and validation
    assert isinstance(configs, dict), "configs must be a dictionary"
    assert isinstance(header, str), "header must be a string"
    assert isinstance(prefix, str), "prefix must be a string"
    # Separator shrinks with nesting depth but never below 30 stars
    sepp = prefix + "*" * max(30, 80 - len(prefix))
    print("\n{0}\n{2}Configurations of: '{1}' \n{0}".format(sepp, header, prefix))
    for key, val in configs.items():
        if isinstance(val, dict):
            # Recurse into nested configuration dictionaries with extra indent
            print_configs(val, header=key, prefix=prefix + " ")
        else:
            print(" {0}+ {1}: {2}".format(prefix, key, val))
    print(sepp)
# Files, Directories & Configs:
# -----------------------------
def path_is_accessible(path):
    """
    Test if the passed path (to a directory) is accessible;
    that is, the user can save/write files under that folder/directory.
    This tests whether the path exists, or is creatable, and is writable.
    Any directory created by this test is removed again before returning.
    :param path: path to folder/directory
    :returns: `True` if the path is writable, otherwise `False`
    :raises OSError: re-raised if writing fails for a reason other than
        permission denial (EACCES)
    """
    path_created = False
    if not os.path.isdir(path):
        # Directory does not exist; try creating it (removed again below).
        # NOTE: the original had an unreachable `raise` here guarded by
        # `path_created` (always False at that point) with a malformed,
        # non-f-string message; creation failure simply means "not accessible".
        try:
            os.makedirs(path)
            path_created = True
        except OSError:
            return False
    # Folder exists (or has just been created); try writing a temp file into it
    try:
        with tempfile.TemporaryFile(dir=path):
            pass
    except OSError as e:
        # Clean up any directory we created before reporting/raising
        if path_created:
            shutil.rmtree(path)
        if e.errno == errno.EACCES:  # 13: permission denied
            return False
        e.filename = path
        raise
    # Cleanup & report success
    if path_created:
        shutil.rmtree(path)
    return True
def get_list_of_files(
    root_dir,
    recursive=False,
    return_abs=True,
    ignore_special_files=True,
    ignore_special_dirs=True,
    extension=None,
):
    """
    Retrieve a list of files in the passed root_dir (and optionally in all sub-directories).
    :param str root_dir: directory to collect files from.
    :param bool recursive: if True, files are also collected from subdirectories.
        Default is False.
    :param bool return_abs: if True, returned paths are absolute, otherwise
        relative (to the current working directory). Default is True.
    :param bool ignore_special_files: if `True` this function ignores special
        files (starting with . or __ ). Default is True.
    :param bool ignore_special_dirs: if `True` this function ignores any files
        under special directories (starting with . or __ ). Default is True.
    :param str extension: if not None, only files with the given extension are returned.
    :returns:
        a list containing absolute (or relative) paths of files under the given root_dir.
    :raises:
        IOError: If the passed path/directory does not exist
    """
    if not os.path.isdir(root_dir):
        raise IOError(" ['%s'] is not a valid directory!" % root_dir)
    _passed_abs_path = os.path.abspath(root_dir)
    _passed_rel_path = os.path.relpath(root_dir)

    # Normalize the extension and pre-compile the matching pattern once
    # (hoisted out of the file loop). `re.escape` prevents regex
    # metacharacters in the extension (e.g. 'c++') from being interpreted
    # as a pattern — bugfix over interpolating the raw extension.
    if extension is None:
        ext_pattern = None
    else:
        _ext = extension.lower().lstrip("* .").rstrip(" ")
        ext_pattern = re.compile(r"\A(.)*%s\Z" % re.escape(_ext), re.IGNORECASE)

    if recursive:
        dirs_list = get_list_of_subdirectories(
            root_dir=_passed_abs_path,
            return_abs=False,
            ignore_special=ignore_special_dirs,
        )
    else:
        dirs_list = [_passed_rel_path]

    files_list = []
    for dir_name in dirs_list:
        for fle in os.listdir(dir_name):
            file_rel_path = os.path.relpath(
                os.path.join(dir_name, fle)
            )  # relative path of file
            if not os.path.isfile(file_rel_path):
                continue  # ignore directories, links to dirs, etc.
            if ignore_special_files and (
                fle.startswith(".") or fle.startswith("__")
            ):
                continue
            if ext_pattern is not None:
                _, tail = os.path.splitext(file_rel_path)
                if not ext_pattern.match(tail):
                    continue
            # Add this file to the aggregated list
            files_list.append(
                os.path.abspath(file_rel_path) if return_abs else file_rel_path
            )
    return files_list
def get_list_of_subdirectories(
    root_dir, ignore_root=False, return_abs=False, ignore_special=True, recursive=True
):
    """
    Retrieve a list of sub-directories of `root_dir`.
    :param str root_dir: directory to start constructing sub-directories of.
    :param bool ignore_root: if True, the passed root_dir is excluded from the
        returned list. Default is False.
    :param bool return_abs: if True, returned paths are absolute, otherwise
        relative (to the current working directory). Default is False.
    :param bool ignore_special: if True, this function ignores special
        directories (starting with . or __ ). Only applied when `recursive`
        is True, matching the original behavior. Default is True.
    :param bool recursive: if True, search subdirectories recursively. Default is True.
    :returns:
        A list containing subdirectories under the given root_dir; the subdirs
        are absolute or relative paths based on `return_abs`. If
        `root_dir` has no subdirectories (and `ignore_root` is True), the list is empty.
    :raises:
        IOError: If the passed path/directory does not exist
    """
    if not os.path.isdir(root_dir):
        raise IOError(" ['%s'] is not a valid directory!" % root_dir)
    subdirs_list = []
    # Keep the walked path in the same (absolute/relative) form it was passed,
    # so the `ignore_root` equality check against os.walk's roots is meaningful.
    _passed_path = os.path.abspath(root_dir)
    if not os.path.isabs(root_dir):
        _passed_path = os.path.relpath(_passed_path)
    if recursive:
        # `os.sep + "."` catches components such as '.git' on any platform
        # (bugfix: the original hard-coded '/.', which never matches on
        # Windows); '__' catches cache directories such as '__pycache__'.
        special_marker = os.sep + "."
        for root, _, _ in os.walk(_passed_path):
            if ignore_special and (special_marker in root or "__" in root):
                continue
            if ignore_root and _passed_path == root:
                continue
            subdirs_list.append(
                os.path.abspath(root) if return_abs else os.path.relpath(root)
            )
    else:
        # Non-recursive: only the root (optionally) and its immediate children
        _sub = next(os.walk(_passed_path))[1]
        base = (
            os.path.abspath(_passed_path)
            if return_abs
            else os.path.relpath(_passed_path)
        )
        if not ignore_root:
            subdirs_list.append(base)
        for d in _sub:
            subdirs_list.append(os.path.join(base, d))
    return subdirs_list
def try_file_name(directory, file_prefix, extension):
    """
    In `directory`, find an unused file name. First f"{file_prefix}.{extension}"
    is tried; if that file already exists, the smallest `i` such that
    f"{file_prefix}_{i}.{extension}" does not exist is searched for, and said
    file name is returned.
    :param str directory: directory to search in.
    :param str file_prefix: file name prefix.
    :param str extension: file name extension (leading dots/spaces are stripped).
    :returns: the chosen file name (without the directory part)
    :rtype: str
    :raises IOError: if `directory` is not a valid directory
    :raises AssertionError: if `file_prefix`/`extension` are not strings, or
        `extension` is empty after stripping
    """
    if not os.path.isdir(directory):
        raise IOError(" ['%s'] is not a valid directory!" % directory)
    assert isinstance(file_prefix, str), "file_prefix must be a string"
    assert isinstance(extension, str), "extension must be a string"
    ext = extension.strip(". ")
    assert len(ext) > 0, "extension must be a valid extension not empty"
    # First preference: no counter suffix at all
    file_name = "{0}.{1}".format(file_prefix, ext)
    counter = 0
    # Append increasing counters until an unused name is found
    while os.path.isfile(os.path.join(directory, file_name)):
        file_name = "{0}_{1}.{2}".format(file_prefix, counter, ext)
        counter += 1
    return file_name
# Utility functions usefult to Models, Observations, etc.
# -------------------------------------------------------
[docs]
def validate_Cartesian_grid(grid, points_as_rows=True, points_ndim_test=True):
"""
Given a 1D/2D Numpy array characterizing a (Cartesian representation of a ) model grid, validate the shape,
and create a two-dimensional numpy array with each row/column (based on `points_as_rows`)
representing one coordinate point.
Additionally, The repeatition of coordinates means multiple prognostic variables are considered
by the model; This function finds indexes corresponding to each of the prognostic variables
and make sure they are layed out in a reasonable fashion:
- all prognostic variables are consecutive to each other, or
- the whole grid is layed out for each prognostic variable.
:param grid: The model grid; this is an iterable (list/array/etc.) to be casted as a numpy array.
:param bool points_as_rows: used only for grids in 2D or more.
If `True`, each row (after casting in an np array) is regarded as a single grid point.
Thus, the number of columns is taken as the space dimension. Note that the default behaviour
in all implemented simulation model is to provide points as rows, though a flag needs to go there to control it.
:param bool points_ndim_test: if `True` assert that the number of grid points exceed the number
of dimensions; this requires the number of grid points in 1D system to be at least 2, etc.
:returns:
- `grid`: validated model grid (either 1D or 2D Numpy array based on the input (passed) grid)
- `prog_vars_indexes`: a list containing indexes of the prognostic variables;
each entry is a numpy array holding indexes of one prognostic variable.
:raises:
- `TypeError` is raised if grid cannot be casted into a numpy array or if the array doesn't hold
coordinates of one/two dimensional cartesian system
- `AssertionError` is raised if `points_ndim_test` is `True` and number of points does not
satisfy the test indicated above.
:remarks:
- Number of gridpoints must be more than the dimensionality of the coordinate system.
- The coordinate system dimension is assumed to be the minimum of rows/columns of the passed grid;
Transposition is carried out if needed
"""
try:
grid = np.squeeze(np.asarray(grid))
except:
print("Failed to cast the passed grid into a numpy array")
raise TypeError
# Put the grid in the right shape, and figure out dimensionality
if grid.ndim == 1:
# One-dimensional cartesian coordinates
grid = (
grid.reshape((grid.size, 1))
if points_as_rows
else grid.reshape((1, grid.size))
)
space_dim = 1
elif grid.ndim == 2:
# Two or Three dimensional cartesian coordinates
space_dim = np.size(grid, 1) if points_as_rows else np.size(grid, 0)
if not (2 <= space_dim <= 3):
print("Only one/two/three cartesian coordinates are supported")
print("Received grid with coordinates in {0}D".format(space_dim))
raise TypeError
else:
print(
"grid must be 1 or 2 dimensional numpy array; received array of shape {0}".format(
grid.shape
)
)
raise TypeError
# Transform the view into (points_as_rows anyways to simplify the code, then transpose if needed)
if not points_as_rows:
grid = grid.T
# Extract indexes of prognostic variables
_, unique_indexes = np.unique(grid, axis=0, return_index=True)
unique_indexes.sort()
incr = unique_indexes[1:] - unique_indexes[0:-1]
if not np.all(incr[0] == incr):
print("The grid (corresponding to state entries) has unsupported structure")
print(
"Either prognostic variables must be consecutive in the state vector, or number-of-gridpoints apart!"
)
raise TypeError
if incr[0] == 1:
# Prognostic variable at all gridpoints is stored in adjacent memory locations
num_gridpoints = unique_indexes.size
else:
# All Prognostic variables at one gridpoint are adjacent in memory
num_gridpoints = incr[0]
if unique_indexes.size != num_gridpoints:
print(
"DEBUG: This shouldn't happen; state indexes increment doesn't match unique gridpoints!"
)
raise AssertionError
num_prog_vars = np.size(grid, 0) // num_gridpoints
if num_gridpoints * num_prog_vars != np.size(grid, 0):
print("This shouldn't happen!")
print("The grid (corresponding to state entries) has unsupported structure")
print(
"Model/Observation size must be equal to number of grid points x number of prognostic variables!"
)
raise TypeError
if points_ndim_test and num_gridpoints <= space_dim:
print("The number of grid points is not more than the space dimension.")
print(f"Number of grid points: {num_gridpoints}; Space dimension: {space_dim}")
raise AssertionError
# List of prognostic variables to use for mapping
if incr[0] == 1:
prog_vars_indexes = [
np.arange(i * num_gridpoints, (i + 1) * num_gridpoints)
for i in range(num_prog_vars)
]
else:
prog_vars_indexes = [
np.arange(i, num_gridpoints * num_prog_vars, num_prog_vars)
for i in range(num_prog_vars)
]
# Transform back (if transposed earlier)
if not points_as_rows:
grid = grid.T
return grid, prog_vars_indexes
def gridpoint_to_index(coord, grid, return_all=False):
    """
    Lookup coordinates `coord` in the rows of `grid` and return the first index
    (or all indices) found.
    :param coord: a scalar (for 1d grids) or a tuple/list of length equal
        to the number of columns in `grid`
    :param grid: a 2D numpy array representing the cartesian grid.
        Each row is a set of coordinates (x, y, ...)
    :param bool return_all: if `True` all matching indices are returned,
        otherwise only the first found index is returned.
    :returns: integer index of the first match (if `return_all` is `False`) or
        a numpy array of all matching indices (if `return_all` is `True`) of
        coordinates in `grid` equal to the passed `coord`.
        `None` is returned if no matching coordinates are found.
    """
    assert isinstance(grid, np.ndarray), "grid must be a numpy array!"
    if grid.ndim <= 1:
        # Promote 1D grids to a single-column 2D array
        grid = grid.reshape((grid.size, 1))
    assert grid.ndim == 2, "grid must be 2D Numpy array"
    ndim = np.size(grid, 1)
    coord = np.asarray(coord).flatten()
    assert coord.size == ndim, "coord doesn't match the dimension of the passed grid"
    # Vectorized row-wise comparison (replaces the per-column Python loop)
    flags = np.all(grid == coord, axis=1)
    index = np.nonzero(flags)[0]
    if index.size == 0:
        return None
    return index if return_all else index[0]