Source code for OpenPinch.services.direct_heat_integration.direct_integration_entry

"""Direct heat integration entry point for process and unit-level targeting."""

from typing import List, Tuple

import numpy as np

from ...classes.problem_table import ProblemTable
from ...classes.stream import Stream
from ...classes.stream_collection import StreamCollection
from ...classes.zone import Zone
from ...lib.config import tol
from ...lib.enums import GT, PT, ST, TT
from ...lib.schemas.targets import DirectIntegrationTarget
from ...utils.miscellaneous import delta_vals, get_state_index
from ..common.capital_cost_and_area_targeting import (
    get_area_targets,
    get_balanced_CC,
    get_capital_cost_targets,
    get_min_number_hx,
)
from ..common.gcc_manipulation import get_additional_GCCs
from ..common.problem_table_analysis import (
    get_heat_recovery_target_from_pt,
    get_process_heat_cascade,
    set_zonal_targets,
)
from ..common.utility_targeting import get_utility_targets

__all__ = ["compute_direct_integration_targets"]

################################################################################
# Public API
################################################################################


[docs] def compute_direct_integration_targets( zone: Zone, args: dict | None = None, ) -> DirectIntegrationTarget: """Populate a ``Zone`` with detailed direct heat integration pinch targets. The function aggregates Problem Table calculations, multi-utility targeting, pinch temperature detection, and graph preparation. Results are cached on the provided ``zone`` and used later by site and regional aggregation routines. """ idx, sid = get_state_index(state_ids=zone.state_ids, args=args) pt = get_process_heat_cascade( hot_streams=zone.hot_streams, cold_streams=zone.cold_streams, all_streams=zone.all_streams, is_shifted=True, idx=idx, ) pt_real = get_process_heat_cascade( hot_streams=zone.hot_streams, cold_streams=zone.cold_streams, all_streams=zone.all_streams, is_shifted=False, known_heat_recovery=get_heat_recovery_target_from_pt(pt), idx=idx, ) hot_pinch, cold_pinch = pt.pinch_temperatures() pt = get_additional_GCCs( pt, do_vert_cc_calc=zone.config.DO_VERTICAL_GCC, do_assisted_ht_calc=zone.config.DO_ASSITED_HT, assisted_ht_dt_cut=zone.config.DT_ASSISTED_HT, ) get_utility_targets( pt=pt, pt_real=pt_real, hot_utilities=zone.hot_utilities, cold_utilities=zone.cold_utilities, is_direct_integration=True, idx=idx, ) zone.net_hot_streams, zone.net_cold_streams = ( _create_net_hot_and_cold_stream_collections_for_site_analysis( T_vals=pt[PT.T], H_vals=pt[PT.H_NET_A], hot_utilities=zone.hot_utilities, cold_utilities=zone.cold_utilities, idx=idx, ) ) if zone.config.DO_BALANCED_CC or zone.config.DO_AREA_TARGETING: pt.update( **get_balanced_CC( T_col=pt[PT.T], H_hot=pt[PT.H_HOT], H_cold=pt[PT.H_COLD], H_hot_ut=pt[PT.H_HOT_UT], H_cold_ut=pt[PT.H_COLD_UT], ) ) pt_real.update( **get_balanced_CC( T_col=pt_real[PT.T], H_hot=pt_real[PT.H_HOT], H_cold=pt_real[PT.H_COLD], H_hot_ut=pt_real[PT.H_HOT_UT], H_cold_ut=pt_real[PT.H_COLD_UT], dT_vals=pt_real[PT.DELTA_T], RCP_hot=pt_real[PT.RCP_HOT], RCP_cold=pt_real[PT.RCP_COLD], RCP_hot_ut=pt_real[PT.RCP_HOT_UT], RCP_cold_ut=pt_real[PT.RCP_COLD_UT], ) ) # Target capital cost, area, and exchanger count from the balanced CC. if zone.config.DO_AREA_TARGETING: num_units = get_min_number_hx( T_vals=pt[PT.T], H_hot_bal=pt[PT.H_HOT_BAL], H_cold_bal=pt[PT.H_COLD_BAL], hot_streams=zone.hot_streams, cold_streams=zone.cold_streams, hot_utilities=zone.hot_utilities, cold_utilities=zone.cold_utilities, idx=idx, ) area = get_area_targets( T_vals=pt_real[PT.T], H_hot_bal=pt_real[PT.H_HOT_BAL], H_cold_bal=pt_real[PT.H_COLD_BAL], R_hot_bal=pt_real[PT.R_HOT_BAL], R_cold_bal=pt_real[PT.R_COLD_BAL], ) capital_cost, annual_capital_cost = get_capital_cost_targets( area=area, num_units=num_units, zone_config=zone.config, ) area_payload = { "area": area, "num_units": num_units, "capital_cost": capital_cost, "total_cost": annual_capital_cost, } else: area_payload = {} else: area_payload = {} payload = ( set_zonal_targets( pt=pt, pt_real=pt_real, ) | { "zone_name": zone.name, "type": TT.DI.value, "parent_zone": zone.parent_zone, "config": zone.config, "pt": pt, "pt_real": pt_real, "graphs": _save_graph_data(pt, pt_real), "hot_utilities": zone.hot_utilities, "cold_utilities": zone.cold_utilities, "hot_pinch": hot_pinch, "cold_pinch": cold_pinch, "state_id": sid, "state_idx": idx, } | area_payload ) return DirectIntegrationTarget.model_validate(payload)
################################################################################ # Helper functions ################################################################################ def _create_net_hot_and_cold_stream_collections_for_site_analysis( T_vals: np.ndarray, H_vals: np.ndarray, hot_utilities: StreamCollection, cold_utilities: StreamCollection, idx: int | None = None, ) -> Tuple[StreamCollection, StreamCollection]: """Construct net stream segments requiring utility input by interval.""" net_hot_streams = StreamCollection() net_cold_streams = StreamCollection() if ( hot_utilities.sum_stream_attribute("heat_flow", idx=idx) + cold_utilities.sum_stream_attribute("heat_flow", idx=idx) < tol ): # If no utility is needed, there is no net streams for indirect integration. return net_hot_streams, net_cold_streams if delta_vals(T_vals).min() < tol: raise ValueError("Infeasible temperature interval detected in _store_TSP_data") T_vals = T_vals dh_vals = delta_vals(H_vals) hot_utilities_seq = list(hot_utilities) cold_utilities_seq = list(cold_utilities) hot_remaining = [float(u.heat_flow[idx]) for u in hot_utilities_seq] cold_remaining = [float(u.heat_flow[idx]) for u in cold_utilities_seq] hu_idx = _initialise_utility_index(hot_utilities_seq, hot_remaining) cu_idx = _initialise_utility_index(cold_utilities_seq, cold_remaining) k = 1 for i, dh in enumerate(dh_vals): if dh > tol and hu_idx >= 0: hu_idx, k = _add_net_segment_stateful( T_ub=T_vals[i], T_lb=T_vals[i + 1], curr_idx=hu_idx, dh_req=dh, utilities=hot_utilities_seq, remaining=hot_remaining, net_streams=net_cold_streams, k=k, idx=idx, ) elif -dh > tol and cu_idx >= 0: cu_idx, k = _add_net_segment_stateful( T_ub=T_vals[i], T_lb=T_vals[i + 1], curr_idx=cu_idx, dh_req=abs(dh), utilities=cold_utilities_seq, remaining=cold_remaining, net_streams=net_hot_streams, k=k, idx=idx, ) return net_hot_streams, net_cold_streams def _add_net_segment_stateful( T_ub: float, T_lb: float, curr_idx: int, dh_req: float, utilities: StreamCollection, remaining: List[float], net_streams: StreamCollection, k: int, j: int = 0, idx: int | None = None, ) -> Tuple[int, int]: """Adds a net utility segment and recursively handles segmentation if needed.""" if curr_idx < 0 or not utilities or dh_req <= tol: return curr_idx, k curr_u = utilities[curr_idx] available = float(remaining[curr_idx]) if curr_idx < len(remaining) else 0.0 if available <= tol: next_idx = _find_next_available_utility(curr_idx + 1, utilities, remaining) if next_idx == curr_idx: return curr_idx, k return _add_net_segment_stateful( T_ub=T_ub, T_lb=T_lb, curr_idx=next_idx, dh_req=dh_req, utilities=utilities, remaining=remaining, net_streams=net_streams, k=k, j=j, idx=idx, ) dh_curr = float(min(dh_req, available)) remaining[curr_idx] = max(available - dh_curr, 0.0) dh_next = dh_req - dh_curr T_span = T_ub - T_lb T_i = T_ub - (dh_curr / dh_req) * T_span if T_span and dh_req > tol else T_lb net_streams.add( Stream( name=f"Segment {k}" if j == 0 else f"Segment {k}-{j}", t_supply=T_i if curr_u.type == ST.Hot.value else T_ub, t_target=T_ub if curr_u.type == ST.Hot.value else T_i, heat_flow=dh_curr, dt_cont=float(curr_u.dt_cont[idx]), dt_cont_multiplier=curr_u.dt_cont_multiplier, htc=1.0, is_process_stream=True, ) ) if dh_next > tol: next_idx = _find_next_available_utility(curr_idx + 1, utilities, remaining) if next_idx == curr_idx: return curr_idx, k + 1 return _add_net_segment_stateful( T_ub=T_i, T_lb=T_lb, curr_idx=next_idx, dh_req=dh_next, utilities=utilities, remaining=remaining, net_streams=net_streams, k=k, j=j + 1, idx=idx, ) next_idx = ( curr_idx if remaining[curr_idx] > tol else _find_next_available_utility(curr_idx + 1, utilities, remaining) ) return next_idx, k + 1 def _initialise_utility_index( utilities: StreamCollection, remaining: List[float] ) -> int: """Returns the index of the first available utility with remaining capacity.""" for idx, residual in enumerate(remaining): if residual > tol: return idx return len(utilities) - 1 if utilities else -1 def _find_next_available_utility( start: int, utilities: StreamCollection, remaining: List[float] ) -> int: """Return the index of the next utility that still has remaining duty.""" if not utilities: return -1 for idx in range(max(start, 0), len(utilities)): if remaining[idx] > tol: return idx return len(utilities) - 1 def _save_graph_data(pt: ProblemTable, pt_real: ProblemTable) -> dict: """Assemble the Problem Table slices required for composite/comparison plots.""" pt.round(decimals=4) pt_real.round(decimals=4) return { GT.CC.value: pt_real.slice([PT.T, PT.H_HOT, PT.H_COLD]), GT.SCC.value: pt.slice([PT.T, PT.H_HOT, PT.H_COLD]), GT.BCC.value: pt_real.slice([PT.T, PT.H_HOT_BAL, PT.H_COLD_BAL]), GT.GCC.value: pt.slice( [PT.T, PT.H_NET, PT.H_NET_NP, PT.H_NET_V, PT.H_NET_A, PT.H_NET_UT] ), GT.GCC_R.value: pt_real.slice([PT.T, PT.H_NET, PT.H_NET_UT]), GT.NLP.value: pt.slice( [ PT.T, PT.H_NET_HOT, PT.H_NET_COLD, PT.H_HOT_UT, PT.H_COLD_UT, PT.H_HOT_HP, PT.H_COLD_HP, ] ), GT.GCC_HP.value: pt.slice([PT.T, PT.H_NET_W_AIR, PT.H_NET_HP]), }