"""Target multiple utilities over a heating or cooling profile from the pinch."""
from typing import Tuple
import numpy as np
from ...classes.problem_table import ProblemTable
from ...classes.stream_collection import StreamCollection
from ...lib.config import tol
from ...lib.enums import PT
from .gcc_manipulation import get_seperated_gcc_heat_load_profiles
from .problem_table_analysis import get_utility_heat_cascade
__all__ = ["target_utilities_for_load_profiles", "get_utility_targets"]
################################################################################
# Public API
################################################################################
[docs]
def get_utility_targets(
pt: ProblemTable,
pt_real: ProblemTable = None,
hot_utilities: StreamCollection = None,
cold_utilities: StreamCollection = None,
is_direct_integration: bool = True,
idx: int | None = None,
) -> Tuple[ProblemTable, ProblemTable, StreamCollection, StreamCollection]:
"""Target utility usage and compute GCC variants for a zone.
Parameters
----------
pt, pt_real:
Shifted and real problem tables used for constructing composite curves.
hot_utilities, cold_utilities:
Candidate utility collections that will be targeted across temperature
intervals.
is_direct_integration:
When ``True`` (default) the function assumes the zone represents a
process area and applies additional targeting logic appropriate for that
context.
Returns
-------
tuple
Updated ``(pt, pt_real, hot_utilities, cold_utilities)`` collections with
derived profiles embedded.
"""
# Target multiple utility use
if is_direct_integration:
hot_utilities, cold_utilities = target_utilities_for_load_profiles(
hot_utilities=hot_utilities,
cold_utilities=cold_utilities,
T_vals=pt[PT.T],
H_net_cold=pt[PT.H_NET_COLD],
H_net_hot=pt[PT.H_NET_HOT],
pinch_idx=pt.pinch_idx(PT.H_NET_A),
is_real_temperatures=False,
idx=idx,
)
pt.update(
**get_utility_heat_cascade(
T_int_vals=pt[PT.T],
hot_utilities=hot_utilities,
cold_utilities=cold_utilities,
is_shifted=True,
idx=idx,
)
)
pt.update(
**get_seperated_gcc_heat_load_profiles(
T_col=pt[PT.T],
H_net=pt[PT.H_NET_UT],
rcp_net=pt[PT.RCP_UT_NET],
is_process_stream=False,
)
)
if isinstance(pt_real, ProblemTable):
pt_real.update(
**get_utility_heat_cascade(
T_int_vals=pt_real[PT.T],
hot_utilities=hot_utilities,
cold_utilities=cold_utilities,
is_shifted=False,
idx=idx,
)
)
pt_real.update(
**get_seperated_gcc_heat_load_profiles(
T_col=pt_real[PT.T],
H_net=pt_real[PT.H_NET_UT],
rcp_net=pt_real[PT.RCP_UT_NET],
is_process_stream=False,
)
)
return pt, pt_real, hot_utilities, cold_utilities
################################################################################
# Helper functions
################################################################################
[docs]
def target_utilities_for_load_profiles(
*,
hot_utilities: StreamCollection,
cold_utilities: StreamCollection,
T_vals: np.ndarray,
H_net_cold: np.ndarray,
H_net_hot: np.ndarray,
pinch_idx: Tuple[int, int],
is_real_temperatures: bool = False,
idx: int | None = None,
) -> Tuple[StreamCollection, StreamCollection]:
"""Targets multiple utilities for precomputed hot- and cold-side load profiles."""
if abs(H_net_cold[0]) > tol:
if len(hot_utilities) == 0:
raise ValueError(
"Hot utility targeting failed. No hot utilities provided but "
"heat load profile indicates utility use is required."
)
hot_utilities = _assign_utility(
T_vals=T_vals,
H_vals=np.abs(H_net_cold),
u_ls=hot_utilities,
pinch_row=pinch_idx[0],
is_hot_ut=True,
is_real_temperatures=is_real_temperatures,
idx=idx,
)
if abs(H_net_hot[-1]) > tol:
if len(cold_utilities) == 0:
raise ValueError(
"Cold utility targeting failed. No cold utilities provided but "
"heat load profile indicates utility use is required."
)
cold_utilities = _assign_utility(
T_vals=T_vals,
H_vals=np.abs(H_net_hot),
u_ls=cold_utilities,
pinch_row=pinch_idx[1],
is_hot_ut=False,
is_real_temperatures=is_real_temperatures,
idx=idx,
)
return hot_utilities, cold_utilities
def _assign_utility(
T_vals: np.ndarray,
H_vals: np.ndarray,
u_ls: StreamCollection,
pinch_row: int,
is_hot_ut: bool,
is_real_temperatures: bool,
idx: int | None,
) -> StreamCollection:
"""Assigns utility heat duties based on vertical heat transfer across a pinch."""
if is_hot_ut:
T_segment = T_vals[: pinch_row + 1]
H_segment = H_vals[: pinch_row + 1]
segment_limit = H_segment[0]
else:
T_segment = T_vals[pinch_row:]
H_segment = H_vals[pinch_row:]
segment_limit = H_segment[-1]
if len(np.where(H_segment < tol)) != 1:
raise ValueError(
"Error in utility targeting. Please report the data that produced "
"this error."
)
Q_assigned = 0.0
for u in reversed(u_ls) if is_hot_ut else u_ls:
if is_real_temperatures:
t_lo, t_hi = u.t_min, u.t_max
else:
t_lo, t_hi = u.t_min_star, u.t_max_star
if is_hot_ut:
Ts, Tt = float(t_hi[idx]), float(t_lo[idx])
else:
Ts, Tt = float(t_lo[idx]), float(t_hi[idx])
Q_ut_max = _maximise_utility_duty(
T_segment,
H_segment,
Ts,
Tt,
is_hot_ut,
Q_assigned,
)
if Q_ut_max > tol:
u.set_value_attr_at_state_idx(
attr_name="heat_flow",
value=Q_ut_max,
idx=idx,
)
Q_assigned += Q_ut_max
if abs(segment_limit - Q_assigned) < tol:
break
return u_ls
def _maximise_utility_duty(
T_segment: np.ndarray,
H_segment: np.ndarray,
Ts: float,
Tt: float,
is_hot_ut: bool,
Q_assigned: float,
) -> float:
"""Determine remaining heat duty within temperature and assignment limits."""
if T_segment.size < 2:
return 0.0
if is_hot_ut:
current_T = T_segment[1:]
previous_T = T_segment[:-1]
current_H = H_segment[1:]
adjacent_H = H_segment[:-1]
Q_pot = adjacent_H - Q_assigned
dt_tar = Tt - current_T
dt_sup = Ts - previous_T
else:
current_T = T_segment[:-1]
next_T = T_segment[1:]
current_H = H_segment[:-1]
adjacent_H = H_segment[1:]
Q_pot = adjacent_H - Q_assigned
dt_tar = current_T - Tt
dt_sup = next_T - Ts
valid_mask = (adjacent_H != current_H) & (dt_sup >= -tol) & (Q_pot > tol)
if not np.any(valid_mask):
return 0.0
dt_tar_valid = dt_tar[valid_mask]
Q_pot_valid = Q_pot[valid_mask]
if dt_tar_valid.max() < 0:
return 0.0
Q_ts_max = Q_pot_valid.max()
Q_tt = np.full_like(Q_pot_valid, np.inf, dtype=float)
slope_mask = (-dt_tar_valid) > tol
if np.any(slope_mask):
Q_tt[slope_mask] = (
Q_pot_valid[slope_mask] / (-dt_tar_valid[slope_mask]) * abs(Tt - Ts)
)
Q_tt_max = Q_tt.min() if Q_tt.size > 0 else np.inf
return min(Q_ts_max, Q_tt_max) if dt_tar_valid.max() >= 0 else 0.0