Source code for OpenPinch.services.common.gcc_manipulation

"""Determine various forms of the grand composite curve."""

from typing import Tuple

import numpy as np

from ...classes.problem_table import ProblemTable
from ...lib.config import tol
from ...lib.enums import PT
from ...lib.problem_table_types import ProblemTableUpdateKwargs
from ...utils.miscellaneous import delta_with_zero_at_start, linear_interpolation

__all__ = [
    "get_additional_GCCs",
    "get_GCC_without_pockets",
    "get_GCC_with_partial_pockets",
    "get_GCC_with_vertical_heat_transfer",
    "get_GCC_needing_utility",
    "get_GGC_pockets",
    "get_seperated_gcc_heat_load_profiles",
]


################################################################################
# Public API
################################################################################


[docs] def get_additional_GCCs( pt: ProblemTable, do_vert_cc_calc: bool = False, do_assisted_ht_calc: bool = False, assisted_ht_dt_cut: float = 10.0, assisted_ht_dt_cut_min: float = 0.0, is_process_stream: bool = True, ) -> ProblemTable: """Populate derived GCC variants used by utility and integration targeting. Parameters ---------- pt: Problem table containing baseline ``H_net`` and Composite Curve columns. do_vert_cc_calc: Enable vertical heat-transfer transformation for assisted integration. do_assisted_ht_calc: Enable pocket extraction and assisted heat-transfer GCC adjustments. Returns ------- ProblemTable Updated problem table with additional GCC-related columns. """ # Calculate various GCC profiles get_GCC_without_pockets(pt) actual_h_net = pt[PT.H_NET_NP] if do_vert_cc_calc: pt.update( **get_GCC_with_vertical_heat_transfer( T_col=pt[PT.T], h_cold=pt[PT.H_COLD], h_hot=pt[PT.H_HOT], h_net=pt[PT.H_NET], ) ) actual_h_net = pt[PT.H_NET_V] if do_assisted_ht_calc: pt.update( **get_GCC_with_partial_pockets( T_col=pt[PT.T], h_net=pt[PT.H_NET], h_net_np=pt[PT.H_NET_NP], dt_cut=assisted_ht_dt_cut, dt_cut_min=assisted_ht_dt_cut_min, ) ) actual_h_net = pt[PT.H_NET_AI] else: pt.update( **get_GGC_pockets( T_col=pt[PT.T], h_net=pt[PT.H_NET], h_net_np=pt[PT.H_NET_NP], ) ) pt.update( T_col=pt[PT.T], updates={PT.H_NET_AI: np.asarray(pt[PT.H_NET_NP], dtype=float)}, ) pt.update( **get_GCC_needing_utility( T_col=pt[PT.T], h_net=actual_h_net, ) ) pt.update( **get_seperated_gcc_heat_load_profiles( T_col=pt[PT.T], H_net=pt[PT.H_NET_A], is_process_stream=is_process_stream, ) ) return pt
[docs] def get_GCC_without_pockets( pt: ProblemTable, col_H_NP: str | PT = PT.H_NET_NP, col_H: str | PT = PT.H_NET, ) -> Tuple[ProblemTable, ProblemTable]: """Flatten GCC pockets by inserting breakpoints so the profile becomes monotonic.""" pt[col_H_NP] = pt[col_H] hot_pinch_loc, cold_pinch_loc, valid = pt.pinch_idx(col_H) if not valid: return pt # Remove any possible pocket segments between the pinches if hot_pinch_loc + 1 < cold_pinch_loc: for j in range(hot_pinch_loc + 1, cold_pinch_loc): pt.loc[j, col_H_NP] = 0 # Remove pocket segments above the Pinch pt, hot_pinch_loc, cold_pinch_loc = _remove_pockets_on_one_side_of_the_pinch( pt, col_H_NP, col_H, hot_pinch_loc, cold_pinch_loc, True ) # Remove pocket segments below the Pinch pt, hot_pinch_loc, cold_pinch_loc = _remove_pockets_on_one_side_of_the_pinch( pt, col_H_NP, col_H, hot_pinch_loc, cold_pinch_loc, False ) return pt
[docs] def get_GCC_with_partial_pockets( T_col: np.ndarray, h_net: np.ndarray, h_net_np: np.ndarray, dt_cut: float = 10, dt_cut_min: float = 0, ) -> ProblemTableUpdateKwargs: """Return GCC updates for the assisted profile after partially cutting pockets.""" dt_cut = max(float(dt_cut), float(dt_cut_min)) T_col = np.asarray(T_col, dtype=float) h_net = np.asarray(h_net, dtype=float) h_net_np = np.asarray(h_net_np, dtype=float) h_net_pk = _normalise_pocket_profile(np.subtract(h_net, h_net_np)) pocket_pt = ProblemTable( { PT.T: T_col, PT.H_NET: h_net, PT.H_NET_NP: h_net_np, PT.H_NET_PK: h_net_pk, PT.H_NET_AI: h_net - h_net_pk, } ) if np.nansum(pocket_pt[PT.H_NET_PK]) <= tol * len(pocket_pt): return _assisted_gcc_updates(pocket_pt) scan_idx = 0 while True: h_pockets = pocket_pt[PT.H_NET_PK] run_bounds = _next_positive_run(h_pockets, start_idx=scan_idx) if run_bounds is None: break run_start, run_stop = run_bounds if run_start == 0 or run_stop >= len(pocket_pt): scan_idx = run_stop continue t_vals = pocket_pt[PT.T] i_upper = run_start - 1 i_lower = run_stop if t_vals[i_upper] - t_vals[i_lower] < dt_cut - tol: scan_idx = run_stop continue if i_upper + 2 < i_lower: i_upper, i_lower = _shrink_pocket_to_dt_cut_zone( t_vals=t_vals, h_pockets=h_pockets, i_upper=i_upper, i_lower=i_lower, dt_cut=dt_cut, ) h_cut = _solve_assisted_pocket_cut_height( t_vals=t_vals, h_pockets=h_pockets, i_upper=i_upper, i_lower=i_lower, dt_cut=dt_cut, ) if h_cut is None or h_cut <= tol: scan_idx = run_stop continue cut_temps = _cut_temperatures_needing_insertion( t_vals=t_vals, h_pockets=h_pockets, i_upper=i_upper, i_lower=i_lower, h_cut=h_cut, dt_cut=dt_cut, ) if cut_temps.size > 0: pocket_pt.insert_temperature_interval(cut_temps) h_pockets = pocket_pt[PT.H_NET_PK] run_bounds = _next_positive_run(h_pockets, start_idx=max(i_upper, 0)) if run_bounds is None: break run_start, run_stop = run_bounds _apply_h_cut_to_positive_run( h_pockets=pocket_pt[PT.H_NET_PK], run_start=run_start, run_stop=run_stop, h_cut=h_cut, ) scan_idx = run_stop pocket_pt[PT.H_NET_PK] = _normalise_pocket_profile(pocket_pt[PT.H_NET_PK]) pocket_pt[PT.H_NET_AI] = pocket_pt[PT.H_NET] - pocket_pt[PT.H_NET_PK] return _assisted_gcc_updates(pocket_pt)
[docs] def get_GCC_with_vertical_heat_transfer( T_col: np.ndarray, h_cold: np.ndarray, h_hot: np.ndarray, h_net: np.ndarray, ) -> ProblemTableUpdateKwargs: """Return the extreme GCC with vertical heat transfer on the composite curves.""" h_cold = np.asarray(h_cold) h_hot = np.asarray(h_hot) h_net = np.asarray(h_net) hcc_max = h_hot[0] base = np.where(h_cold > hcc_max, h_cold - hcc_max, 0.0) cu_tar = h_net[-1] h_net_v = np.where( h_hot < cu_tar, cu_tar - h_hot, base, ) return {"T_col": T_col, "updates": {PT.H_NET_V: h_net_v}}
[docs] def get_GCC_needing_utility( T_col: np.ndarray, h_net: np.ndarray, ) -> ProblemTableUpdateKwargs: """Return the actual GCC.""" return {"T_col": T_col, "updates": {PT.H_NET_A: h_net}}
[docs] def get_GGC_pockets( T_col: np.ndarray, h_net: np.ndarray, h_net_np: np.ndarray, ) -> ProblemTableUpdateKwargs: """Store GCC pocket contribution. This is the difference between the real and pocket-free GCC profiles. """ h_net_pk = _normalise_pocket_profile(np.subtract(h_net, h_net_np)) return { "T_col": np.asarray(T_col, dtype=float), "updates": {PT.H_NET_PK: h_net_pk}, }
[docs] def get_seperated_gcc_heat_load_profiles( T_col: np.ndarray, H_net: np.ndarray, rcp_net: np.ndarray = None, is_process_stream: bool = True, ) -> ProblemTableUpdateKwargs: """Determine the net required heating or cooling profile from the GCC.""" # Calculate ΔH differences dh_diff = delta_with_zero_at_start(H_net) # Determine whether each row corresponds to a hot-side or cold-side enthalpy change if is_process_stream: is_hot = dh_diff <= 0 is_cold = ~is_hot else: is_cold = dh_diff <= 0 is_hot = ~is_cold # Compute cumulative enthalpy change hot_profile = np.cumsum(-dh_diff * is_hot) cold_profile = np.cumsum(-dh_diff * is_cold) # Handle RCP (HTR x CP) if not is_process_stream: rcp_hot = rcp_net * is_hot rcp_cold = rcp_net * is_cold # Normalize hot profile to start at x=0 and cold profile to end at x=0 if is_process_stream: hot_profile *= -1 hut_max = -cold_profile[-1] cold_profile = cold_profile + hut_max else: cold_profile *= -1 hut_max = -hot_profile[-1] hot_profile = hot_profile + hut_max updates = ( { PT.H_NET_HOT: hot_profile, PT.H_NET_COLD: cold_profile, } if is_process_stream else { PT.H_HOT_UT: hot_profile, PT.H_COLD_UT: cold_profile, PT.RCP_HOT_UT: rcp_hot, PT.RCP_COLD_UT: rcp_cold, } ) return {"T_col": T_col, "updates": updates}
################################################################################ # Helper functions ################################################################################ def _remove_pockets_on_one_side_of_the_pinch( pt: ProblemTable, col_H_NP: str | PT = PT.H_NET_NP, col_H: str | PT = PT.H_NET, hot_pinch_loc: int = None, cold_pinch_loc: int = None, is_above_pinch: bool = True, ) -> Tuple[ProblemTable, ProblemTable]: """Iteratively eliminate pockets above or below the pinch.""" # Settings for removing pocket segments for above/below the pinch if is_above_pinch: i = 0 pinch_loc = hot_pinch_loc sgn = 1 else: i = len(pt) - 1 pinch_loc = cold_pinch_loc sgn = -1 T_vals, H_vals, H_NP_vals = pt[PT.T], pt[col_H], pt[col_H_NP] if H_vals[i] < tol: # No heating or cooling required return pt, hot_pinch_loc, cold_pinch_loc for _ in range(i, pinch_loc, sgn): di = sgn n_int_added = 0 if H_vals[i] < H_vals[i + sgn] - tol: i_0 = i i = _pocket_exit_index(H_vals, i, pinch_loc, sgn) if i != pinch_loc: T0 = linear_interpolation( H_vals[i_0], H_vals[i], H_vals[i + sgn], T_vals[i], T_vals[i + sgn] ) n_int_added = pt.insert_temperature_interval(T0) if n_int_added > 0: T_vals, H_vals, H_NP_vals = ( pt[PT.T], pt[col_H], pt[col_H_NP], ) if is_above_pinch: hot_pinch_loc += n_int_added cold_pinch_loc += n_int_added pinch_loc += n_int_added else: i_0 += n_int_added j_rng = range(i_0 + 1, i + 1) if is_above_pinch else range(i + 1, i_0) for j in j_rng: H_NP_vals[j] = H_vals[i_0] di = n_int_added * sgn i += di if (pinch_loc - i) * sgn <= 0: break return pt, hot_pinch_loc, cold_pinch_loc def _pocket_exit_index(H_vals: np.ndarray, i_0: int, pinch_loc: int, sgn: int) -> int: """Return index where a pocket terminates when marching in direction ``sgn``.""" if sgn > 0: for i in range(i_0 + 1, pinch_loc + 1): if H_vals[i_0] >= H_vals[i] + tol: return i - 1 return pinch_loc else: for i in range(i_0 - 1, pinch_loc - 1, -1): if H_vals[i_0] >= H_vals[i] + tol: return i + 1 return pinch_loc def _assisted_gcc_updates(pocket_pt: ProblemTable) -> ProblemTableUpdateKwargs: """Return aligned assisted GCC updates from a local pocket ProblemTable.""" return { "T_col": pocket_pt[PT.T], "updates": { PT.H_NET_PK: pocket_pt[PT.H_NET_PK], PT.H_NET_AI: pocket_pt[PT.H_NET_AI], }, } def _normalise_pocket_profile(h_pockets: np.ndarray) -> np.ndarray: """Clamp a pocket profile to non-negative values and zero near-tolerance noise.""" h_pockets = np.clip(np.asarray(h_pockets, dtype=float), 0.0, None) h_pockets[np.abs(h_pockets) <= tol] = 0.0 return h_pockets def _next_positive_run( h_pockets: np.ndarray, start_idx: int = 0, ) -> tuple[int, int] | None: """Return ``(start, stop)`` for the next contiguous positive pocket run.""" positive_idx = np.flatnonzero(h_pockets[start_idx:] > tol) if positive_idx.size == 0: return None run_start = start_idx + int(positive_idx[0]) non_positive_idx = np.flatnonzero(h_pockets[run_start:] <= tol) run_stop = ( run_start + int(non_positive_idx[0]) if non_positive_idx.size > 0 else h_pockets.size ) return run_start, run_stop def _shrink_pocket_to_dt_cut_zone( t_vals: np.ndarray, h_pockets: np.ndarray, i_upper: int, i_lower: int, dt_cut: float, ) -> tuple[int, int]: """Restrict a pocket to the top and bottom segments that bracket the cut region.""" k = i_lower j_top = i_upper + 1 for j_top in range(i_upper + 1, i_lower): while k > j_top and h_pockets[j_top] - h_pockets[k] > tol: k -= 1 if abs(t_vals[j_top] - t_vals[k]) <= dt_cut + tol or j_top + 1 >= k: break i_upper = j_top - 1 k = i_upper j_bottom = i_lower - 1 for j_bottom in range(i_lower - 1, i_upper, -1): while k < j_bottom and h_pockets[j_bottom] - h_pockets[k] > tol: k += 1 if abs(t_vals[j_bottom] - t_vals[k]) <= dt_cut + tol or j_bottom <= k: break i_lower = j_bottom + 1 return i_upper, i_lower def _solve_assisted_pocket_cut_height( t_vals: np.ndarray, h_pockets: np.ndarray, i_upper: int, i_lower: int, dt_cut: float, ) -> float | None: """Return the cut height solving ``T_hot(H) - T_cold(H) = dt_cut``.""" dh_top = h_pockets[i_upper] - h_pockets[i_upper + 1] dh_bottom = h_pockets[i_lower - 1] - h_pockets[i_lower] if abs(dh_top) <= tol or abs(dh_bottom) <= tol: return None m1 = (t_vals[i_upper] - t_vals[i_upper + 1]) / dh_top c1 = t_vals[i_upper] - m1 * h_pockets[i_upper] m2 = (t_vals[i_lower - 1] - t_vals[i_lower]) / dh_bottom c2 = t_vals[i_lower] - m2 * h_pockets[i_lower] denom = m2 - m1 if abs(denom) <= tol: return None h_cut = (c1 - c2 - dt_cut) / denom if not np.isfinite(h_cut): return None hot_bounds = sorted((h_pockets[i_upper], h_pockets[i_upper + 1])) cold_bounds = sorted((h_pockets[i_lower - 1], h_pockets[i_lower])) h_min = max(hot_bounds[0], cold_bounds[0]) h_max = min(hot_bounds[1], cold_bounds[1]) if h_cut < h_min - tol or h_cut > h_max + tol: return None return min(max(h_cut, h_min), h_max) def _cut_temperatures_needing_insertion( t_vals: np.ndarray, h_pockets: np.ndarray, i_upper: int, i_lower: int, h_cut: float, dt_cut: float, ) -> np.ndarray: """Return new breakpoint temperatures for the residual ``h_cut`` intersections.""" if not ( i_upper < i_lower - 1 and t_vals[i_upper + 1] - t_vals[i_lower - 1] - dt_cut < -tol ): return np.empty(0, dtype=float) cut_temps = [ temp for temp in ( _segment_cut_temperature( t_vals=t_vals, h_pockets=h_pockets, left_idx=i_upper, right_idx=i_upper + 1, h_cut=h_cut, ), _segment_cut_temperature( t_vals=t_vals, h_pockets=h_pockets, left_idx=i_lower - 1, right_idx=i_lower, h_cut=h_cut, ), ) if temp is not None ] if not cut_temps: return np.empty(0, dtype=float) return np.asarray(cut_temps, dtype=float) def _segment_cut_temperature( t_vals: np.ndarray, h_pockets: np.ndarray, left_idx: int, right_idx: int, h_cut: float, ) -> float | None: """Return the breakpoint temperature where one segment crosses ``h_cut``.""" if ( abs(h_pockets[left_idx] - h_cut) <= tol or abs(h_pockets[right_idx] - h_cut) <= tol ): return None return linear_interpolation( h_cut, h_pockets[right_idx], h_pockets[left_idx], t_vals[right_idx], t_vals[left_idx], ) def _apply_h_cut_to_positive_run( h_pockets: np.ndarray, run_start: int, run_stop: int, h_cut: float, ) -> None: """Apply the cut height across one positive run and clip exhausted tails to zero.""" segment = slice(run_start, run_stop) h_pockets[segment] = _normalise_pocket_profile(h_pockets[segment] - h_cut)