Source code for OpenPinch.utils.input_validation

"""Input normalisation helpers shared by Excel/CSV/JSON ingestion paths."""

import pandas as pd

__all__ = ["validate_stream_data", "validate_utility_data"]


################################################################################
# Public API
################################################################################


def validate_stream_data(sd: pd.DataFrame):
    """Normalise stream records and fill missing names/zones with sensible defaults.

    Accepted inputs and the matching return value:

    * ``None`` -> an empty list.
    * an empty ``DataFrame`` -> the same object, untouched.
    * a non-empty ``DataFrame`` -> a new ``DataFrame`` built from the
      normalised rows.
    * any other iterable of records -> a list of normalised dict copies.

    Normalisation rules applied to each record:

    * Non-dict entries and records whose values are all missing are dropped.
    * A missing zone inherits the most recent zone, falling back to
      ``"Process Zone"``.
    * A missing name gets the next free ``S<n>`` label. Labels that appear
      explicitly anywhere in the input are reserved up front, so a generated
      default never duplicates an explicit name, even one that appears later.
    * Explicit labels are stringified and stripped, ``.`` becomes ``-``, and
      purely numeric labels get a prefix (``S`` for names, ``Z`` for zones).

    The caller's records are never mutated; each kept record is copied first.
    """
    default_zone = "Process Zone"
    default_stream_prefix = "S"

    def _is_missing(value) -> bool:
        """Return True for None, NaN-like scalars and blank strings."""
        if value is None:
            return True
        if isinstance(value, str):
            return not value.strip()
        # pd.isna returns an array for list-likes, whose truth value is
        # ambiguous and raises. A container is content, not a missing cell.
        if not pd.api.types.is_scalar(value):
            return False
        return bool(pd.isna(value))

    def _normalize_label(value, prefix: str) -> str:
        """Turn a non-missing label into its canonical string form."""
        text = str(value).strip().replace(".", "-")
        return f"{prefix}{text}" if text.isdigit() else text

    def _has_content(record: dict) -> bool:
        """Return True when at least one field of the record is populated."""
        return any(not _is_missing(value) for value in record.values())

    def _normalise_records(records) -> list[dict]:
        """Copy, filter and normalise an iterable of raw records."""
        # Materialise first: the input may be a one-shot iterator and the
        # rows are walked twice (reserve names, then normalise).
        rows = [
            record.copy()
            for record in records
            if isinstance(record, dict) and _has_content(record)
        ]

        # Reserve every explicit name before handing out defaults so that a
        # default generated early cannot collide with a later explicit name.
        used_names = {
            _normalize_label(row["name"], default_stream_prefix)
            for row in rows
            if not _is_missing(row.get("name"))
        }

        previous_zone = None
        next_default_name_idx = 1
        for row in rows:
            zone = row.get("zone")
            if _is_missing(zone):
                row["zone"] = previous_zone or default_zone
            else:
                row["zone"] = _normalize_label(zone, "Z")
            previous_zone = row["zone"]

            name = row.get("name")
            if _is_missing(name):
                while f"{default_stream_prefix}{next_default_name_idx}" in used_names:
                    next_default_name_idx += 1
                row["name"] = f"{default_stream_prefix}{next_default_name_idx}"
                next_default_name_idx += 1
            else:
                row["name"] = _normalize_label(name, default_stream_prefix)
            used_names.add(row["name"])
        return rows

    if sd is None:
        return []
    if isinstance(sd, pd.DataFrame):
        if sd.empty:
            return sd
        records = sd.replace({pd.NA: None}).to_dict(orient="records")
        return pd.DataFrame(_normalise_records(records))
    return _normalise_records(sd)
def validate_utility_data(ud: pd.DataFrame):
    """Drop utility rows without a name.

    ``None`` yields an empty list. A ``DataFrame`` that is empty or has no
    ``"name"`` column is returned as-is; otherwise rows whose name is NaN or
    blank after stripping are removed and the index is reset. Any other
    iterable is filtered to the dict records that carry a usable name, and
    the surviving records are returned in a new list (not copied).
    """
    if ud is None:
        return []

    if not isinstance(ud, pd.DataFrame):

        def _is_named(entry) -> bool:
            # Only dict records with a non-null, non-blank name are kept.
            if not isinstance(entry, dict):
                return False
            label = entry.get("name")
            if label is None or pd.isna(label):
                return False
            return not (isinstance(label, str) and not label.strip())

        return [entry for entry in ud if _is_named(entry)]

    if ud.empty or "name" not in ud.columns:
        return ud

    names = ud["name"]
    keep = names.notna()
    if keep.any():
        # Blank strings count as missing; the string comparison is skipped
        # when every name is already null.
        keep = keep & (names.astype(str).str.strip() != "")
    return ud.loc[keep].reset_index(drop=True)