Source code for OpenPinch.services.common.problem_table_analysis

"""Problem Table generation and cascade utilities for pinch analysis.

This module collects the vectorised implementations of the temperature
interval problem table, cascade shifting logic, and helper routines used when
deriving zonal targets and plotting data for composite curves.
"""

import math
from typing import List, Tuple

import numpy as np

from ...classes.problem_table import ProblemTable
from ...classes.stream_collection import StreamCollection
from ...lib.config import tol
from ...lib.enums import PT
from ...lib.problem_table_types import ProblemTableUpdateKwargs
from ...services.common.gcc_manipulation import get_additional_GCCs
from ...utils.miscellaneous import (
    delta_with_zero_at_start,
    linear_interpolation,
)

__all__ = [
    "get_process_heat_cascade",
    "get_utility_heat_cascade",
    "create_problem_table_with_t_int",
    "problem_table_algorithm",
    "get_heat_recovery_target_from_pt",
    "set_zonal_targets",
]


################################################################################
# Public API
################################################################################


[docs] def get_process_heat_cascade( hot_streams: StreamCollection = None, cold_streams: StreamCollection = None, all_streams: StreamCollection = None, is_shifted: bool = True, known_heat_recovery: float = None, extra_T_intervals: list = None, is_full_analysis: bool = False, idx: int | None = None, ) -> ProblemTable: """Prepare, calculate, and analyse the problem table for given streams.""" if hot_streams is None: hot_streams = StreamCollection() if cold_streams is None: cold_streams = StreamCollection() if extra_T_intervals is None: extra_T_intervals = [] # Get all possible temperature intervals, remove duplicates, and order high to low. if all_streams is None: all_streams = hot_streams + cold_streams pt = create_problem_table_with_t_int( streams=all_streams, is_shifted=is_shifted, extra_T_intervals=extra_T_intervals, idx=idx, ) # Perform the heat cascade of the problem table problem_table_algorithm( pt=pt, hot_streams=hot_streams, cold_streams=cold_streams, is_shifted=is_shifted, idx=idx, ) if isinstance(known_heat_recovery, float): # Correct the cold composite curve and limiting GCC at real temperatures. _shift_pt_to_set_heat_recovery( pt=pt, heat_recovery_target=known_heat_recovery, current_heat_recovery=get_heat_recovery_target_from_pt(pt), ) heat_recovery_target = get_heat_recovery_target_from_pt(pt) if heat_recovery_target > tol: # Add additional temperature intervals for ease in targeting utility etc _insert_temperature_interval_into_pt_at_constant_h(pt) if is_full_analysis: get_additional_GCCs(pt, is_process_stream=True) return pt
[docs] def get_utility_heat_cascade( T_int_vals: np.ndarray, hot_utilities: StreamCollection = None, cold_utilities: StreamCollection = None, is_shifted: bool = True, idx: int | None = None, ) -> ProblemTableUpdateKwargs: """Prepare and calculate the utility heat cascade for a utility set.""" pt_ut = ProblemTable({PT.T: T_int_vals}) problem_table_algorithm( pt_ut, hot_utilities, cold_utilities, is_shifted, idx=idx, ) h_net_values = pt_ut[PT.H_NET] H_NET_UT = h_net_values.max() - h_net_values h_ut_cc = pt_ut[PT.H_HOT] c_ut_cc = pt_ut[PT.H_COLD] - pt_ut[PT.H_COLD].max() return { "T_col": T_int_vals, "updates": { PT.H_NET_UT: H_NET_UT, PT.H_HOT_UT: h_ut_cc, PT.H_COLD_UT: c_ut_cc, PT.RCP_HOT_UT: pt_ut[PT.RCP_HOT], PT.RCP_COLD_UT: pt_ut[PT.RCP_COLD], PT.RCP_UT_NET: pt_ut[PT.RCP_HOT] + pt_ut[PT.RCP_COLD], }, }
[docs] def create_problem_table_with_t_int( streams: StreamCollection = None, is_shifted: bool = True, extra_T_intervals: list | float | np.ndarray = None, idx: int | None = None, ) -> ProblemTable: """Return a problem table populated with ordered unique temperature intervals.""" if streams is None: streams = StreamCollection() if extra_T_intervals is None: extra_T_intervals = [] elif isinstance(extra_T_intervals, (int, float)): extra_T_intervals = [extra_T_intervals] elif isinstance(extra_T_intervals, np.ndarray): extra_T_intervals = extra_T_intervals.tolist() if is_shifted: T_vals = [ float(t) for s in streams for t in (s.t_min_star[idx], s.t_max_star[idx]) ] else: T_vals = [float(t) for s in streams for t in (s.t_min[idx], s.t_max[idx])] T_vals += extra_T_intervals dp = int(-math.log10(tol)) T_vals = np.array(T_vals).round(dp) return ProblemTable({PT.T: sorted(set(T_vals), reverse=True)})
[docs] def problem_table_algorithm( pt: ProblemTable, hot_streams: StreamCollection = None, cold_streams: StreamCollection = None, is_shifted: bool = True, idx: int | None = None, ) -> ProblemTable: """Fast calculation of the problem table using vectorized cascade formulas.""" # Sum m_dot*Cp contributions from hot streams per interval (sets CP_HOT and rCP_HOT) if hot_streams is not None: pt[PT.CP_HOT], pt[PT.RCP_HOT] = _sum_mcp_between_temperature_boundaries( pt[PT.T], hot_streams, is_shifted, idx=idx ) else: pt[PT.CP_HOT].fill(0.0) pt[PT.RCP_HOT].fill(0.0) # Sum m_dot*Cp contributions from cold streams per interval. if cold_streams is not None: pt[PT.CP_COLD], pt[PT.RCP_COLD] = _sum_mcp_between_temperature_boundaries( pt[PT.T], cold_streams, is_shifted, idx=idx ) else: pt[PT.CP_COLD].fill(0.0) pt[PT.RCP_COLD].fill(0.0) # ΔT_i = T_{i-1} - T_i pt[PT.DELTA_T] = delta_with_zero_at_start(pt[PT.T]) # ΔH_hot = ΔT * CP_hot pt[PT.DELTA_H_HOT] = pt[PT.DELTA_T] * pt[PT.CP_HOT] # H_hot = Σ ΔH_hot pt[PT.H_HOT] = np.cumsum(pt[PT.DELTA_H_HOT]) # ΔH_cold = ΔT * CP_cold pt[PT.DELTA_H_COLD] = pt[PT.DELTA_T] * pt[PT.CP_COLD] # H_cold = Σ ΔH_cold pt[PT.H_COLD] = np.cumsum(pt[PT.DELTA_H_COLD]) # CP_NET = CP_cold - CP_hot pt[PT.CP_NET] = pt[PT.CP_COLD] - pt[PT.CP_HOT] # ΔH_net = ΔT * CP_NET pt[PT.DELTA_H_NET] = pt[PT.DELTA_T] * pt[PT.CP_NET] # H_net = -Σ ΔH_net pt[PT.H_NET] = -np.cumsum(pt[PT.DELTA_H_NET]) # Shift cascades so minimum H_net is zero and the curves align at the pinch. pt[PT.H_HOT] = pt[PT.H_HOT][-1] - pt[PT.H_HOT] pt[PT.H_COLD] = (pt[PT.H_COLD][-1] + pt[PT.H_NET][-1] - pt[PT.H_NET].min()) - pt[ PT.H_COLD ] pt[PT.H_NET] -= pt[PT.H_NET].min() return pt
[docs] def get_heat_recovery_target_from_pt( pt: ProblemTable, ) -> float: """Compute the heat-recovery target implied by a solved problem table. Parameters ---------- pt: Problem table with populated ``H_hot`` and ``H_net`` columns. Returns ------- float Maximum direct heat recovery for the analysed zone. """ return pt.loc[0, PT.H_HOT] - pt.loc[-1, PT.H_NET]
[docs] def set_zonal_targets( pt: ProblemTable, pt_real: ProblemTable, ) -> dict: """Assign thermal targets and integration degree from process-table data.""" return { "hot_utility_target": pt.loc[0, PT.H_NET], "cold_utility_target": pt.loc[-1, PT.H_NET], "heat_recovery_target": pt.loc[0, PT.H_HOT] - pt.loc[-1, PT.H_NET], "heat_recovery_limit": pt_real.loc[0, PT.H_HOT] - pt_real.loc[-1, PT.H_NET], "degree_of_int": ( (pt.loc[0, PT.H_HOT] - pt.loc[-1, PT.H_NET]) / (pt_real.loc[0, PT.H_HOT] - pt_real.loc[-1, PT.H_NET]) if (pt_real.loc[0, PT.H_HOT] - pt_real.loc[-1, PT.H_NET]) > 0 else 1.0 ), }
################################################################################ # Helper functions ################################################################################ def _sum_mcp_between_temperature_boundaries( temperatures: List[float], streams: StreamCollection, is_shifted: bool = True, idx: int | None = None, ) -> Tuple[List[float], List[float], List[float], List[float]]: """Vectorized CP and rCP summation across temperature intervals.""" def calc_active_matrix(streams: StreamCollection, use_shifted: bool) -> np.ndarray: if use_shifted: t_min = np.asarray( [stream.t_min_star[idx] for stream in streams], dtype=float ) t_max = np.asarray( [stream.t_max_star[idx] for stream in streams], dtype=float ) else: t_min = np.asarray([stream.t_min[idx] for stream in streams], dtype=float) t_max = np.asarray([stream.t_max[idx] for stream in streams], dtype=float) lower_bounds = np.array(temperatures[1:]) upper_bounds = np.array(temperatures[:-1]) # Shape: (intervals, streams) active = (t_max[np.newaxis, :] > lower_bounds[:, np.newaxis] + tol * 10) & ( t_min[np.newaxis, :] < upper_bounds[:, np.newaxis] - tol * 10 ) return active def sum_cp_rcp(streams: StreamCollection, active: np.ndarray): cp = np.asarray([stream.CP[idx] for stream in streams], dtype=float) rcp = np.asarray([stream.rCP[idx] for stream in streams], dtype=float) cp_sum = active @ cp rcp_sum = active @ rcp return np.insert(cp_sum, 0, 0.0), np.insert(rcp_sum, 0, 0.0) is_active = calc_active_matrix(streams, is_shifted) cp_array, rcp_array = sum_cp_rcp(streams, is_active) return cp_array.tolist(), rcp_array.tolist() def _shift_pt_to_set_heat_recovery( pt: ProblemTable, heat_recovery_target: float, current_heat_recovery: float ) -> ProblemTable: """Shift H_COLD and H_NET columns to match targeted heat recovery levels.""" delta_shift = current_heat_recovery - heat_recovery_target pt[PT.H_COLD] += delta_shift pt[PT.H_NET] += delta_shift return pt def _insert_temperature_interval_into_pt_at_constant_h( pt: ProblemTable, ) -> ProblemTable: """Insert temperature intervals where HCC and CCC intersect at constant enthalpy.""" T_new = [] # --- HCC to CCC projection --- if pt[PT.H_HOT][0] > 0.0: T_new.append(_get_T_start_on_opposite_cc(pt, pt[PT.H_HOT][0], PT.H_COLD)) # --- CCC to HCC projection --- if pt[PT.H_COLD][-1] > 0.0: T_new.append(_get_T_start_on_opposite_cc(pt, pt[PT.H_COLD][-1], PT.H_HOT)) T_new = [t for t in T_new if t is not None] if len(T_new) > 0: pt.insert_temperature_interval(T_new) return pt def _get_T_start_on_opposite_cc( pt: ProblemTable, h0: float, col_CC: str | PT, col_T: str | PT = PT.T, ) -> float: """Find the temperature where composite curves intersect at constant enthalpy.""" temp_cc = pt[col_CC] - h0 if temp_cc.size < 2: return None zero_hits = np.flatnonzero(np.abs(temp_cc) < tol) if zero_hits.size > 0: return None transitions = np.flatnonzero((temp_cc[:-1] >= tol) & (temp_cc[1:] <= -tol)) if transitions.size != 1: return None idx = int(transitions[0]) cc_vals = pt[col_CC] T_vals = pt[col_T] T_new = linear_interpolation( h0, cc_vals[idx], cc_vals[idx + 1], T_vals[idx], T_vals[idx + 1], ) return T_new