Source code for pyoed.utility.misc

# Copyright © 2023, UChicago Argonne, LLC
# All Rights Reserved

"""
This module provides access to miscellaneous operations that did not quite fit in the other utility modules. Functions here can be moved to new utility modules if they grow into their own category. Proper announcements will be made in this case.
"""

import os
import numbers
import copy
import numpy as np
import re
from warnings import warn
import tempfile
import errno
import shutil


# Explicit public API of this module; `aggregate_configurations` is an
# alias of `aggregate` defined below.
__all__ = [
    "isnumber",
    "isstring",
    "isiterable",
    "aggregate",
    "aggregate_configurations",
    "print_configs",
    "path_is_accessible",
    "get_list_of_files",
    "get_list_of_subdirectories",
    "try_file_name",
    "validate_Cartesian_grid",
    "gridpoint_to_index",
]


# Types/Values:
# -------------
def isnumber(x, real_only=False):
    """
    Check if a given variable x is a number, defined as:

    - integer
    - float
    - complex number
    - boolean

    :param x: variable to be checked
    :param real_only: if True, limit to the set of integers and floating
        point numbers.

    :rtype: bool
    """
    if not isinstance(x, numbers.Number):
        return False
    # In real-only mode, complex numbers (and any non int/float numeric
    # type) are rejected; note that bool is a subclass of int.
    return isinstance(x, (int, float)) if real_only else True
def isstring(s):
    """
    Check if s is a string-like variable, i.e. a string or a bytes object
    or encodable with ascii.

    :param s: variable to be checked
    :rtype: bool
    """
    if isinstance(s, (str, bytes)):
        return True
    # Duck-typing fallback: anything exposing an ascii-encodable ``encode``
    # method is regarded as string-like. `except Exception` (instead of the
    # previous bare `except:`) avoids swallowing KeyboardInterrupt/SystemExit.
    try:
        s.encode("ascii")
        return True
    except Exception:
        return False
def isiterable(a):
    """
    Check if a is an iterable object.

    :param a: variable to be checked
    :rtype: bool
    """
    # An object is considered iterable here iff it exposes an ``__iter__``
    # attribute (lists, dicts, strings, generators, numpy arrays, ...);
    # `hasattr` performs exactly the try/AttributeError probe by hand.
    return hasattr(a, "__iter__")
# Configurations/Dictionaries: # ----------------------------
def aggregate(
    configs,
    def_configs,
    in_place=False,
    deep_copy=False,
    keep_None=True,
):
    """
    Add default configurations to the passed `configs` dictionary, i.e.,
    blindly (and recursively) combine the two dictionaries.
    This is a one-way merge from `def_configs` to `configs` only.

    :param dict configs: a dictionary containing configurations to update
    :param dict def_configs: a dictionary holding default configurations
    :param bool in_place: if `True` overwrite `configs` (in place) otherwise
        return a **COPY** of `configs` with keys/values aggregated with those
        in `def_configs`.
    :param bool deep_copy: if True deep copies of entries of `def_configs`
        are merged with entries of `configs`, otherwise only shallow copies
        are taken. Deep copy is more relevant to compound objects.
    :param bool keep_None: if `True` any keyed value in `configs` that is set
        to `None` will be kept, otherwise it will be overridden by the value
        associated with the corresponding key in `def_configs`

    :raises:
        - ValueError if both `configs` and `def_configs` are None
        - TypeError if `configs` or `def_configs` are neither None nor
          derived from Python dict

    :returns: an updated version of `configs` with keys/values from both
        `configs` and `def_configs`.
    """
    # Assertions and type checks
    if configs is None and def_configs is None:
        raise ValueError("both inputs are None")
    if def_configs is None:
        def_configs = dict()
    elif not isinstance(def_configs, dict):
        raise TypeError(
            "def_configs must be either None or be derived from Python's dict. "
            "Received '{0}' instance".format(type(def_configs))
        )
    if configs is None:
        configs = dict()
    elif not isinstance(configs, dict):
        raise TypeError(
            "configs must be either None or be derived from Python's dict. "
            "Received '{0}' instance".format(type(configs))
        )

    # Recursively aggregate configurations
    _copier = copy.deepcopy if deep_copy else copy.copy
    out_configs = configs if in_place else configs.copy()
    for key, def_val in def_configs.items():
        if key not in out_configs or (out_configs[key] is None and not keep_None):
            out_configs[key] = _copier(def_val)
        elif isinstance(out_configs[key], dict) and isinstance(def_val, dict):
            # Recursively aggregate dictionary-valued keys.
            # BUGFIX: propagate `keep_None` so that nested None values are
            # handled the same way as top-level ones (it was dropped before).
            aggregate(
                out_configs[key],
                def_val,
                in_place=True,
                deep_copy=deep_copy,
                keep_None=keep_None,
            )
    return out_configs
# Add an alias aggregate_configurations = aggregate # Files, Directories & Configs: # -----------------------------
def path_is_accessible(path):
    """
    Test if the passed path (to a directory) is accessible; that is, the
    user can save/write files under that folder/directory.
    This tests whether the path exists (or is creatable) and is writable.
    Any directory created solely for this probe is removed before returning.

    :param path: path to folder/directory

    :returns: `True` if the path is writable, otherwise `False`

    :raises: OSError if writing fails for a reason other than a
        permission error (EACCES)
    """
    path_created = False
    if not os.path.isdir(path):
        try:
            os.makedirs(path)
        except Exception:
            # The directory does not exist and cannot be created.
            # (The old code also contained an unreachable `raise OSError`
            # with a non-f-string message here; both removed.)
            return False
        path_created = True

    # The folder exists (or has just been created); try writing into it
    try:
        testfile = tempfile.TemporaryFile(dir=path)
        testfile.close()
        writable = True
    except OSError as e:
        if e.errno != errno.EACCES:  # EACCES == 13 (permission denied)
            e.filename = path
            # BUGFIX: clean up the probe directory even on the re-raise path
            if path_created:
                shutil.rmtree(path)
            raise
        writable = False

    # Cleanup any directory created purely for this test, then report
    if path_created:
        shutil.rmtree(path)
    return writable
def get_list_of_files(
    root_dir,
    recursive=False,
    return_abs=True,
    ignore_special_files=True,
    ignore_special_dirs=True,
    extension=None,
):
    """
    Retrieve a list of files in the passed `root_dir` (and optionally in
    all sub-directories).

    :param str root_dir: directory to look for files in.
    :param bool recursive: if True, files in subdirectories are returned as
        well. Default is False.
    :param bool return_abs: if True, returned paths are absolute, otherwise
        relative (to the current working directory). Default is True.
    :param bool ignore_special_files: if `True` this function ignores special
        files (names starting with . or __ ). Default is True.
    :param bool ignore_special_dirs: if `True` this function ignores any files
        under special directories (names starting with . or __ ).
        Default is True.
    :param str extension: if not None, only files with the given extension
        (case-insensitive) are returned.

    :returns: a list containing absolute (or relative) file paths under the
        given root_dir.

    :raises: IOError: If the passed path/directory does not exist
    """
    if not os.path.isdir(root_dir):
        raise IOError(" ['%s'] is not a valid directory!" % root_dir)
    _passed_abs_path = os.path.abspath(root_dir)
    _passed_rel_path = os.path.relpath(root_dir)

    # Normalize the target extension, e.g. "*.PY " -> "py"
    if extension is None:
        _ext = None
    else:
        _ext = extension.lower().lstrip("* .").rstrip(" ")

    if recursive:
        dirs_list = get_list_of_subdirectories(
            root_dir=_passed_abs_path,
            return_abs=False,
            ignore_special=ignore_special_dirs,
        )
    else:
        dirs_list = [_passed_rel_path]

    files_list = []
    for dir_name in dirs_list:
        for fle in os.listdir(dir_name):
            # relative path of the entry (relative to the current directory)
            file_rel_path = os.path.relpath(os.path.join(dir_name, fle))
            if not os.path.isfile(file_rel_path):
                continue  # skip directories and other non-file entries
            if ignore_special_files and (
                fle.startswith(".") or fle.startswith("__")
            ):
                continue
            if _ext is not None:
                # BUGFIX: require the file extension to *equal* the target
                # (case-insensitively). The previous regex test
                # (r"\A(.)*%s\Z" % _ext) matched any extension merely
                # ending with it, e.g. extension "py" matched "file.numpy",
                # and injected the extension unescaped into the regex.
                tail = os.path.splitext(file_rel_path)[1]
                if tail.lower().lstrip(".") != _ext:
                    continue
            files_list.append(
                os.path.abspath(file_rel_path) if return_abs else file_rel_path
            )
    return files_list
def get_list_of_subdirectories(
    root_dir, ignore_root=False, return_abs=False, ignore_special=True, recursive=True
):
    """
    Retrieve a list of sub-directories of `root_dir`.

    :param str root_dir: directory to start constructing sub-directories of.
    :param bool ignore_root: if True, the passed root_dir is ignored in the
        returned list. Default is False.
    :param bool return_abs: if True, returned paths are absolute, otherwise
        relative (to the current working directory). Default is False.
    :param bool ignore_special: if True, this function ignores special
        directories (paths containing '/.' or '__'); applied only when
        `recursive` is True. Default is True.
    :param bool recursive: if True, search subdirectories recursively.
        Default is True.

    :returns: A list containing subdirectories under the given root_dir;
        absolute or relative paths based on `return_abs`. If `root_dir` has
        no subdirectories (and the root itself is ignored), the list is empty.

    :raises: IOError: If the passed path/directory does not exist
    """
    if not os.path.isdir(root_dir):
        raise IOError(" ['%s'] is not a valid directory!" % root_dir)

    # Keep the walk target in the same form (absolute vs relative) as the
    # caller passed it, so the `ignore_root` comparison below is reliable.
    target = os.path.abspath(root_dir)
    if not os.path.isabs(root_dir):
        target = os.path.relpath(target)

    # Single place deciding the output path format
    formatter = os.path.abspath if return_abs else os.path.relpath

    subdirs_list = []
    if recursive:
        for current, _, _ in os.walk(target):
            # '/.' skips anything under special directories such as '.git';
            # '__' skips cache directories such as '__pycache__'.
            if ignore_special and ("/." in current or "__" in current):
                continue
            if ignore_root and current == target:
                continue
            subdirs_list.append(formatter(current))
    else:
        # Only the immediate children (and optionally the root itself)
        children = next(os.walk(target))[1]
        if not ignore_root:
            subdirs_list.append(formatter(target))
        base = formatter(target)
        for child in children:
            subdirs_list.append(os.path.join(base, child))
    return subdirs_list
def try_file_name(directory, file_prefix, extension):
    """
    In `directory`, find a file name built from `file_prefix` and `extension`
    that does not yet exist, and return it (the bare file name, not joined
    with `directory`). f"{file_prefix}.{extension}" is returned if available;
    otherwise the smallest `i >= 0` such that f"{file_prefix}_{i}.{extension}"
    does not exist is used.

    :param str directory: directory to search in.
    :param str file_prefix: file name prefix.
    :param str extension: file name extension.

    :rtype: str
    """
    if not os.path.isdir(directory):
        raise IOError(" ['%s'] is not a valid directory!" % directory)
    if not directory.endswith("/"):
        directory += "/"
    assert isinstance(file_prefix, str), "file_prefix must be a string"
    assert isinstance(extension, str), "extension must be a string"
    assert (
        len(extension.strip(". ")) > 0
    ), "extension must be a valid extension not empty"

    ext = extension.strip(". ")

    # Prefer the plain "<prefix>.<ext>" name if it is still available
    candidate = file_prefix + "." + ext
    if not os.path.isfile(os.path.join(directory, candidate)):
        return candidate

    # Otherwise probe "<prefix>_0.<ext>", "<prefix>_1.<ext>", ... in order
    counter = 0
    while True:
        candidate = file_prefix + "_" + str(counter) + "." + ext
        if not os.path.isfile(directory + candidate):
            return candidate
        counter += 1
# Utility functions usefult to Models, Observations, etc. # -------------------------------------------------------
def validate_Cartesian_grid(grid, points_as_rows=True, points_ndim_test=True):
    """
    Given a 1D/2D Numpy array characterizing a (Cartesian representation of a)
    model grid, validate the shape, and create a two-dimensional numpy array
    with each row/column (based on `points_as_rows`) representing one
    coordinate point.

    Repetition of coordinates means multiple prognostic variables are
    considered by the model; this function finds indexes corresponding to
    each of the prognostic variables and makes sure they are laid out in a
    reasonable fashion:

    - all prognostic variables are consecutive to each other, or
    - the whole grid is laid out for each prognostic variable.

    :param grid: The model grid; an iterable (list/array/etc.) to be cast as
        a numpy array.
    :param bool points_as_rows: used only for grids in 2D or more. If `True`,
        each row (after casting to an np array) is regarded as a single grid
        point; the number of columns is taken as the space dimension.
    :param bool points_ndim_test: if `True` assert that the number of grid
        points exceeds the space dimension (e.g. a 1D system must hold at
        least 2 points).

    :returns:
        - `grid`: validated model grid (either 1D or 2D Numpy array based on
          the passed grid)
        - `prog_vars_indexes`: a list of numpy arrays, each holding the
          state-vector indexes of one prognostic variable.

    :raises:
        - `TypeError` if grid cannot be cast into a numpy array, if the array
          doesn't hold coordinates of a supported Cartesian system, or if the
          layout of repeated coordinates is unsupported
        - `AssertionError` if `points_ndim_test` is `True` and the number of
          points does not satisfy the test indicated above.

    :remarks:
        - The coordinate system dimension is assumed to be the minimum of
          rows/columns of the passed grid; transposition is carried out if
          needed.
    """
    try:
        grid = np.squeeze(np.asarray(grid))
    except Exception as err:
        raise TypeError(
            "Failed to cast the passed grid into a numpy array"
        ) from err

    # Put the grid in the right shape, and figure out the space dimension
    if grid.ndim == 1:
        # One-dimensional Cartesian coordinates
        grid = (
            grid.reshape((grid.size, 1))
            if points_as_rows
            else grid.reshape((1, grid.size))
        )
        space_dim = 1
    elif grid.ndim == 2:
        # Two or three dimensional Cartesian coordinates
        space_dim = np.size(grid, 1) if points_as_rows else np.size(grid, 0)
        if not (2 <= space_dim <= 3):
            raise TypeError(
                "Only one/two/three cartesian coordinates are supported; "
                "received grid with coordinates in {0}D".format(space_dim)
            )
    else:
        raise TypeError(
            "grid must be 1 or 2 dimensional numpy array; received array of "
            "shape {0}".format(grid.shape)
        )

    # Work with points as rows to simplify the code; transpose back at the end
    if not points_as_rows:
        grid = grid.T
    state_size = np.size(grid, 0)

    # Indexes of the first occurrence of each distinct gridpoint
    _, unique_indexes = np.unique(grid, axis=0, return_index=True)
    unique_indexes.sort()
    num_gridpoints = unique_indexes.size
    num_prog_vars = state_size // num_gridpoints
    if num_gridpoints * num_prog_vars != state_size:
        raise TypeError(
            "The grid (corresponding to state entries) has unsupported "
            "structure; Model/Observation size must be equal to number of "
            "grid points x number of prognostic variables!"
        )

    # Decide the layout of repeated coordinates:
    #  - increment 1 between first occurrences => the whole grid is laid out
    #    once per prognostic variable (contiguous blocks)
    #  - increment == num_prog_vars => all prognostic variables of one
    #    gridpoint are adjacent in memory (interleaved)
    if num_gridpoints == 1:
        # BUGFIX: a single distinct gridpoint used to crash with IndexError
        # (empty increment array); both layouts coincide in this case.
        contiguous_blocks = True
    else:
        incr = unique_indexes[1:] - unique_indexes[:-1]
        if not np.all(incr[0] == incr):
            raise TypeError(
                "The grid (corresponding to state entries) has unsupported "
                "structure; Either prognostic variables must be consecutive "
                "in the state vector, or number-of-gridpoints apart!"
            )
        if incr[0] == 1:
            contiguous_blocks = True
        elif incr[0] == num_prog_vars:
            # BUGFIX: in the interleaved layout the increment equals the
            # number of prognostic variables; the old code mistakenly set
            # num_gridpoints = incr[0], rejecting valid interleaved grids.
            contiguous_blocks = False
        else:
            raise TypeError(
                "The grid (corresponding to state entries) has unsupported "
                "structure; Either prognostic variables must be consecutive "
                "in the state vector, or number-of-gridpoints apart!"
            )

    if points_ndim_test and num_gridpoints <= space_dim:
        raise AssertionError(
            "The number of grid points is not more than the space dimension. "
            f"Number of grid points: {num_gridpoints}; "
            f"Space dimension: {space_dim}"
        )

    # List of prognostic-variable index arrays to use for mapping
    if contiguous_blocks:
        prog_vars_indexes = [
            np.arange(i * num_gridpoints, (i + 1) * num_gridpoints)
            for i in range(num_prog_vars)
        ]
    else:
        prog_vars_indexes = [
            np.arange(i, num_gridpoints * num_prog_vars, num_prog_vars)
            for i in range(num_prog_vars)
        ]

    # Transform back (if transposed earlier)
    if not points_as_rows:
        grid = grid.T
    return grid, prog_vars_indexes
def gridpoint_to_index(coord, grid, return_all=False):
    """
    Look up the coordinates `coord` in the rows of `grid` and return the
    first matching row index (or all of them).

    :param coord: a scalar (for 1d grids) or a tuple/list of length equal
        to the number of columns in `grid`
    :param grid: a 2D numpy array representing the cartesian grid. Each row
        is a set of coordinates (x, y, ...)
    :param bool return_all: if `True` all matching indices are returned,
        otherwise only the first found index is returned.

    :returns: the integer index of the first matching row (if `return_all`
        is `False`), or a numpy array of all matching indices (if
        `return_all` is `True`). `None` is returned if no matching
        coordinates are found.
    """
    assert isinstance(grid, np.ndarray), "grid must be a numpy array!"
    if grid.ndim <= 1:
        grid = grid.reshape((grid.size, 1))
    assert grid.ndim == 2, "grid must be 2D Numpy array"
    ndim = np.size(grid, 1)

    coord = np.asarray(coord).flatten()
    assert coord.size == ndim, "coord doesn't match the dimension of the passed grid"

    # A row matches iff every one of its columns equals the requested
    # coordinate (vectorized equivalent of AND-ing the columns one by one)
    matches = np.nonzero(np.all(grid == coord, axis=1))[0]
    if matches.size == 0:
        return None
    return matches if return_all else matches[0]