# Source code for OpenPinch.services.indirect_heat_integration.indirect_integration_entry

"""Indirect heat-integration entry point for total site style targeting.

The routines in this module aggregate process-level direct-integration outputs
from subzones, construct site process/utility cascades, and calculate net
utility balances after feasible inter-zone heat recovery.
"""

from copy import deepcopy
from typing import Tuple

import numpy as np

from ...classes.problem_table import ProblemTable
from ...classes.stream_collection import StreamCollection
from ...classes.zone import Zone
from ...lib.config import tol
from ...lib.enums import GT, PT, TT
from ...lib.problem_table_types import ProblemTableUpdateKwargs
from ...lib.schemas.targets import TotalProcessTarget, TotalSiteTarget
from ...utils.miscellaneous import get_state_index
from ..common.problem_table_analysis import (
    get_process_heat_cascade,
)

# Names exported by ``from <this module> import *``; the underscore-prefixed
# helpers defined further down are intentionally left out.
__all__ = [
    "compute_total_subzone_utility_targets",
    "compute_indirect_integration_targets",
]


################################################################################
# Public API
################################################################################


def _accumulate_utility_heat_flow(
    totals: StreamCollection,
    contributions: StreamCollection,
    idx: int,
) -> None:
    """Add ``contributions`` heat flows onto ``totals`` in place, by position.

    Utilities are paired by integer position, so ``contributions`` must hold
    at least as many entries as ``totals``.
    """
    for j in range(len(totals)):
        totals[j].set_value_attr_at_state_idx(
            attr_name="heat_flow",
            value=totals[j].heat_flow[idx] + contributions[j].heat_flow[idx],
            idx=idx,
        )


def compute_total_subzone_utility_targets(
    zone: Zone,
    args: dict | None = None,
) -> TotalProcessTarget:
    """Sum the direct-integration targets of every subzone of ``zone``.

    Args:
        zone: Parent zone. Each entry of ``zone.subzones`` must already hold a
            ``TT.DI`` target, and ``zone`` itself must hold one too (its
            ``heat_recovery_limit`` is read).
        args: Optional arguments forwarded to ``get_state_index`` to select
            the state to evaluate.

    Returns:
        A validated ``TotalProcessTarget`` of type ``TT.TZ`` holding the summed
        hot/cold utility targets, heat recovery target, utility cost, and the
        per-utility heat flows at the selected state.
    """
    idx, sid = get_state_index(state_ids=zone.state_ids, args=args)

    # Work on copies of the zone's utilities with the heat flow zeroed at this
    # state, so the subzone duties can be accumulated onto them.
    hot_utilities = deepcopy(zone.hot_utilities).set_common_stream_attribute(
        attr_name="heat_flow", value=0.0, idx=idx
    )
    cold_utilities = deepcopy(zone.cold_utilities).set_common_stream_attribute(
        attr_name="heat_flow", value=0.0, idx=idx
    )

    hot_utility_target = cold_utility_target = heat_recovery_target = 0.0
    utility_cost = 0.0

    # NOTE: the previous area / unit-count aggregation was removed. Its guard
    # (`area > tol`) could never be true because `area` started at 0.0 and was
    # only changed inside that guard, and neither value reached the output.
    for subzone in zone.subzones.values():
        t = subzone.targets[TT.DI.value]
        hot_utility_target += t.hot_utility_target
        cold_utility_target += t.cold_utility_target
        heat_recovery_target += t.heat_recovery_target
        utility_cost += t.utility_cost
        _accumulate_utility_heat_flow(hot_utilities, t.hot_utilities, idx)
        _accumulate_utility_heat_flow(cold_utilities, t.cold_utilities, idx)

    heat_recovery_limit = zone.targets[TT.DI.value].heat_recovery_limit

    # With no recoverable heat at all, the zone counts as fully integrated.
    if heat_recovery_limit > 0:
        degree_of_int = heat_recovery_target / heat_recovery_limit
    else:
        degree_of_int = 1.0

    output = {
        "zone_name": zone.name,
        "type": TT.TZ.value,
        "parent_zone": zone.parent_zone,
        "config": zone.config,
        "hot_utilities": hot_utilities,
        "cold_utilities": cold_utilities,
        "hot_utility_target": hot_utility_target,
        "cold_utility_target": cold_utility_target,
        "heat_recovery_target": heat_recovery_target,
        "heat_recovery_limit": heat_recovery_limit,
        "degree_of_int": degree_of_int,
        "utility_cost": utility_cost,
        "state_id": sid,
        "state_idx": idx,
    }
    return TotalProcessTarget.model_validate(output)
def compute_indirect_integration_targets(
    zone: Zone,
    args: dict | None = None,
) -> TotalSiteTarget | None:
    """Compute indirect (Total Site) integration targets for an aggregated zone.

    The routine expects ``zone.targets`` to already contain the summed subzone
    target under ``TT.TZ``. It builds the site process cascade from the zone's
    net hot/cold streams, overlays the utility cascade built from the summed
    subzone utilities, reads the net utility targets and pinch temperatures
    from that cascade, and offsets utility generation against utility use at
    matching temperature levels.

    Args:
        zone: Aggregated zone providing ``net_hot_streams``,
            ``net_cold_streams`` and the ``TT.TZ`` target.
        args: Optional arguments forwarded to ``get_state_index`` to select
            the state to evaluate.

    Returns:
        A validated ``TotalSiteTarget`` of type ``TT.TS``, or ``None`` when the
        zone has neither net hot nor net cold streams.

    Raises:
        KeyError: presumably, if ``zone.targets`` has no ``TT.TZ`` entry
            (assumes ``targets`` is a dict-like mapping - confirm).
    """
    idx, sid = get_state_index(state_ids=zone.state_ids, args=args)
    s_tzt = zone.targets[TT.TZ.value]

    # Nothing to integrate across zones without any net streams.
    if len(zone.net_hot_streams) == 0 and len(zone.net_cold_streams) == 0:
        return None

    # Total site profiles - process side
    pt = get_process_heat_cascade(
        hot_streams=zone.net_hot_streams,
        cold_streams=zone.net_cold_streams,
        is_shifted=True,  # Align a second shift with the real utility scale.
        idx=idx,
    )
    pt.update(
        **_shift_site_process_profiles(
            T_col=pt[PT.T],
            H_hot=pt[PT.H_HOT],
            H_cold=pt[PT.H_COLD],
        )
    )

    # Apply the problem table algorithm to subzone utility use
    pt.update(
        **_build_site_utility_profile(
            hot_utilities=s_tzt.hot_utilities,
            cold_utilities=s_tzt.cold_utilities,
            is_shifted=False,
            idx=idx,
        )
    )

    # Extract overall heat integration targets
    hot_utility_target = pt.loc[0, PT.H_NET_UT]
    cold_utility_target = pt.loc[-1, PT.H_NET_UT]
    # Any hot utility saved relative to the summed subzone target counts as
    # additional heat recovery.
    heat_recovery_target = s_tzt.heat_recovery_target + (
        s_tzt.hot_utility_target - hot_utility_target
    )
    hot_pinch, cold_pinch = pt.pinch_temperatures(col_H=PT.H_NET_UT)

    # Apply the utility targeting method to determine the net utility use and
    # generation. Copies are passed because the matcher edits heat flows in place.
    hot_utilities, cold_utilities = _match_utility_gen_and_use_at_same_level(
        hot_utilities=deepcopy(s_tzt.hot_utilities),
        cold_utilities=deepcopy(s_tzt.cold_utilities),
        idx=idx,
    )

    # With no recoverable heat at all, the zone counts as fully integrated.
    if s_tzt.heat_recovery_limit > 0:
        degree_of_int = heat_recovery_target / s_tzt.heat_recovery_limit
    else:
        degree_of_int = 1.0

    output = {
        "zone_name": zone.name,
        "type": TT.TS.value,
        "parent_zone": zone.parent_zone,
        "config": zone.config,
        "pt": pt,
        "graphs": _save_graph_data(pt),
        "hot_pinch": hot_pinch,
        "cold_pinch": cold_pinch,
        "hot_utilities": hot_utilities,
        "cold_utilities": cold_utilities,
        "hot_utility_target": hot_utility_target,
        "cold_utility_target": cold_utility_target,
        "heat_recovery_target": heat_recovery_target,
        "heat_recovery_limit": s_tzt.heat_recovery_limit,
        "degree_of_int": degree_of_int,
        "utility_cost": _compute_utility_cost(
            hot_utilities + cold_utilities,
            idx=idx,
        ),
        "state_id": sid,
        "state_idx": idx,
    }
    return TotalSiteTarget.model_validate(output)
################################################################################
# Helper Functions
################################################################################


def _match_utility_gen_and_use_at_same_level(
    hot_utilities: StreamCollection,
    cold_utilities: StreamCollection,
    idx: int | None = None,
    t_tol: float = 1.0,
) -> Tuple[StreamCollection, StreamCollection]:
    """Offset hot-utility use against cold-utility generation at equal levels.

    A hot and a cold utility are treated as the same level when the hot
    supply temperature matches the cold target temperature and the hot target
    temperature matches the cold supply temperature, each within ``t_tol``.
    For every such pair the smaller of the two heat flows is subtracted from
    both.

    Args:
        hot_utilities: Hot utilities; their ``heat_flow[idx]`` is edited in place.
        cold_utilities: Cold utilities; their ``heat_flow[idx]`` is edited in place.
        idx: State index used to read temperatures and heat flows.
        t_tol: Largest absolute temperature difference (exclusive) still
            treated as the same level. Defaults to the previous fixed value 1.

    Returns:
        The same two collections that were passed in, after the in-place update.
    """
    for u_h in hot_utilities:
        for u_c in cold_utilities:
            same_level = (
                abs(u_h.t_supply[idx] - u_c.t_target[idx]) < t_tol
                and abs(u_h.t_target[idx] - u_c.t_supply[idx]) < t_tol
            )
            if same_level:
                Q = min(u_h.heat_flow[idx], u_c.heat_flow[idx])
                u_h.heat_flow[idx] -= Q
                u_c.heat_flow[idx] -= Q

    return hot_utilities, cold_utilities


def _compute_utility_cost(
    utilities: StreamCollection,
    idx: int | None = None,
) -> np.float64:
    """Return the sum of ``utility_cost[idx]`` over all given utilities."""
    return np.sum([u.utility_cost[idx] for u in utilities])


def _shift_site_process_profiles(
    T_col: np.ndarray,
    H_hot: np.ndarray,
    H_cold: np.ndarray,
) -> ProblemTableUpdateKwargs:
    """Re-reference the site process profiles for a problem-table update.

    The hot profile is offset so its first entry is zero and the cold profile
    so its last entry is zero. The result is keyword arguments for
    ``ProblemTable.update``.
    """
    return {
        "T_col": T_col,
        "updates": {
            PT.H_HOT: H_hot - H_hot[0],
            PT.H_COLD: H_cold - H_cold[-1],
        },
    }


def _build_site_utility_profile(
    hot_utilities: StreamCollection,
    cold_utilities: StreamCollection,
    is_shifted: bool = False,
    idx: int | None = None,
) -> ProblemTableUpdateKwargs:
    """Build the site utility cascade columns for a problem-table update.

    A heat cascade is computed from the utilities alone. The net utility
    column is the hot minus cold cascade, offset so its minimum is zero; the
    cold utility column is offset so its maximum is zero. The result is
    keyword arguments for ``ProblemTable.update``.
    """
    pt_ut = get_process_heat_cascade(
        hot_streams=hot_utilities,
        cold_streams=cold_utilities,
        is_shifted=is_shifted,
        idx=idx,
    )
    h_net_ut = pt_ut[PT.H_HOT] - pt_ut[PT.H_COLD]
    return {
        "T_col": pt_ut[PT.T],
        "updates": {
            PT.H_NET_UT: h_net_ut - h_net_ut.min(),
            PT.H_HOT_UT: pt_ut[PT.H_HOT],
            PT.H_COLD_UT: pt_ut[PT.H_COLD] - pt_ut[PT.H_COLD].max(),
        },
    }


def _save_graph_data(
    pt: ProblemTable,
) -> dict:
    """Prepare graph-ready tables capturing site-level utility composite curves.

    Returns a dict keyed by ``GT.TSP.value`` and ``GT.SUGCC.value``, each
    holding a column slice of ``pt``.
    """
    # The return value of round() is discarded, so it presumably rounds `pt`
    # in place - confirm against ProblemTable.round.
    pt.round(decimals=4)
    return {
        GT.TSP.value: pt.slice([PT.T, PT.H_HOT, PT.H_COLD, PT.H_HOT_UT, PT.H_COLD_UT]),
        GT.SUGCC.value: pt.slice([PT.T, PT.H_NET_UT, PT.H_NET_HP]),
    }