Source code for pydsstools.core.gridinfo.v6.conversion

"""Conversion functions between DSS v6 and v7 grid metadata.

This module provides functions to convert between the legacy v6 format
(ctypes structures) and the modern v7 format (Pydantic models), as well
as string encoding/decoding utilities for v6 binary I/O.

String Encoding in v6
---------------------
DSS v6 stores strings as arrays of int32 values. Each int32 holds 4 ASCII
characters packed in little-endian byte order:

    "HRAP" → [1347571272]
    "MM"   → [19789]

Each int32 holds 4 bytes: H(72), R(82), A(65), P(80) → 0x50415248 = 1347571272
"""

from __future__ import annotations

import logging
import struct
import ctypes
from typing import Optional
from ... import HecTime, DssPathName

__all__ = [
    "str_to_ints",
    "ints_to_str",
    "float32_to_int32",
    "int32_to_float32",
    "gridinfo7_to_gridinfo6",
    "gridinfo6_to_gridinfo7_dict",
]


# ============================================================================
# String Encoding/Decoding for v6 Binary Format
# ============================================================================

[docs] def str_to_ints(s: str, endian: str = "little", signed: bool = True) -> list[int]: """Convert ASCII string to array of int32 values. Strings in DSS v6 are stored as arrays of int32, with 4 ASCII characters packed into each int32 value. This function performs that encoding. Parameters ---------- s : str ASCII string to encode. endian : str, optional Byte order: "little" or "big", default "little". signed : bool, optional If True, use signed int32; if False, use unsigned, default True. Returns ------- list[int] List of int32 values representing the string. Examples -------- >>> str_to_ints("HRAP") [1347571272] >>> str_to_ints("MM") [19789] >>> str_to_ints("PRECIPITATION") [1346457936, 1414676528, 1413830225, 78] Notes ----- Empty strings are treated as a single null character for padding. Strings are padded to 4-byte boundaries with null bytes. """ b = s.encode("ascii") length = len(b) if length == 0: length = 1 # Fill empty string with nulls ints = [] step = 4 pad = b"\x00" for i in range(0, length, step): chunk = b[i : i + 4] chunk += pad * (4 - len(chunk)) # Pad to 4 bytes if signed: if endian == "little": val = struct.unpack("<i", chunk)[0] else: val = struct.unpack(">i", chunk)[0] else: if endian == "little": val = struct.unpack("<I", chunk)[0] else: val = struct.unpack(">I", chunk)[0] ints.append(val) return ints
_ID = bytes.maketrans(b"", b"")
[docs] def ints_to_str( ints: list[int], endian: str = "little", encoding: str = "ascii", strip_trailing_nulls: bool = True, stop_at_first_null: bool = True, ) -> str: """Convert array of int32 values to ASCII string. Reverse of str_to_ints(). Unpacks 4 ASCII characters from each int32 and assembles them into a string. Parameters ---------- ints : list[int] List of int32 values representing a string. endian : str, optional Byte order: "little" or "big", default "little". encoding : str, optional Character encoding, default "ascii". strip_trailing_nulls : bool, optional If True, remove null padding at end, default True. stop_at_first_null : bool, optional If True, stop at first null (C-string semantics), default True. Returns ------- str Decoded ASCII string. Examples -------- >>> ints_to_str([1347571272]) 'HRAP' >>> ints_to_str([19789]) 'MM' >>> ints_to_str([1346457936, 1414676528, 1413830225, 78]) 'PRECIPITATION' Notes ----- - Applies C-string semantics by default (stops at first null) - Removes non-ASCII characters (values >= 128) - Handles both signed and unsigned int32 values """ fmt = "<I" if endian == "little" else ">I" b = bytearray() for v in ints: b.extend(struct.pack(fmt, v & 0xFFFFFFFF)) # 1) Apply C-string semantics first (if requested) # Everything after first NUL is discarded if stop_at_first_null: i = b.find(0) if i != -1: del b[i:] # 2) Optionally strip trailing NUL padding if strip_trailing_nulls and not stop_at_first_null: while b and b[-1] == 0: b.pop() # 3) Drop unwanted bytes (keep only ASCII excluding NUL) ascii_only = bytes(b).translate(_ID, b"\x00" + bytes(range(128, 256))) return ascii_only.decode(encoding)
[docs] def float32_to_int32(value: float, endian: str = "<") -> int: """Reinterpret float32 as int32 bits. Parameters ---------- value : float 32-bit floating point value. endian : str, optional Endianness: '=' native, '<' little-endian, '>' big-endian. Default is '<'. Returns ------- int Integer with same bit pattern as float. """ return struct.unpack(f"{endian}i", struct.pack(f"{endian}f", value))[0]
[docs] def int32_to_float32(value: int, endian: str = "<") -> float: """Reinterpret int32 bits as float32. Parameters ---------- value : int 32-bit integer value. endian : str, optional Endianness: '=' native, '<' little-endian, '>' big-endian. Default is '<'. Returns ------- float Float with same bit pattern as integer. """ return struct.unpack(f"{endian}f", struct.pack(f"{endian}i", value))[0]
# ============================================================================ # v7 to v6 Conversion # ============================================================================
[docs] def gridinfo7_to_gridinfo6(gridinfo7, pathname: str): """Convert DSS v7 GridInfo to DSS v6 format. Parameters ---------- gridinfo7 : GridInfo or subclass DSS v7 grid info object (from gridinfo package). pathname : str DSS pathname (used to extract start/end times from D and E parts). Returns ------- GridInfo6 or subclass DSS v6 grid info structure (ctypes object). Notes ----- Handles conversion of: - Pydantic models to ctypes structures - GridInfo enum types to integer codes - Tuple coordinates to separate X/Y fields - String units to int32 arrays - Python bools to int32 values Some fields may be truncated if they exceed DSS v6 limits: - data_units: 12 characters max - data_source: 12 characters max (HRAP) - proj_units: 12 characters max (Albers) - range_vals/counts: 20 bins max Examples -------- >>> from pydsstools.core.gridinfo import GridInfoCreate, GridType, DataType >>> from pydsstools.core.gridinfo.v6 import gridinfo7_to_gridinfo6 >>> >>> info7 = GridInfoCreate( ... grid_type=GridType.hrap, ... data_type=DataType.per_aver, ... shape=(100, 150), ... cell_size=4762.5 ... ) >>> pathname = "/GRID/LOC/PRECIP/01JAN2020:0000//" >>> info6 = gridinfo7_to_gridinfo6(info7, pathname) """ from .structures import ( GridInfo6Base, GridInfo6, HrapInfo6, AlbersInfo6, SpecifiedInfo6, SPECIFIED_GRID_INFO_VERSION, ) grid_type = gridinfo7.get_v6_grid_type() # enum grid_type = grid_type.value info6 = GridInfo6Base.from_grid_type(grid_type) info7 = gridinfo7 # Common parameters found in all grid types # ----------------------------------------- # Start and end time from DSS pathname path = DssPathName(pathname) dpart = path.dpart epart = path.epart try: stime = HecTime(dpart, granularity=60) info6.stime = stime.value() except: info6.stime = 0 try: etime = HecTime(epart, granularity=60) info6.etime = etime.value() except: info6.etime = 0 # data_units data_units = str_to_ints(info7.data_units) if len(data_units) > 3: logging.warning("data_units was truncated during conversion to grid v6") info6.data_units = (ctypes.c_int32 * 3)(*data_units[0:3]) # data_type info6.data_type = info7.data_type.value info6.lower_left_x = info7.lower_left_cell[0] info6.lower_left_y = info7.lower_left_cell[1] info6.rows = info7.shape[0] info6.cols = info7.shape[1] info6.cell_size = info7.cell_size # Compression comp_method = info7.compression_method.value comp_base = info7.compression_base comp_factor = info7.compression_factor info6.compression_method = comp_method info6.compression_base = comp_base info6.compression_factor = comp_factor # Statistics info6.max_val = info7.max_val info6.min_val = info7.min_val info6.mean_val = info7.mean_val range_vals = info7.range_vals range_counts = info7.range_counts end = 0 for i, count in enumerate(reversed(range_counts)): if count != 0: end = i break range_length = len(range_counts) - end range_length = min(len(range_vals), range_length, 20) info6.range_length = range_length for i in range(range_length): info6.range_vals[i] = info7.range_vals[i] info6.range_counts[i] = info7.range_counts[i] # Grid type-specific fields # ----------------------------------------- if grid_type == 400: # Undefined grid info_flat_size = ctypes.sizeof(GridInfo6) info_size = 124 info_gsize = 124 elif grid_type == 410: # HRAP info_flat_size = ctypes.sizeof(HrapInfo6) info_size = 128 info_gsize = 124 # data_source data_source = str_to_ints(info7.data_source) if len(data_source) > 3: logging.warning("data_source was truncated during conversion to grid v6") info6.data_source = (ctypes.c_int32 * 3)(*data_source[0:3]) elif grid_type == 420: # Albers info_flat_size = ctypes.sizeof(AlbersInfo6) info_size = 164 info_gsize = 124 # Projection parameters info6.proj_datum = info7.proj_datum.value proj_units = str_to_ints(info7.proj_units) if len(proj_units) > 3: logging.warning("proj_unit was truncated during conversion to grid v6") info6.proj_units = (ctypes.c_int32 * 3)(*proj_units[0:3]) info6.lat_origin = info7.lat_0 info6.first_parallel = info7.lat_1 info6.sec_parallel = info7.lat_2 info6.central_meridian = info7.lon_0 info6.false_easting = info7.x_0 info6.false_northing = info7.y_0 info6.xcoord_cell0 = info7.coords_cell0[0] info6.ycoord_cell0 = info7.coords_cell0[1] elif grid_type == 430: # Specified info_flat_size = ctypes.sizeof(SpecifiedInfo6) info_size = 160 info_gsize = 124 info6.version = SPECIFIED_GRID_INFO_VERSION # CRS name crs_name = str_to_ints(info7.crs_name) count = len(crs_name) if count > 0: info6.crs_name = (ctypes.c_int32 * count)(*crs_name) info6.crs_name_length = count info_flat_size += count * 4 # CRS definition crs = str_to_ints(info7.crs) count = len(crs) info6.crs_def_length = count if count > 0: info6.crs_def = (ctypes.c_int32 * count)(*crs) info_flat_size += count * 4 # Cell zero coordinates info6.xcoord_cell0 = info7.coords_cell0[0] info6.ycoord_cell0 = info7.coords_cell0[1] # NoData value info6.nodata = info7.nodata # Time zone tzid = str_to_ints(info7.tzid) count = len(tzid) info6.tzid_length = count if count > 0: info6.tzid = (ctypes.c_int32 * count)(*tzid) info_flat_size += count * 4 info6.tzoffset = info7.tzoffset info6.is_interval = int(info7.is_interval) info6.time_stamped = int(info7.time_stamped) info6.info_fsize = info_flat_size info6.info_size = info_size info6.info_gsize = info_gsize return info6
# ============================================================================ # v6 to v7 Conversion # ============================================================================
[docs] def gridinfo6_to_gridinfo7_dict(gridinfo6) -> dict: """Convert DSS v6 GridInfo to v7-compatible dictionary. Parameters ---------- gridinfo6 : GridInfo6 or subclass DSS v6 grid info structure (ctypes object). Returns ------- dict Dictionary compatible with GridInfoCreate() factory function. Notes ----- Handles conversion of: - Separate X/Y fields to tuple coordinates - Integer codes to GridType enum values - int32 arrays to Python strings - CRS inference from grid parameters Examples -------- >>> from pydsstools.core.gridinfo.v6 import gridinfo6_to_gridinfo7_dict >>> from pydsstools.core.gridinfo import GridInfoCreate >>> >>> # Assume info6 is a GridInfo6 from C library >>> info7_dict = gridinfo6_to_gridinfo7_dict(info6) >>> info7 = GridInfoCreate(**info7_dict) """ prof6 = gridinfo6.to_dict() grid_type = prof6["grid_type"] # Convert separate X/Y to tuple llx = prof6.pop("lower_left_x") lly = prof6.pop("lower_left_y") prof6["lower_left_cell"] = (llx, lly) rows = prof6.pop("rows") cols = prof6.pop("cols") prof6["shape"] = (rows, cols) prof6.pop("range_length") # Infer CRS information prof6["crs_name"] = gridinfo6._infer_crs_name() prof6["crs"] = gridinfo6._infer_crs() # Grid type-specific conversions if grid_type == 410: # HRAP - no special conversion needed pass if grid_type == 420: # Albers xcoord = prof6.pop("xcoord_cell0") ycoord = prof6.pop("ycoord_cell0") prof6["coords_cell0"] = (xcoord, ycoord) if grid_type == 430: # Specified xcoord = prof6.pop("xcoord_cell0") ycoord = prof6.pop("ycoord_cell0") prof6["coords_cell0"] = (xcoord, ycoord) # Remove length fields (handled automatically in v7) prof6.pop("version") prof6.pop("crs_name_length") prof6.pop("crs_def_length") prof6.pop("tzid_length") return prof6