# Source code for OpenPinch.services.common.graph_data

"""Graph construction helpers for composite curves and related plots."""

from collections import defaultdict
from typing import Iterable, List, Optional, Tuple

import numpy as np

from ...classes.problem_table import ProblemTable
from ...classes.zone import Zone
from ...lib.config import tol
from ...lib.enums import GT, PT, ArrowHead, LineColour, StreamLoc
from ...lib.schemas.targets import BaseTargetModel
from ...utils.miscellaneous import clean_composite_curve
from .graph_series_meta import GRAPH_SERIES_META, GraphSeriesMeta

# Number of decimal places used when rounding the x/y coordinates of curve
# data points (see ``_create_curve``).
DECIMAL_PLACES = 2
# Enthalpy differences whose magnitude is at or below this threshold classify
# a GCC segment as vertical/unassigned (see ``_classify_segment``).
GCC_VERTICAL_TOL = 1e-3


# Names exported by ``from ... import *``.
__all__ = ["get_output_graph_data"]


################################################################################
# Public API
################################################################################


def get_output_graph_data(zone: Zone, graph_sets: Optional[dict] = None) -> dict:
    """Return graph payloads for every target of ``zone`` and its subzones.

    The zone hierarchy is walked depth-first: the targets of ``zone`` are
    processed first, then each subzone is visited recursively.

    Args:
        zone: Zone whose ``targets`` and ``subzones`` mappings are traversed.
        graph_sets: Optional dictionary to accumulate results into. It is
            mutated in place; a new dictionary is created when ``None``.

    Returns:
        The accumulated dictionary, keyed by ``target.name``. A later target
        with the same name overwrites an earlier entry.
    """
    if graph_sets is None:
        graph_sets = {}

    for target in zone.targets.values():
        graph_sets[target.name] = _create_graph_set(target, zone=zone)

    # Iterating an empty mapping is a no-op, so no explicit length guard is
    # needed before recursing.
    for subzone in zone.subzones.values():
        graph_sets = get_output_graph_data(subzone, graph_sets)

    return graph_sets
################################################################################
# Helper Functions
################################################################################


# Default curve titles for composite-style series, keyed by stream location.
# Shared by ``_graph_cc`` (validation) and ``_composite_series_meta`` (titles).
_COMPOSITE_TITLES = {
    StreamLoc.HotS: "Hot CC",
    StreamLoc.ColdS: "Cold CC",
    StreamLoc.HotU: "Hot Utility",
    StreamLoc.ColdU: "Cold Utility",
}


def _create_graph_set(t: BaseTargetModel, zone: Optional[Zone] = None) -> dict:
    """Creates Pinch Analysis and total site analysis graphs for a specific zone.

    Each graph type present in ``t.graphs`` is converted into a graph payload
    by the matching builder; graph types that are absent are skipped.

    Args:
        t: Target whose ``graphs`` mapping (graph type value -> data) is read.
            A missing ``graphs`` attribute is treated as an empty mapping.
        zone: Optional zone supplying ``name`` and ``address``. When the zone
            has no truthy ``name``, ``t.zone_name`` is used if available.

    Returns:
        A dictionary with the keys ``name``, ``target_type``, ``state_id``,
        ``zone_name``, ``zone_address`` and ``graphs`` (list of graph dicts).
    """
    target_graphs = getattr(t, "graphs", {})
    graph_title = t.name
    zone_name = getattr(zone, "name", None) or getattr(t, "zone_name", None)
    zone_address = getattr(zone, "address", None)

    hot_s, cold_s = StreamLoc.HotS, StreamLoc.ColdS
    hot_u, cold_u = StreamLoc.HotU, StreamLoc.ColdU

    # (graph type, builder, label, builder-specific keyword arguments).
    # The order of this table is the order of graphs in the output.
    graph_specs = (
        (
            GT.CC,
            _make_composite_graph,
            "Composite Curve",
            {
                "value_field": [PT.H_HOT, PT.H_COLD],
                "stream_type": [hot_s, cold_s],
            },
        ),
        (
            GT.SCC,
            _make_composite_graph,
            "Shifted Composite Curve",
            {
                "value_field": [PT.H_HOT, PT.H_COLD],
                "stream_type": [hot_s, cold_s],
            },
        ),
        (
            GT.BCC,
            _make_composite_graph,
            "Balanced Composite Curve",
            {
                "value_field": [PT.H_HOT_BAL, PT.H_COLD_BAL],
                "stream_type": [hot_s, cold_s],
                "include_arrows": True,
            },
        ),
        (
            GT.GCC,
            _make_gcc_graph,
            "Grand Composite Curve",
            {
                "value_field": [
                    PT.H_NET,
                    PT.H_NET_NP,
                    PT.H_NET_V,
                    PT.H_NET_A,
                    PT.H_NET_UT,
                ],
                "is_utility_profile": [False, False, False, False, True],
            },
        ),
        (
            GT.GCC_R,
            _make_gcc_graph,
            "Grand Composite Curve (Real)",
            {
                "value_field": [PT.H_NET, PT.H_NET_UT],
                "is_utility_profile": [False, True],
            },
        ),
        (
            GT.NLP,
            _make_composite_graph,
            "Net Load Curves",
            {
                "value_field": [
                    PT.H_NET_HOT,
                    PT.H_NET_COLD,
                    PT.H_HOT_UT,
                    PT.H_COLD_UT,
                    PT.H_HOT_HP,
                    PT.H_COLD_HP,
                ],
                "stream_type": [hot_s, cold_s, hot_u, cold_u, hot_u, cold_u],
                "include_arrows": True,
            },
        ),
        (
            GT.TSP,
            _make_composite_graph,
            "Total Site Profiles",
            {
                "value_field": [
                    PT.H_HOT,
                    PT.H_COLD,
                    PT.H_HOT_UT,
                    PT.H_COLD_UT,
                ],
                "stream_type": [hot_s, cold_s, hot_u, cold_u],
                "include_arrows": True,
            },
        ),
        (
            GT.SUGCC,
            _make_gcc_graph,
            "Site Utility Grand Composite Curve",
            {
                "value_field": [PT.H_NET_UT],
                "is_utility_profile": [True],
            },
        ),
        (
            GT.GCC_HP,
            _make_gcc_graph,
            "Grand Composite Curve with Heat Pump",
            {
                "value_field": [PT.H_NET_W_AIR, PT.H_NET_HP],
                "is_utility_profile": [False, True],
            },
        ),
    )

    graphs: List[dict] = []
    for graph_type, builder, label, options in graph_specs:
        key = graph_type.value
        if key not in target_graphs:
            continue
        graphs.append(
            builder(
                graph_title=graph_title,
                key=key,
                data=target_graphs[key],
                label=label,
                **options,
            )
        )

    return {
        "name": graph_title,
        "target_type": getattr(t, "type", None),
        "state_id": getattr(t, "state_id", None),
        "zone_name": zone_name,
        "zone_address": zone_address,
        "graphs": graphs,
    }


def _make_composite_graph(
    graph_title: str,
    key: str,
    data,
    label: str,
    *,
    value_field,
    stream_type,
    name: Optional[str] = None,
    include_arrows: bool = True,
    decolour: bool = False,
):
    """Build a composite-curve style graph with one curve per plotted series.

    Args:
        graph_title: Title suffix used in the default graph name.
        key: Graph type identifier stored under ``"type"``.
        data: Tabular payload providing the ``PT.T`` column and every column
            named in ``value_field``.
        label: Human-readable graph label used in the default graph name.
        value_field: A single column identifier or an iterable of them.
        stream_type: A single stream location (applied to every field) or an
            iterable with one entry per field.
        name: Explicit graph name; defaults to ``"{label}: {graph_title}"``.
        include_arrows: Whether curves carry a direction arrow.
        decolour: When true, curves are drawn in black.

    Returns:
        A dictionary with the keys ``type``, ``name`` and ``segments``.

    Raises:
        ValueError: If ``stream_type`` is iterable and its length differs from
            the number of fields.
        KeyError: If a requested column cannot be read from ``data``.
    """
    temperatures = _column_to_list(data, PT.T)
    fields = _normalise_graph_fields(value_field)
    stream_types = _normalise_graph_values(
        stream_type,
        len(fields),
        "`value_field` and `stream_type` must have the same length.",
    )

    segments: List[dict] = []
    for field, stream_loc in zip(fields, stream_types):
        column_key = _column_key(field)
        x_vals = _column_to_list(data, column_key)
        # Series without any meaningful non-zero value are not plotted.
        if not _should_plot_series(x_vals):
            continue
        segments.extend(
            _graph_cc(
                key,
                stream_loc,
                temperatures,
                x_vals,
                column_key=column_key,
                include_arrows=include_arrows,
                decolour=decolour,
            )
        )

    return {
        "type": key,
        "name": name or f"{label}: {graph_title}",
        "segments": segments,
    }


def _normalise_graph_fields(value_field) -> List:
    """Return ``value_field`` as a list, wrapping strings and non-iterables."""
    if isinstance(value_field, str) or not hasattr(value_field, "__iter__"):
        return [value_field]
    return list(value_field)


def _normalise_graph_values(value, count: int, mismatch_message: str) -> List:
    """Broadcast or validate ``value`` so that it has exactly ``count`` items.

    A string or non-iterable is repeated ``count`` times; any other iterable
    is converted to a list.

    Raises:
        ValueError: With ``mismatch_message`` when the resulting list does not
            contain ``count`` items.
    """
    if isinstance(value, str) or not hasattr(value, "__iter__"):
        values = [value] * count
    else:
        values = list(value)
    if len(values) != count:
        raise ValueError(mismatch_message)
    return values


def _normalise_gcc_flags(flag, count: int) -> List[bool]:
    """Return ``count`` boolean utility-profile flags derived from ``flag``.

    ``None`` is treated as ``False``. A scalar is broadcast to every series;
    an iterable must provide exactly ``count`` entries.

    Raises:
        ValueError: If the number of flags does not match ``count`` or an
            entry cannot be coerced to ``bool``.
    """
    flags = _normalise_graph_values(
        flag if flag is not None else False,
        count,
        "`value_field` and `is_utility_profile` must have the same length.",
    )
    try:
        return [bool(item) for item in flags]
    except TypeError as exc:
        raise ValueError(
            "`is_utility_profile` values must be coercible to bool."
        ) from exc


def _column_key(field) -> str:
    """Return ``field.value`` when present (e.g. an enum), else ``field``."""
    return getattr(field, "value", field)


def _series_meta_from_key(column_key: str) -> GraphSeriesMeta:
    """Look up series metadata, falling back to the key itself as the label."""
    meta = GRAPH_SERIES_META.get(column_key)
    if meta is not None:
        return meta
    label = str(column_key)
    return GraphSeriesMeta(label, label)


def _build_gcc_segments(
    y_vals: Iterable[float],
    x_vals: Iterable[float],
    *,
    series_id: str,
    meta: GraphSeriesMeta,
    is_utility_profile: bool,
    decolour: bool,
) -> List[dict]:
    """Split one GCC series into consecutive, uniformly classified curves.

    The series is first passed through ``clean_composite_curve`` and then
    sliced by ``_iter_gcc_segment_slices``. Curve titles are numbered per
    stream location. Vertical segments, and every segment when ``decolour``
    is true, are drawn in black; no arrows are drawn.

    Returns:
        One curve dictionary (see ``_create_curve``) per slice.
    """
    y_vals, x_vals = clean_composite_curve(y_vals, x_vals)
    counts: dict[StreamLoc, int] = defaultdict(int)
    segments: List[dict] = []

    for stream_loc, is_vertical, x_seg, y_seg in _iter_gcc_segment_slices(
        x_vals,
        y_vals,
        is_utility_profile,
        meta.preferred_stream_loc,
    ):
        counts[stream_loc] += 1
        title = _format_segment_title(meta, counts[stream_loc])
        colour = (
            LineColour.Black.value
            if is_vertical or decolour
            else _streamloc_colour(stream_loc)
        )
        segments.append(
            _create_curve(
                title=title,
                colour=colour,
                x_vals=x_seg,
                y_vals=y_seg,
                arrow=ArrowHead.NO_ARROW.value,
                series_label=meta.label,
                series_id=series_id,
                series_description=meta.description,
                is_vertical=is_vertical,
                is_utility_stream=stream_loc in {StreamLoc.HotU, StreamLoc.ColdU},
            )
        )

    return segments


def _iter_gcc_segment_slices(
    x_vals: List[float],
    y_vals: List[float],
    is_utility_profile: bool,
    preferred_stream_loc: Optional[StreamLoc],
):
    """Yield maximal runs of intervals that share the same classification.

    Only the index range returned by ``_segment_bounds`` is scanned.

    Yields:
        Tuples ``(stream_loc, is_vertical, x_slice, y_slice)``. Adjacent
        slices share their boundary point. For a vertical run the
        ``preferred_stream_loc`` is reported when it is truthy.
    """
    start, end = _segment_bounds(x_vals)
    j = start
    while j < end:
        classified = _classify_segment(x_vals[j] - x_vals[j + 1], is_utility_profile)

        # Extend the run while the following intervals classify identically.
        next_j = j + 1
        while next_j < end:
            next_class = _classify_segment(
                x_vals[next_j] - x_vals[next_j + 1], is_utility_profile
            )
            if next_class != classified:
                break
            next_j += 1

        raw_loc = _segment_streamloc(classified)
        is_vertical = raw_loc == StreamLoc.Unassigned
        stream_loc = (
            preferred_stream_loc if is_vertical and preferred_stream_loc else raw_loc
        )
        yield stream_loc, is_vertical, x_vals[j : next_j + 1], y_vals[j : next_j + 1]
        j = next_j


def _segment_bounds(x_vals: List[float]) -> Tuple[int, int]:
    """Return the index range that excludes constant leading/trailing runs.

    ``start`` is the first index whose value differs from its successor by
    more than ``tol`` (``0`` if none); ``end`` is the last index whose value
    differs from its predecessor by more than ``tol`` (``len(x_vals) - 1`` if
    none).
    """
    start = next(
        (i for i in range(len(x_vals) - 1) if abs(x_vals[i] - x_vals[i + 1]) > tol),
        0,
    )
    end = next(
        (
            i
            for i in range(len(x_vals) - 1, 0, -1)
            if abs(x_vals[i] - x_vals[i - 1]) > tol
        ),
        len(x_vals) - 1,
    )
    return start, end


def _format_segment_title(meta: GraphSeriesMeta, index: int) -> str:
    """Return ``"<description|label|Segment> <index>"`` for a GCC segment."""
    base = meta.description or meta.label or "Segment"
    return f"{base} {index}"


def _make_gcc_graph(
    graph_title: str,
    key: str,
    data,
    label: str,
    *,
    value_field,
    name: Optional[str] = None,
    is_utility_profile: bool = False,
    decolour: bool = False,
):
    """Build a grand-composite-curve style graph from one or more series.

    Args:
        graph_title: Title suffix used in the default graph name.
        key: Graph type identifier stored under ``"type"`` and used as the
            prefix of each series id.
        data: Tabular payload providing the ``PT.T`` column and every column
            named in ``value_field``.
        label: Human-readable graph label used in the default graph name.
        value_field: A single column identifier or an iterable of them.
        name: Explicit graph name; defaults to ``"{label}: {graph_title}"``.
        is_utility_profile: A single flag applied to every series, or an
            iterable with one flag per series (despite the ``bool``
            annotation).
        decolour: When true, curves are drawn in black.

    Returns:
        A dictionary with the keys ``type``, ``name`` and ``segments``.

    Raises:
        ValueError: If the number of flags does not match the number of
            fields.
        KeyError: If a requested column cannot be read from ``data``.
    """
    temperatures = _column_to_list(data, PT.T)
    fields = _normalise_graph_fields(value_field)
    flags = _normalise_gcc_flags(is_utility_profile, len(fields))

    segments: List[dict] = []
    for field, utility_flag in zip(fields, flags):
        column_key = _column_key(field)
        x_vals = _column_to_list(data, column_key)
        # Series without any meaningful non-zero value are not plotted.
        if not _should_plot_series(x_vals):
            continue
        meta = _series_meta_from_key(column_key)
        segments.extend(
            _build_gcc_segments(
                temperatures,
                x_vals,
                series_id=f"{key}:{column_key}",
                meta=meta,
                is_utility_profile=utility_flag,
                decolour=decolour,
            )
        )

    return {
        "type": key,
        "name": name or f"{label}: {graph_title}",
        "segments": segments,
    }


def _graph_cc(
    key: str,
    stream_loc,
    y_vals: List[float],
    x_vals: List[float],
    *,
    column_key: Optional[str] = None,
    include_arrows: bool = True,
    decolour: bool = False,
) -> List[dict]:
    """Plots a (shifted) hot or cold composite curve.

    Args:
        key: Graph type identifier; selects the arrow direction and prefixes
            the series id.
        stream_loc: A ``StreamLoc``, a value accepted by ``StreamLoc(...)``,
            an object exposing such a ``value``, or the alias ``"Hot"`` /
            ``"Cold"``.
        y_vals: Temperature values of the curve.
        x_vals: Enthalpy values of the curve.
        column_key: Source column; when given, series metadata is attached.
        include_arrows: Whether the curve carries a direction arrow.
        decolour: When true, the curve is drawn in black.

    Returns:
        A single-element list containing the curve dictionary.

    Raises:
        ValueError: If ``stream_loc`` cannot be resolved to a hot/cold process
            or utility location.
    """
    # Clean composite
    y_vals, x_vals = clean_composite_curve(y_vals, x_vals)

    if not isinstance(stream_loc, StreamLoc):
        candidate = getattr(stream_loc, "value", stream_loc)
        try:
            stream_loc = StreamLoc(candidate)
        except ValueError:
            aliases = {
                "Hot": StreamLoc.HotS,
                "Cold": StreamLoc.ColdS,
            }
            if candidate not in aliases:
                raise ValueError(
                    "Unrecognised composite curve stream location."
                ) from None
            stream_loc = aliases[candidate]

    # Total site profiles use the opposite arrow direction to every other
    # composite-style graph.
    if key != GT.TSP.value:
        hot_arrow, cold_arrow = ArrowHead.END.value, ArrowHead.START.value
    else:
        hot_arrow, cold_arrow = ArrowHead.START.value, ArrowHead.END.value
    arrow_map = {
        StreamLoc.HotS: hot_arrow,
        StreamLoc.HotU: hot_arrow,
        StreamLoc.ColdS: cold_arrow,
        StreamLoc.ColdU: cold_arrow,
    }

    if stream_loc not in _COMPOSITE_TITLES:
        raise ValueError("Unrecognised composite curve stream location.")

    colour = LineColour.Black.value if decolour else _streamloc_colour(stream_loc)
    arrow = arrow_map[stream_loc] if include_arrows else ArrowHead.NO_ARROW.value
    meta = _composite_series_meta(stream_loc, column_key)
    has_column = column_key is not None

    return [
        _create_curve(
            title=meta.composite_title,
            colour=colour,
            arrow=arrow,
            x_vals=x_vals,
            y_vals=y_vals,
            series_label=meta.label if has_column else None,
            series_id=f"{key}:{column_key}" if has_column else None,
            series_description=meta.description if has_column else None,
        )
    ]


def _composite_series_meta(
    stream_loc: StreamLoc,
    column_key: Optional[str],
) -> GraphSeriesMeta:
    """Return the display metadata for one Composite Curve series.

    Metadata registered for ``column_key`` is used when it defines a
    ``composite_title``; otherwise default metadata is built from the
    stream location's default title.
    """
    if column_key is not None:
        meta = _series_meta_from_key(column_key)
        if meta.composite_title is not None:
            return meta

    default_title = _COMPOSITE_TITLES[stream_loc]
    return GraphSeriesMeta(
        label=default_title,
        description=default_title,
        composite_title=default_title,
    )


def _should_plot_series(values: List[float]) -> bool:
    """Return ``True`` when a graph series contains meaningful non-zero values.

    Non-finite entries are ignored; a series is meaningful when at least one
    finite entry has a magnitude greater than ``tol``.
    """
    try:
        numeric = np.asarray(values, dtype=float)
    # Parenthesised tuple: the bare ``except A, B:`` form is only accepted by
    # Python 3.14+ and is a syntax error on earlier versions.
    except (TypeError, ValueError):
        numeric = np.array(
            [float(value) for value in values if value is not None],
            dtype=float,
        )
    finite = numeric[np.isfinite(numeric)]
    if finite.size == 0:
        return False
    return bool(np.any(np.abs(finite) > tol))


def _column_to_list(data, column_key: str) -> List[float]:
    """Return the requested column from ``data`` as a Python list.

    ``ProblemTable`` instances and generic containers are indexed directly
    with ``data[column_key]``. Other objects exposing both ``col`` and
    ``columns`` are read through ``data.col[column_key]`` when the key is
    listed in ``columns``.

    Raises:
        KeyError: If the column cannot be retrieved from ``data``.
    """
    if not isinstance(column_key, str):
        column_key = getattr(column_key, "value", column_key)

    try:
        use_col_accessor = (
            not isinstance(data, ProblemTable)
            and hasattr(data, "col")
            and hasattr(data, "columns")
            and column_key in data.columns
        )
        column = data.col[column_key] if use_col_accessor else data[column_key]
    except (KeyError, AttributeError, TypeError) as exc:
        raise KeyError(
            f"Column '{column_key}' not found in graph data payload."
        ) from exc

    if hasattr(column, "to_list"):
        return column.to_list()
    if hasattr(column, "tolist"):
        return column.tolist()
    return list(column)


def _classify_segment(enthalpy_diff: float, is_utility_profile: bool) -> StreamLoc:
    """Classify one GCC interval from the enthalpy difference across it.

    Differences within ``GCC_VERTICAL_TOL`` are ``Unassigned`` (vertical).
    Positive differences map to ``ColdS`` (``HotU`` for utility profiles) and
    negative ones to ``HotS`` (``ColdU`` for utility profiles).
    """
    if abs(enthalpy_diff) <= GCC_VERTICAL_TOL:
        return StreamLoc.Unassigned
    if enthalpy_diff > 0:
        return StreamLoc.ColdS if not is_utility_profile else StreamLoc.HotU
    if enthalpy_diff < 0:
        return StreamLoc.HotS if not is_utility_profile else StreamLoc.ColdU
    # Only reachable when no comparison above holds (e.g. NaN input).
    return StreamLoc.Unassigned


def _segment_streamloc(segment_type) -> StreamLoc:
    """Map a segment classification to a :class:`StreamLoc`.

    ``StreamLoc`` members are returned unchanged. Raw values equal to the
    value of ``ColdS``, ``HotS``, ``HotU`` or ``ColdU`` map to that member;
    anything else maps to ``Unassigned``.
    """
    if isinstance(segment_type, StreamLoc):
        return segment_type
    for member in (
        StreamLoc.ColdS,
        StreamLoc.HotS,
        StreamLoc.HotU,
        StreamLoc.ColdU,
    ):
        if segment_type == member.value:
            return member
    return StreamLoc.Unassigned


def _streamloc_colour(stream_loc: StreamLoc) -> int:
    """Return the default colour for a given stream location."""
    if stream_loc == StreamLoc.HotS:
        return LineColour.HotS.value
    if stream_loc == StreamLoc.ColdS:
        return LineColour.ColdS.value
    if stream_loc == StreamLoc.HotU:
        return LineColour.HotU.value
    if stream_loc == StreamLoc.ColdU:
        return LineColour.ColdU.value
    return LineColour.Other.value


def _create_curve(
    title: str,
    colour: int,
    x_vals,
    y_vals,
    arrow=ArrowHead.NO_ARROW.value,
    series_label: Optional[str] = None,
    series_id: Optional[str] = None,
    series_description: Optional[str] = None,
    is_vertical: Optional[bool] = None,
    is_utility_stream: Optional[bool] = None,
) -> dict:
    """Creates an individual curve from data points.

    Optional metadata keys (``series``, ``series_id``, ``series_description``,
    ``is_vertical``, ``is_utility_stream``) are only included when the
    corresponding argument is not ``None``. Points with a ``None`` coordinate
    are dropped and the remaining coordinates are rounded to
    ``DECIMAL_PLACES``.

    Returns:
        A dictionary with ``title``, ``colour``, ``arrow``, any optional
        metadata, and ``data_points`` (list of ``{"x": ..., "y": ...}``).
    """
    curve = {"title": title, "colour": colour, "arrow": arrow}

    if series_label is not None:
        curve["series"] = series_label
    if series_id is not None:
        curve["series_id"] = series_id
    if series_description is not None:
        curve["series_description"] = series_description
    if is_vertical is not None:
        curve["is_vertical"] = bool(is_vertical)
    if is_utility_stream is not None:
        curve["is_utility_stream"] = bool(is_utility_stream)

    curve["data_points"] = [
        {"x": round(x, DECIMAL_PLACES), "y": round(y, DECIMAL_PLACES)}
        for x, y in zip(x_vals, y_vals)
        if x is not None and y is not None
    ]
    return curve