Source code for pydsstools.heclib.dss.HecDss

"""
Open class object for HEC-DSS file

This module provides the public API for interacting with HEC-DSS files.
"""

__all__ = ["Open"]

import logging
from copy import copy
from array import array
from datetime import datetime
import numpy as np
import numpy.ma as ma
import pandas as pd
from datetime import datetime
from os import PathLike
from pathlib import Path
import numpy.typing as npt  # npt.NDArray[np.float32], npt.Arraylike
from pydantic import validate_call
from typing import (
    Any,
    Optional,
    Union,
    Iterable,
    Iterator,
    Sequence,
    Mapping,
    MutableMapping,
    Callable,
    overload,
    TypedDict,
    Final,
    ClassVar,
    TypeVar,
    Generic,
    NoReturn,
)

try:
    # python 3.10+
    from typing import Annotated, TypeAlias, Literal
except ImportError:
    # python 3.9
    from typing_extensions import Annotated, TypeAlias, Literal

from ...core import Open as _Open
from ...core import TimeSeriesStruct, TimeSeriesContainer
from ...core import PairedDataStruct, PairedDataContainer
from ...core import SpatialGridStruct
from ...core.enums import GridType
from ...core.gridinfo import GridInfo
from ...core.gridinfo.v6 import gridinfo7_to_gridinfo6, GridInfo6 
#from ...core.gridv6_internals import gridinfo7_to_gridinfo6, GridInfo6
from ...core import (
    PairedDataContainer,
    HecTime,
    DssPathName,
    UNDEFINED,
)

DateLike = TypeVar("DateLike", str, datetime, HecTime)
DateWindow: TypeAlias = tuple[DateLike, DateLike]
PathType: TypeAlias = Union[str, Path, PathLike]



# ==================== Main Class ====================


[docs] class Open(_Open): """ Open a DSS file and create a dataset object that supports input/output operations. This class provides a high-level, user-friendly interface for working with HEC-DSS files. It supports reading and writing time series, paired data, and spatial grid data. Parameters ---------- dss_path : str or Path or PathLike Path to the DSS file. version : {6, 7} or None, optional DSS file version. If ``None``, detect automatically. If creating a new file, ``None`` creates a version 7 file. Default is None. mode : {"rw", "r"}, optional File open mode. ``"rw"`` allows read/write; ``"r"`` is read-only. Default is "rw". Attributes ---------- mode : str The file access mode. version : int The DSS file version (6 or 7). filename : str Path to the DSS file. Examples -------- Open a DSS file for reading and writing: >>> from pydsstools.heclib.dss.HecDss import Open >>> fid = Open("example.dss", mode="rw") Open a DSS file as read-only: >>> fid = Open("example.dss", mode="r") >>> fid.close() Use context manager for automatic cleanup: >>> with Open("example.dss") as fid: ... ts = fid.read_ts("/A/B/C/01JAN2020/1HOUR/F/") See Also -------- TimeSeriesContainer : Container for time series data PairedDataContainer : Container for paired data SpatialGridStruct : Structure for spatial grid data """
[docs] def __init__( self, dss_path: PathType, version: Optional[Literal[6, 7]] = None, mode: Literal["rw", "r"] = "rw", ) -> None: super().__init__(dss_path, version) self.mode = mode
[docs] def read_ts( self, pathname: Union[str, DssPathName], window: Optional[DateWindow] = None, trim_missing: bool = False, window_flag: Literal[0, 1, 2, 3] = 0, reg: Optional[bool] = False, ireg: Optional[bool] = False ) -> TimeSeriesStruct: """ Read time-series record from DSS file. Parameters ---------- pathname : str or DssPathName DSS record pathname. window : tuple of (start, end) or None, optional Time window to read. If ``None``, the date range encoded in the D-part of the ``pathname`` is used. Default is None. trim_missing : bool, optional If True, removes missing values at the beginning and end of the data set. Applies to regular time-series only. Default is False. window_flag : {0, 1, 2, 3}, optional Applies to irregular time series only. Controls how the time window is applied. Default is 0. Possible values: * 0 : Strictly adhere to the time window. * 1 : Also retrieve one value immediately before the start of the window. * 2 : Also retrieve one value immediately after the end of the window. * 3 : Retrieve one value immediately before the start and one immediately after the end of the window. reg : bool, optional If True, treat the data as a regular time series. Default is False. ireg : bool, optional If True, treat the data as an irregular time series. Default is False. If both ``reg`` and ``ireg`` are ``False`` or both are ``True``, the type of time series will be determined from the E-part of ``pathname``. Returns ------- TimeSeriesStruct Time series data structure containing the requested data. Raises ------ ValueError If pathname does not correspond to a valid time series record or if window_flag is invalid. Examples -------- Read time series with a specific time window: >>> ts = fid.read_ts(pathname, window=('10MAR2006 24:00:00', '09APR2006 24:00:00')) Read entire time series: >>> ts = fid.read_ts(pathname) Read regular time series with trimming: >>> ts = fid.read_ts(pathname, trim_missing=True, reg=True) """ pathname = DssPathName(pathname) infer_type = True if reg and ireg: logging.info("The timeseries to be read is specified as both regular and irregular type; type will be inferred from the pathname.") elif reg: infer_type = False interval = 1 elif ireg: infer_type = False interval = -1 if infer_type: # find whether the ts is regular, irregular or not ts logging.debug("Determining the type of timeseries record.") interval = self._ts_type_from_pathname(pathname.text()) if interval == 0: raise ValueError( f"The pathname '{pathname.text()}' does not correspond to a valid " f"regular or irregular time series record. Verify the E-part " f"'{pathname.epart}' has a standard interval specification." ) if interval == 1: logging.debug("Reading regular time series.") retrieve_flag = -1 if trim_missing else 0 else: logging.debug("Reading irregular time series.") if window_flag in [0, 1, 2, 3]: retrieve_flag = window_flag else: logging.error("Invalid window_flag for irregular dss record") return if window: start_date, end_date = window sdate = HecTime(start_date, midnight_as_2400=False) edate = HecTime(end_date, midnight_as_2400=True) sday = sdate.date() stime = sdate.time(2) eday = edate.date() etime = edate.time(2) return super()._read_ts_window(pathname.text(), sday, stime, eday, etime, retrieve_flag) else: retrieve_all = 0 if ( not pathname.dpart.strip() ): # if date part is empty, retrieve all data ignoring date retrieve_all = 1 return super()._read_ts_normal( pathname.text(), retrieve_flag, boolRetrieveAllTimes=retrieve_all )
[docs] def put_ts( self, data: Union[str, "DssPathName", "TimeSeriesContainer"], **kwargs: Any ) -> None: """ Write time-series data to DSS file. Parameters ---------- data : str or DssPathName or TimeSeriesContainer Either a pathname string or a TimeSeriesContainer object. **kwargs : Any Keyword arguments for TimeSeriesContainer when ``data`` is a pathname. Required kwargs when data is pathname: * values : list or array-like Time series values. * For regular time-series (interval > 0): * start_time : str Starting date/time. * For irregular time-series (interval < 0): * times : list of str List of date/time strings. * julian_base : str, optional Julian base date. Returns ------- None Raises ------ TypeError If data is not of expected type. ValueError If required parameters are missing or invalid. Examples -------- Write using TimeSeriesContainer: >>> from pydsstools.heclib.dss.HecDss import Open >>> from pydsstools.core import TimeSeriesContainer >>> fid = Open("dss_file.dss", mode="rw") >>> pathname = r"/A/B/C//1HOUR/F/" >>> values = [10, 20, 30, 40, 50] >>> interval = 1 >>> start_time = r"01JAN2025 1500" >>> data_units = "ft" >>> data_type = "inst" >>> timezone = "UTC" >>> tsc = TimeSeriesContainer(pathname, len(values), interval, values=values, ... start_time=start_time, data_units=data_units, ... data_type=data_type, tzid=timezone) >>> fid.put_ts(tsc) Write irregular time series without using TimeSeriesContainer: >>> pathname = r"/A/B/C//IR-DAY/F/" >>> julian_base = "01JAN2000" >>> times = ["02JUL2010 1200", "05JAN2012 0000", "15MAR2014 0200", ... "25FEB2018 0500", "19DEC2024 1200"] >>> values = [1, 20, 30, 40, 50] >>> fid.put_ts(pathname, values=values, times=times, julian_base=julian_base, ... data_units=data_units, data_type=data_type, tzid=timezone) """ if self.mode != "rw": logging.error( "Open the dss file in 'rw' mode to be able to write data on it." ) return if not isinstance(data, (str, DssPathName, TimeSeriesContainer)): raise TypeError(f"Expected pathname or TimeSeriesContainer, got {type(data).__name__}.") if isinstance(data, TimeSeriesContainer): tsc = data if tsc.interval > 0: # Regular time-series if not tsc.start_time: raise ValueError("Start date/time for regular timeseries container is not provided") else: # Irregular time-series if tsc.times is None: raise ValueError("Times for irregular timeseries container is not provided") if tsc.values is None: raise ValueError("Values for timeseries container is not provided") else: pathname = DssPathName(data) if "pathname" in kwargs: logging.warning("Ignoring pathname for TimeSeriesContainer provided as keyword argument") # -1 = irregular # 1 = regular # 0 = invalid interval = self._ts_type_from_pathname(pathname.text()) if interval == 0: raise ValueError("The pathname for timeseries has invalid interval information") values = kwargs["values"] count = len(values) _count = kwargs.pop("count", None) if _count is not None: # noqa: SIM102 if _count != count: logging.warning(f"Ignoring count argument value (={_count}) as it is not equal to the length of values (={count})") if interval < 0: # required for irregular time-series times = kwargs["times"] tsc = TimeSeriesContainer(pathname.text(), count, interval, **kwargs) super()._put(tsc)
[docs] def read_pd( self, pathname: Union[str, "DssPathName"], window: Optional[tuple[int, int, int, int]] = None, dataframe: Optional[bool] = True, ) -> Union[pd.DataFrame, PairedDataStruct]: """ Read paired data from DSS file. Parameters ---------- pathname : str or DssPathName DSS record pathname. window : tuple of (int, int, int, int) or None, optional Index window to read. If ``None``, all rows and columns are read. Default is None. Supported forms: * ``(row_start, row_end, col_start, col_end)`` Indexing rules: * Zero-based and **inclusive at both ends**. * ``row_start`` / ``col_start`` >= 0 (first row/column is 0). * ``row_end`` / ``col_end`` <= last valid index. * ``None`` for any bound selects the respective first/last index. * Negative indices are allowed (Python-style) and are **wrapped**. * If an **end** index overflows the table size, it is **clipped**. * Any other out-of-range condition raises ``IndexError``. dataframe : bool, optional If True, return a pandas DataFrame. If False, return a PairedDataStruct object. Default is True. Returns ------- pandas.DataFrame or PairedDataStruct Paired data in the requested format. Raises ------ IndexError If window indices are invalid or out of range. Examples -------- Read paired data with a window: >>> df = fid.read_pd(pathname, window=(2, 5, 0, None)) Read all paired data: >>> df = fid.read_pd(pathname) Read as PairedDataStruct: >>> pds = fid.read_pd(pathname, dataframe=False) """ pathname = DssPathName(pathname) if window: logging.debug(f"Input paired data window = '{window}'") size_info = self._pd_info(pathname.text()) rows = size_info["data_no"] cols = size_info["curve_no"] # user's 0-based indices _row_start, _row_end, _col_start, _col_end = window row_start, row_end = _normalize_span(rows, _row_start, _row_end) col_start, col_end = _normalize_span(cols, _col_start, _col_end) window = (row_start, row_end, col_start, col_end) # updated zero based indices _row_start, _row_end, _col_start, _col_end = [x - 1 for x in window] logging.debug(f"Updated window = '{window}'") pds = super()._read_pd(pathname.text(), window) if dataframe: x_data = pds.x_data y_data = pds.y_data y_labels = pds.y_labels logging.debug(y_labels) # The row in curves array contains curve data # Transpose causes the curve data to be in columns (for DataFrame purpose) tb = np.asarray(y_data).T if not window: _col_start = 0 _col_end = tb.shape[1] - 1 primary_colnames = [f"y{i}" for i in range(_col_start, _col_end + 1)] alias_colnames = ['' for x in range(_col_start, _col_end + 1)] logging.debug(f'window:{window}') logging.debug(f'col_start/end: {_col_start},{_col_end}') logging.debug(f'primary colnames: {primary_colnames}') logging.debug(f'alias columns: {alias_colnames}') for i, label in enumerate(y_labels): alias_colnames[i] = label logging.debug(f'Revised alias columns: {alias_colnames}') column_names = pd.MultiIndex.from_arrays([primary_colnames, alias_colnames], names=["primary", "labels"]) indx = list(x_data[0]) df = pd.DataFrame( data=tb, index=indx, columns=column_names, copy=True ) df.index.name = "x_data" return df return pds
[docs] def read_pd_labels(self, pathname: Union[str, "DssPathName"]) -> dict[str, str]: """ Read paired data labels from DSS file. Parameters ---------- pathname : str or DssPathName DSS record pathname. Returns ------- dict of str to str Dictionary mapping primary column names to label names. Examples -------- >>> labels = fid.read_pd_labels("/A/B/STAGE-FLOW/D/E/F/") >>> print(labels) {'y0': 'Stage', 'y1': 'Flow'} """ pathname = DssPathName(pathname) _df = self.read_pd(pathname.text(), window=(0, 0, 0, None)) label0 = _df.columns.get_level_values(0).tolist() label1 = _df.columns.get_level_values(1).tolist() return dict(zip(label0, label1))
[docs] def pd_info(self, pathname: Union[str, "DssPathName"]) -> dict[str, Any]: """ Get information about a paired data record. Parameters ---------- pathname : str or DssPathName DSS record pathname. Returns ------- dict Dictionary containing paired data information with keys: - 'curve_no' : int Number of curves (columns). - 'data_no' : int Number of data points (rows). - 'dtype' : int Data type code. - 'label_size' : int Average label size in characters. Examples -------- >>> info = fid.pd_info("/A/B/STAGE-FLOW/D/E/F/") >>> print(f"Curves: {info['curve_no']}, Points: {info['data_no']}") Curves: 2, Points: 100 """ pathname = DssPathName(pathname) return super()._pd_info(pathname.text())
[docs] def put_pd( self, data: Union["PairedDataContainer", str, "DssPathName"], **kwargs: Any, ) -> None: """ Write new paired data or edit an existing paired data record in the DSS file. Parameters ---------- data : PairedDataContainer or str or DssPathName Input data to write. Can be: * A PairedDataContainer object. * A string or DssPathName specifying an existing or new DSS record pathname. **kwargs : Any Additional keyword arguments or attributes for the PairedDataContainer. When writing a DataFrame: * y_data : pandas.DataFrame DataFrame containing paired data. * x_units : str Units for x-axis data. * x_type : str Type of x-axis data (e.g., "linear"). * y_units : str Units for y-axis data. * y_type : str Type of y-axis data (e.g., "linear"). When writing a single curve to preallocated record: * col_index : int Column index (0-based) to write to. * y_data : list or array-like Y-axis values for the curve. * window : tuple of (int, int), optional Row range (start, end) for writing. * y_labels : list of str, optional Labels for y-axis curves. Returns ------- None Raises ------ ValueError If incompatible parameters are provided or indices are out of range. IndexError If data has too many values. Examples -------- Write PairedDataContainer: >>> from pydsstools.core import PairedDataContainer >>> pathname = "/A/B/STAGE-FLOW/D/E/F/" >>> curves = 2 >>> rows = 5 >>> pdc = PairedDataContainer(pathname, (rows, curves)) >>> pdc.x_data = [0.1, 0.2, 0.3, 0.4, 0.5] >>> pdc.y_data = [[10, 20, 30, 40, 50], [1, 2, 3, 4, 5]] >>> pdc.x_units = "ft" >>> pdc.x_type = "linear" >>> pdc.y_units = "cfs" >>> pdc.y_type = "linear" >>> fid.put_pd(pdc) Write DataFrame: >>> import pandas as pd >>> pathname = "/A/B/STAGE-FLOW/D/E/F/" >>> df = pd.DataFrame({"Curve #1": [1, 2], "Curve #2": [3, 4]}, index=[0.5, 0.6]) >>> fid.put_pd(pathname, x_units="ft", x_type="linear", y_data=df, ... y_units="cfs", y_type="linear") Write a curve to preallocated paired data record: >>> pathname = "/A/B/STAGE-FLOW/D/E/PREALLOC/" >>> fid.put_pd(pathname, col_index=2, y_data=[1, 2, 3, 4], window=(2, 5)) """ if self.mode != "rw": logging.error( "Open the dss file in 'rw' mode to be able to write data on it." ) return if isinstance(data, PairedDataContainer): super()._put_pd(data) return if isinstance(data, (str, DssPathName)): pathname = DssPathName(data) y_data = kwargs.pop("y_data", None) col_index = kwargs.pop("col_index", None) if "pathname" in kwargs: logging.warning("Ignoring pathname for PairedDataContainer provided as keyword argument") if isinstance(y_data, pd.DataFrame): logging.info('Writing paired data from DataFrame') df = y_data shape = df.shape pdc = PairedDataContainer(pathname.text(), shape, **kwargs) pdc.x_data = df.index.values pdc.y_data = df.values.T y_labels = [x.strip() for x in df.columns.tolist()] # TODO: check for multilevel index explicitly try: # if the column index is multilevel and contains level named 'labels' y_labels = df.columns.get_level_values('labels').tolist() y_labels = [x.strip() for x in y_labels] except: pass pdc.y_labels = y_labels super()._put_pd(pdc) return elif isinstance(col_index, int): logging.info('Writing single paired data curve to preallocated paired data set') # pd_info raise error if the record does not exist size_info = self._pd_info(pathname.text()) rows = size_info["data_no"] cols = size_info["curve_no"] logging.debug(f"The paired data record ({pathname.text()}) in file has rows={rows} and cols={cols}") # 1-based col_index logging.debug(f"Input 0-based col_index = {col_index}") col_index, _ = _normalize_span(cols, col_index, None) logging.debug(f"Updated 1-based col_index = {col_index}") # 1-based default indices row_start, row_end = (1, rows) logging.debug(f"1-based (row_start,row_end) assuming full curve data is replaced: ({row_start},{row_end}).") # update indices based on input window = kwargs.pop("window", None) if window: if not isinstance(window, (tuple, list)): raise ValueError("The window for writing single paired data must be tuple/list containing start and end row indices.") if len(window) < 2: raise ValueError(f"The window for writing single paired data curve must contain two integers; provided '{window}'.") elif len(window) > 2: window = window[0:2] # 0-based _row_start, _row_end = window logging.debug(f"0-based (row_start,row_end) provided as input: ({_row_start},{_row_end}).") # 1-based row_start, row_end = _normalize_span(rows, _row_start, _row_end) logging.debug(f"1-based (row_start,row_end) derived from input: ({row_start},{row_end}).") y_labels = kwargs.pop('y_labels', []) # Verify y_data has ndim == 1, or if ndim == 2 shape[0] == 1 _y_data = y_data if isinstance(y_data, (tuple, list)): _y_data = np.array(y_data, np.float32) if not isinstance(_y_data, np.ndarray): raise TypeError("y_data for paired data is not of valid type") if _y_data.ndim > 2: raise ValueError("The dimension of y_data should be 1 or 2.") if _y_data.ndim == 1: _y_data = np.ascontiguousarray(_y_data.reshape(1, -1)) if _y_data.ndim == 2 and _y_data.shape[0] != 1: logging.warning("The y_data for single curve has multiple rows; flattening the data as single row of values.") _y_data = np.ascontiguousarray(_y_data.reshape(1, -1)) y_data = _y_data shape = (y_data.shape[1], 1) if shape[0] + row_start - 1 > rows: raise IndexError("y_data has too many values exceeding allowable row_end index") # update row_end based on number of y_data values if row_end != row_start + shape[0] - 1: logging.debug("row_end updated based on the number of y_data") row_end = row_start + shape[0] - 1 logging.debug(f"Single paired data curve to be written with 1-based row_start={row_start} and row_end={row_end}. Total rows in dss = {rows}.") pdc = PairedDataContainer(pathname.text(), shape, y_data=y_data, x_data=None, x_units=None, x_type=None, y_units=None, y_type=None, y_labels=y_labels, ) super()._put_one_pd(pdc, col_index, (row_start, row_end)) return raise ValueError('Incompatible input parameters provided to write paired data to dss file')
[docs] def preallocate_pd( self, pathname: Union[str, "DssPathName"], shape: Union[list[int], tuple[int, int]], **kwargs: Any, ) -> None: """ Preallocate space for paired data record in DSS file. This method creates an empty paired data structure in the DSS file that can later be filled with individual curves using put_pd with col_index parameter. Parameters ---------- pathname : str or DssPathName DSS record pathname. shape : list of int or tuple of (int, int) Shape of the paired data as (rows, columns). **kwargs : Any Additional keyword arguments for PairedDataContainer initialization, such as x_units, y_units, x_type, y_type, etc. Returns ------- None Examples -------- >>> pathname = "/A/B/STAGE-FLOW/D/E/PREALLOC/" >>> fid.preallocate_pd(pathname, shape=(100, 5), x_units="ft", y_units="cfs") """ if self.mode != "rw": logging.error( "Open the dss file in 'rw' mode to be able to write data on it." ) return pathname = DssPathName(pathname) pdc = PairedDataContainer(pathname.text(), shape, **kwargs) super()._prealloc_pd(pdc)
[docs] def read_grid( self, pathname: Union[str, "DssPathName"], metadata_only: Optional[bool] = False ) -> SpatialGridStruct: """ Read spatial grid data from DSS file. Reads both version 0 (DSS-6 format) and version 100 (latest DSS-7 format) spatial grid data from DSS file. The method automatically detects the grid version and converts older formats to the modern format. Parameters ---------- pathname : str or DssPathName DSS record pathname. metadata_only : bool, optional If True, read only metadata without grid data. Default is False. Returns ------- SpatialGridStruct Spatial grid data structure containing grid data and metadata. Examples -------- Read grid data: >>> sg = fid.read_grid("/A/B/PRECIP/01JAN2020:0000/01JAN2020:2400/GRIDTYPE/") Read only metadata: >>> sg = fid.read_grid(pathname, metadata_only=True) >>> print(sg.gridinfo.shape) (100, 200) Notes ----- There are slight differences in grid metadata between version-0 and version-100 grids. For example, the RLE-style compression used for precipitation data is supported only in version-0 grids. When a version-0 grid is read using ``read_grid``, this compression method is reported in the returned ``gridinfo`` as *undefined compression*. Consequently, if a version-0 grid needs to be read and written back while preserving its original format, the ``read_grid2`` method should be used instead. """ pathname = DssPathName(pathname) sg_st = SpatialGridStruct() retrieve_data = False if metadata_only else True grid_ver = self._get_gridver(pathname.text()) if grid_ver is None: logging.error("Invalid grid data or version") return elif grid_ver == 100: logging.info("Reading modern format (DSS7) grid") super()._read_grid100(pathname.text(), sg_st, retrieve_data) else: logging.info( "Read grid version {} and convert it to version 100 grid".format( grid_ver ) ) #if self.version == 7: # raise NotImplementedError("Reading version {} from from DSS7 file is not implemented.", grid_ver) # find grid_type and create gridinfo6 grid_type = self._get_gridtype(pathname.text()) logging.debug("grid type is {}".format(grid_type)) gridinfo6 = GridInfo6.from_grid_type(grid_type) logging.debug("grid type in gridinfo6 is {}".format(gridinfo6.grid_type)) if grid_type == 430: # add space for crs definition, tz id generously # it should be more than what is in the file gridinfo6 = GridInfo6.get_specinfo6(50, 200, 50) logging.debug( "grid type in updated gridinfo6 is {}".format(gridinfo6.grid_type) ) super()._read_grid0(pathname.text(), sg_st, gridinfo6, retrieve_data) return sg_st
[docs] def read_grid2( self, pathname: Union[str, "DssPathName"], metadata_only: Optional[bool] = False ) -> Optional[Union[tuple[np.ndarray, GridInfo], GridInfo]]: """ Read spatial grid data from DSS file and return as tuple. Reads both version 0 (DSS-6 format) and version 100 (latest DSS-7 format) spatial grid data. This method provides an alternative return format compared to read_grid. Parameters ---------- pathname : str or DssPathName DSS record pathname. metadata_only : bool, optional If True, return only metadata (gridinfo). Default is False. Returns ------- tuple of (numpy.ndarray, GridInfo) or GridInfo or None If metadata_only is False, returns tuple of (numpy.ndarray, gridinfo). If metadata_only is True, returns gridinfo only. Returns None if grid data is invalid. Examples -------- Read grid as array and gridinfo: >>> data, gridinfo = fid.read_grid2(pathname) >>> print(data.shape, gridinfo.grid_type) Read only gridinfo: >>> gridinfo = fid.read_grid2(pathname, metadata_only=True) """ pathname = DssPathName(pathname) retrieve_data = False if metadata_only else True grid_ver = self._get_gridver(pathname.text()) if grid_ver is None: logging.error("Invalid grid data or version") elif grid_ver != 0: logging.info("Reading modern format (DSS7) grid") ds = self.read_grid(pathname.text(), retrieve_data) if metadata_only: logging.info("Returning metadata of gridded data") return ds.gridinfo else: return ds.read(), ds.gridinfo else: logging.info("Reading older format (DSS6 or grid version 0) grid") #if self.version == 7: # raise NotImplementedError("Reading version {} from from DSS7 file is not implemented.", grid_ver) # find grid_type and create gridinfo6 grid_type = self._get_gridtype(pathname.text()) gridinfo6 = GridInfo6.from_grid_type(grid_type) if grid_type == 430: # TODO: Investigate why locally run pytest randomly corrupts the spec type grid data # add space for crs definition, tz id generously # it should be more than what is in the file gridinfo6 = GridInfo6.get_specinfo6(50, 200, 50) # gridinfo6 is updated with data from the dss file data = super()._read_grid0_array(pathname.text(), gridinfo6, retrieve_data) if metadata_only: logging.info("Returning metadata of gridded data") if data is not None: return gridinfo6 if data is not None: logging.info("Returning metadata/data of gridded data") return data, gridinfo6
[docs] def put_grid( self, data: Union["SpatialGridStruct", np.ndarray], pathname: Optional[Union[str, "DssPathName"]] = None, gridinfo: Optional[GridInfo] = None, flipud: Optional[bool] = True, inplace: Optional[bool] = False, compute_stats: Optional[Union[bool, list[float]]] = True, transform: Optional[Any] = None, normalize: Optional[bool] = True, ) -> None: """ Write spatial grid to DSS-7 file. Writing to DSS-6 file is not allowed. Use put_grid0 for DSS-6 files. Parameters ---------- data : SpatialGridStruct or numpy.ndarray or numpy.ma.MaskedArray Grid data to write. * **numpy.ndarray**: ``np.nan`` and ``nodata`` (from ``gridinfo``) and ``UNDEFINED`` values are treated as nodata. * **numpy.ma.MaskedArray**: masked elements are treated as nodata. * **SpatialGridStruct**: a structured object containing grid and metadata. pathname : str or DssPathName or None, optional Pathname for the DSS record. It can be None for SpatialGridStruct. The dates in parts D and E are automatically reformatted to correct convention. Part D uses the beginning of the day (e.g., ``02JAN2025:0000``) while Part E uses the end of the previous day convention (e.g., ``01JAN2025:2400``). Default is None. gridinfo : GridInfo or subclass or None, optional Metadata describing the grid. Can be one of: * ``GridInfo``, ``HrapInfo``, or ``AlbersInfo``: requires ``data_type``, ``cell_size``, ``shape`` at minimum. * ``SpecifiedInfo``: additionally ``nodata`` and ``crs``. Default is None. flipud : bool, optional If True, flips the rows of the data array upside down before writing. This is necessary when the input data is numpy array with origin at top-left (e.g., array representing raster image in rasterio). Default is True. inplace : bool, optional If True, tries to modify the data in place to reduce memory usage. Default is False. compute_stats : bool or list of float, optional Controls whether and how statistics are computed for the grid data. Default is True. Possible values: * **True**: compute min, max, mean, range values, and range counts. * **False**: do not compute statistics. * **list of float**: compute "greater than or equal to" counts for the specified values (maximum of 19 thresholds, excluding nodata). transform : Any or None, optional Spatial transform information (e.g., affine transform). If provided, it overrides transform parameters in ``gridinfo``. Default is None. normalize : bool, optional If True, tries to normalize coords_cell0 and lower_left_cell based on min_xy or input transform parameter. Default is True. Returns ------- None Raises ------ Exception If D-part or E-part is not a valid datetime string for time-stamped grids. Examples -------- Write grid from array: >>> import numpy as np >>> from pydsstools.core.gridinfo import SpecifiedGridInfo >>> data = np.random.rand(100, 200).astype(np.float32) >>> pathname = "/A/B/PRECIP/01JAN2020:0000/01JAN2020:2400/SHG/" >>> gridinfo = SpecifiedGridInfo(data_type="PER-CUM", cell_size=2000.0, ... lower_left_x=100000, lower_left_y=200000, ... rows=100, cols=200, nodata=-999.0) >>> fid.put_grid(data, pathname, gridinfo) Write with custom statistics thresholds: >>> fid.put_grid(data, pathname, gridinfo, compute_stats=[0, 10, 50, 100]) """ if self.mode != "rw": logging.error( "Open the dss file in 'rw' mode to be able to write data on it." ) return if self.version == 6: logging.warning("Writing DSS grid record in DSS-6 file is not supported") return if not isinstance(data, (SpatialGridStruct, np.ndarray)): raise TypeError(f"Expected SpatialGridStruct or numpy.ndarray, got {type(data).__name__}.") if isinstance(data, SpatialGridStruct): # use this for copying from one file to another or updating statistics if pathname is None: pathname = DssPathName(data.pathname) else: pathname = DssPathName(pathname) gridinfo = data.gridinfo elif isinstance(data, np.ndarray): if not isinstance(gridinfo, GridInfo): logging.error("GridInfo is not provided to write gridded dataset") return if pathname is None: logging.error( "Provide valid pathname for grid record!", exc_info=True ) return pathname = DssPathName(pathname) # Verify pathname has valid datetime stamps when grid is specified to have time component if gridinfo.grid_type_has_time(): dpart = pathname.dpart epart = pathname.epart try: # check if dpart, epart or both are not datetime # TODO: Found out HecTime('1') passes this test stime = HecTime(dpart, midnight_as_2400=False, date_style=2, time_style=0) etime = HecTime(epart, midnight_as_2400=True, date_style=2, time_style=0) except: raise Exception( "For %s grid type, DPart and EPart of pathname must be datetime string" ) else: # unsure about this param gridinfo.time_stamped = 1 # update D and E part of pathname pathname.dpart = stime.text() pathname.epart = etime.text() grid_type = gridinfo.grid_type shape = gridinfo.shape nodata = UNDEFINED if grid_type == GridType.specified or grid_type == GridType.specified_time: nodata = gridinfo.nodata _data,stats = _sanitize_grid_array_for_dss_write(data,nodata,shape,flipud,inplace,compute_stats) if stats: gridinfo.max_val = stats["max_val"] gridinfo.min_val = stats["min_val"] gridinfo.mean_val = stats["mean_val"] gridinfo.range_vals = stats["range_vals"] gridinfo.range_counts = stats["range_counts"] if normalize: gridinfo.normalize(transform) super()._put_grid(pathname.text(), _data, gridinfo)
[docs] def put_grid0( self, data: Union["SpatialGridStruct", np.ndarray], pathname: Optional[Union[str, "DssPathName"]] = None, gridinfo: Optional[Union[GridInfo, GridInfo6]] = None, flipud: Optional[bool] = True, inplace: Optional[bool] = False, compute_stats: Optional[Union[bool, list[float]]] = True, transform: Optional[Any] = None, normalize: Optional[bool] = True, ) -> None: """ Write spatial grid to DSS-6 file. Writing to DSS-7 file using this method is experimental and may cause problems. Use put_grid for DSS-7 files instead. Parameters ---------- data : SpatialGridStruct or numpy.ndarray or numpy.ma.MaskedArray Grid data to write. * **numpy.ndarray**: ``np.nan`` and ``nodata`` (from ``gridinfo``) and ``UNDEFINED`` values are treated as nodata. * **numpy.ma.MaskedArray**: masked elements are treated as nodata. * **SpatialGridStruct**: a structured object containing grid and metadata. pathname : str or DssPathName or None, optional Pathname for the DSS record. It can be None for SpatialGridStruct. The dates in parts D and E are automatically reformatted to correct convention. Part D uses the beginning of the day (e.g., ``02JAN2025:0000``) while Part E uses the end of the previous day convention (e.g., ``01JAN2025:2400``). Default is None. gridinfo : GridInfo or GridInfo6 or None, optional Metadata describing the grid for version 6 and 7. Default is None. flipud : bool, optional If True, flips the rows of the data array upside down before writing. This is necessary when the input data is numpy array with origin at top-left (e.g., array representing raster image in rasterio). Default is True. inplace : bool, optional If True, tries to modify the data in place to reduce memory usage. Default is False. compute_stats : bool or list of float, optional Controls whether and how statistics are computed for the grid data. Default is True. Possible values: * **True**: compute min, max, mean, range values, and range counts. * **False**: do not compute statistics. * **list of float**: compute "greater than or equal to" counts for the specified values (maximum of 19 thresholds, excluding nodata). transform : Any or None, optional Spatial transform information (e.g., affine transform). If provided, it overrides transform parameters in ``gridinfo``. Default is None. normalize : bool, optional If True, tries to normalize coords_cell0 and lower_left_cell based on min_xy or input transform parameter. Default is True. Returns ------- None Raises ------ Exception If D-part or E-part is not a valid datetime string for time-stamped grids. Notes ----- This method writes grid data in DSS-6 (version 0) format. It is primarily intended for maintaining compatibility with legacy DSS-6 files. """ if self.mode != "rw": logging.error( "Open the dss file in 'rw' mode to be able to write data on it." ) return if self.version == 7: logging.warning( "Writing version 0 (DSS-6 format) grid data to DSS7 file is experimental." ) if not isinstance(data, (SpatialGridStruct, np.ndarray)): raise TypeError(f"Expected SpatialGridStruct or numpy.ndarray, got {type(data).__name__}.") if isinstance(data, SpatialGridStruct): # use this for copying from one file to another or updating statistics if pathname is None: pathname = DssPathName(data.pathname) else: pathname = DssPathName(pathname) gridinfo = data.gridinfo elif isinstance(data, np.ndarray): if not isinstance(gridinfo, GridInfo): logging.error("GridInfo is not provided to write gridded dataset") return if pathname is None: logging.error( "Provide valid pathname for grid record!", exc_info=True ) return pathname = DssPathName(pathname) # convert to gridinfo from verion 0 or 6 to 7, which is easier to work with if isinstance(gridinfo, GridInfo6): gridinfo = gridinfo.to_gridinfo7() # Verify pathname has valid datetime stamps when grid is specified to have time component if gridinfo.grid_type_has_time(): dpart = pathname.dpart epart = pathname.epart try: # check if dpart, epart or both are not datetime # TODO: Found out HecTime('1') passes this test stime = HecTime(dpart, midnight_as_2400=False, date_style=4, time_style=0) etime = HecTime(epart, midnight_as_2400=True, date_style=4, time_style=0) except: raise Exception( "For %s grid type, DPart and EPart of pathname must be datetime string" ) else: # unsure about this param gridinfo.time_stamped = 1 # update D and E part of pathname pathname.dpart = stime.text() pathname.epart = etime.text() grid_type = gridinfo.grid_type shape = gridinfo.shape nodata = UNDEFINED if grid_type == GridType.specified or grid_type == GridType.specified_time: nodata = gridinfo.nodata _data,stats = _sanitize_grid_array_for_dss_write(data,nodata,shape,flipud,inplace,compute_stats) if stats: gridinfo.max_val = stats["max_val"] gridinfo.min_val = stats["min_val"] gridinfo.mean_val = stats["mean_val"] gridinfo.range_vals = stats["range_vals"] gridinfo.range_counts = stats["range_counts"] if normalize: gridinfo.normalize(transform) gridinfo6 = gridinfo7_to_gridinfo6(gridinfo, pathname.text()) super()._put_grid0(pathname.text(), _data, gridinfo6)
[docs] def copy_path( self, pathname_in: Union[str, "DssPathName"], pathname_out: Union[str, "DssPathName"], dss_out: Optional["Open"] = None, ) -> None: """ Copy a DSS record from one pathname to another. Can copy within the same file or to a different DSS file. Parameters ---------- pathname_in : str or DssPathName Source pathname to copy from. pathname_out : str or DssPathName Destination pathname to copy to. dss_out : Open or None, optional Destination DSS file object. If None, copies within the same file. Default is None. Returns ------- None Examples -------- Copy within same file: >>> fid.copy_path("/A/B/C/D/E/F/", "/A/B/C_COPY/D/E/F/") Copy to different file: >>> with Open("target.dss", mode="rw") as fid_out: ... fid.copy_path("/A/B/C/D/E/F/", "/A/B/C/D/E/F/", dss_out=fid_out) """ dss_fid = dss_out if isinstance(dss_out, self.__class__) else self if dss_fid.mode != "rw": logging.error( "Open the dss file in 'rw' mode to be able to write data on it." ) return pathname_in = DssPathName(pathname_in) pathname_out = DssPathName(pathname_out) if ( pathname_in.text().lower() == pathname_out.text().lower() ) and dss_fid is self: # overwriting with exact data is pointless return self._copyRecordsTo(dss_fid, pathname_in.text(), pathname_out.text())
[docs] def del_path(self, pathname: Union[str, "DssPathName"]) -> None: """ Delete DSS record(s) matching the given pathname pattern. Parameters ---------- pathname : str or DssPathName Pathname or pathname pattern to delete. Supports wildcards (*). Returns ------- None Examples -------- Delete specific record: >>> fid.del_path("/A/B/C/D/E/F/") Delete multiple records with wildcards: >>> fid.del_path("/A/B/*/D/E/F/") """ if self.mode != "rw": logging.error( "Open the dss file in 'rw' mode to be able to write data on it." ) return pathname_pattern = _process_pathname_pattern(pathname) pathlist = self.search_path(pathname_pattern) for pth in pathlist: status = self._delete_pathname(pth)
[docs] def search_path( self, pathname: Union[str, "DssPathName"] = "", sort: Optional[bool] = False ) -> list[str]: """ Search for DSS pathnames matching a pattern. Parameters ---------- pathname : str or DssPathName, optional Pathname pattern which can include wildcard (*) for defining search pattern. Empty string returns all pathnames. Default is "". sort : bool, optional If True, sort the returned pathnames. Default is False. Returns ------- list of str List of matching pathnames. Examples -------- Get all pathnames: >>> paths = fid.search_path() Search with pattern: >>> paths = fid.search_path("/A/B/*/D/E/F/") Get sorted results: >>> paths = fid.search_path("/A/*/*/*/*/F/", sort=True) """ path_list = [] if pathname: pathname = _process_pathname_pattern(pathname) catalog = self._get_catalog(pathname, sort) if catalog is not None: path_list = catalog.paths() return path_list
[docs] def path_dict(self, sub_type: Optional[bool] = False) -> dict[str, list[str]]: """ Get all pathnames in DSS file organized by data type. Parameters ---------- sub_type : bool, optional If True, separate time series into regular and irregular, and grids by type. If False, group all time series together and all grids together. Default is False. Returns ------- dict of str to list of str Dictionary mapping data type names to lists of pathnames. When sub_type is True, keys include: * "ts-reg": Regular time series * "ts-irreg": Irregular time series * "pd": Paired data * "text": Text data * "text-table": Text tables * "grid-undefined": Undefined grid type * "grid-hrap": HRAP grids * "grid-albers": Albers grids * "grid-spec": Specified grids * "tin": TIN data * "location": Location data * "array": Array data * "image": Image data * "generic": Generic data * "undefined": Undefined data types When sub_type is False, keys include: * "ts": All time series (regular + irregular) * "grid": All grids (undefined + hrap + albers + specified) * Other keys same as above Examples -------- Get all paths grouped by general type: >>> paths = fid.path_dict() >>> print(f"Time series: {len(paths['ts'])}") >>> print(f"Paired data: {len(paths['pd'])}") Get paths with detailed sub-types: >>> paths = fid.path_dict(sub_type=True) >>> print(f"Regular TS: {len(paths['ts-reg'])}") >>> print(f"Irregular TS: {len(paths['ts-irreg'])}") """ ts_rts = [] ts_its = [] pd = [] text_data = [] text_table = [] grid_undefined = [] grid_hrap = [] grid_albers = [] grid_spec = [] tin = [] location = [] array_data = [] image_data = [] generic_data = [] undefined_data = [] path_list = self.search_path("") for path in path_list: name = self._record_type_name(path, abbr=True) logging.debug(f"{path} is record type {name}.") name = name.upper() if name.startswith("RT"): ts_rts.append(path) elif name.startswith("IT"): ts_its.append(path) elif name.startswith("PD"): pd.append(path) elif name.startswith("TXT"): text_data.append(path) elif name.startswith("TT"): text_table.append(path) elif name.startswith("UG"): grid_undefined.append(path) elif name.startswith("HG"): grid_hrap.append(path) elif name.startswith("AG"): grid_albers.append(path) elif name.startswith("SG"): grid_spec.append(path) elif name.startswith("SPA"): tin.append(path) elif name.startswith("LOC"): location.append(path) elif name.startswith("ARR"): array_data.append(path) elif name.startswith("IM"): image_data.append(path) elif name.startswith("GEN"): generic_data.append(path) else: undefined_data.append(path) if sub_type: result = { "ts-reg": ts_rts, "ts-irreg": ts_its, "pd": pd, "text": text_data, "text-table": text_table, "grid-undefined": grid_undefined, "grid-hrap": grid_hrap, "grid-albers": grid_albers, "grid-spec": grid_spec, "tin": tin, "location": location, "array": array_data, "image": image_data, "generic": generic_data, "undefined": undefined_data, } else: result = { "ts": ts_rts + ts_its, "pd": pd, "text": text_data, "text-table": text_table, "grid": grid_undefined + grid_hrap + grid_albers + grid_spec, "tin": tin, "location": location, "array": array_data, "image": image_data, "generic": generic_data, "undefined": undefined_data, } return result
# ==================== Helper Functions ==================== def _normalize_span( size: int, start0: Optional[int], end0: Optional[int], ) -> tuple[int, int]: """ Convert 0-based indices to 1-based indices for paired data. Python functions expect 0-based indices while C API uses 1-based indices. Parameters ---------- start0 : int or None Start index (0-based). If None, defaults to 0. end0 : int or None End index (0-based). If None, defaults to size-1. size : int Total size of the span being indexed. Returns ------- tuple of (int, int) Tuple containing (start, end) as 1-based indices. Raises ------ IndexError If indices are out of range or invalid. """ if not isinstance(size, int) or size < 0: raise IndexError("size must be a non-negative int") if size == 0: raise IndexError("Size of the span being indexed can not be zero") # start (0-based, wrap negatives; must be in [0, size-1]) if start0 is None: s0 = 0 else: if not isinstance(start0, int): raise IndexError("start must be int or None") # wrap negative s0 = start0 + size if start0 < 0 else start0 if not (0 <= s0 < size): raise IndexError(f"start {s0} out of range for size={size}") # end (0-based, wrap negatives; allow [0, size-1], clip only if >= size) if end0 is None: e0 = size - 1 else: if not isinstance(end0, int): raise IndexError("end must be int or None") # wrap negative e0 = end0 + size if end0 < 0 else end0 if e0 < 0: raise IndexError(f"end {e0} out of range after wrap") if e0 >= size: # clip e0 = size - 1 if s0 > e0: raise IndexError(f"invalid span: start {s0} > end {e0}") # map 0-based to 1-based return (s0 + 1, e0 + 1) def _sanitize_grid_array_for_dss_write(data,nodata,shape,flipud=True,inplace=False,compute_stats=False,range_values=None): # UNDEFINED is treated as nodata for gridded data. Additional nodata value is associated with Specified Grid. # TODO: masked elements and nans are converted to nodata; is it better to use UNDEFINED instead? is_masked = isinstance(data,ma.core.MaskedArray) is_sgrid = isinstance(data,SpatialGridStruct) is_nodata_undefined = nodata == UNDEFINED # Convert data to _data and mask arrays mask = None _data = data is_copied = False make_copy = not inplace if is_masked: # data is masked array _data = data._data mask = data.mask if _data.dtype != np.float32: # float32 and c_contiguous _data = _data.astype(np.float32, order="C", casting="unsafe", copy=True) _data[mask] = nodata is_copied = True elif not make_copy: # replace masked elements with nodata (ignoring array's fill value that can be arbitrary value) # TODO: check if setting fill value has any side effect in some cases data.set_fill_value(nodata) data.data[mask] = nodata else: _data = data.filled(nodata) elif is_sgrid: _data = data._get_mview() _data.setflags(write=1) # memory view is (rows*cols,) 1D array # reshape it to raster 2d-array without copy # buffer is laid out consistent with DSS API requirement and does not require flipud _data = np.reshape(_data,shape) if _data.dtype != np.float32: _data = _data.astype(np.float32, order="C", casting="unsafe", copy=True) is_copied = True else: # data is 2D array if _data.dtype != np.float32: _data = _data.astype(np.float32, order="C", casting="unsafe", copy=True) is_copied = True if np.any(np.isnan(_data)): if make_copy and not is_copied: _data = _data.copy() nan_mask = np.isnan(_data) _data[nan_mask] = nodata # _data can have both UNDEFINED and nodata at this point if (not is_sgrid) and flipud: _data = np.flipud(_data) if not _data.flags["C_CONTIGUOUS"]: _data = np.ascontiguousarray(_data) def _compute_stats(): data_count = _data.size if is_masked: filtered_data = _data[~mask] else: if is_nodata_undefined: filtered_data = _data[_data != UNDEFINED] else: filtered_data = _data[(_data != UNDEFINED) & (_data != nodata)] min_val = filtered_data.min() max_val = filtered_data.max() mean_val = filtered_data.mean() range_counts = [data_count] if isinstance(range_values,(list,tuple)): range_vals = [x for x in range_values] elif is_sgrid: range_vals = data.gridinfo.range_vals else: # compute range values as quartiles range_vals = list(np.percentile(filtered_data,[25,50,75])) range_vals = sorted([x for x in range_vals if not (np.isnan(x) or x < min_val or x > max_val) or x==nodata or x==UNDEFINED]) range_vals = range_vals[0:20] range_vals.insert(0,UNDEFINED) for val in range_vals[1:]: cnt = (filtered_data >= val).sum() range_counts.append(cnt) stats = { "min_val": min_val, "max_val": max_val, "mean_val": mean_val, "range_vals": range_vals, "range_counts": range_counts } return stats stats = None if compute_stats: stats = _compute_stats() return _data,stats def _process_pathname_pattern(pathname: Union[str, DssPathName]) -> str: """ Process pathname pattern for catalog searches. Converts empty pathname parts (represented by //) to wildcards (*). Parameters ---------- pathname : str or DssPathName Pathname or pattern to process. Returns ------- str Processed pathname string with wildcards. Examples -------- >>> _process_pathname_pattern("/A/B//D//F/") '/A/B/*/D/*/F/' """ pathname_obj = DssPathName(pathname) return pathname_obj.text().replace("//", "/*/")