"""Area targeting methods."""
import numpy as np
from ...classes.stream_collection import StreamCollection
from ...lib.config import Configuration, tol
from ...lib.enums import PT
from ...lib.problem_table_types import ProblemTableUpdateKwargs
from ...utils.costing import compute_annual_capital_cost, compute_capital_cost
from ...utils.heat_exchanger import compute_LMTD_from_dts
from ...utils.miscellaneous import clean_composite_curve_ends
from .temperature_driving_force import get_temperature_driving_forces
# Public names exported by this module via ``from ... import *``.
__all__ = [
"get_balanced_CC",
"get_capital_cost_targets",
"get_area_targets",
"get_min_number_hx",
]
################################################################################
# Public API
################################################################################
[docs]
def get_capital_cost_targets(
    area: float,
    num_units: int,
    zone_config: Configuration,
) -> tuple[float, float]:
    """Estimate equipment and annualized capital costs from area/unit targets.

    Parameters
    ----------
    area:
        Total heat-transfer area target from balanced composite curves.
    num_units:
        Minimum exchanger count estimate for the same targeting scenario.
    zone_config:
        Active configuration. The attributes read here are ``FIXED_COST``,
        ``VARIABLE_COST``, ``COST_EXP``, ``DISCOUNT_RATE`` and ``SERV_LIFE``.

    Returns
    -------
    tuple[float, float]
        ``(capital_cost, annual_capital_cost)``: the result of
        ``compute_capital_cost`` followed by the result of
        ``compute_annual_capital_cost`` applied to it.
    """
    capital_cost = compute_capital_cost(
        area,
        num_units,
        zone_config.FIXED_COST,
        zone_config.VARIABLE_COST,
        zone_config.COST_EXP,
    )
    # The annualized figure is derived from the installed cost just computed.
    annual_capital_cost = compute_annual_capital_cost(
        capital_cost,
        zone_config.DISCOUNT_RATE,
        zone_config.SERV_LIFE,
    )
    return capital_cost, annual_capital_cost
[docs]
def get_balanced_CC(
    T_col: np.ndarray,
    H_hot: np.ndarray,
    H_cold: np.ndarray,
    H_hot_ut: np.ndarray,
    H_cold_ut: np.ndarray,
    dT_vals: np.ndarray | None = None,
    RCP_hot: np.ndarray | None = None,
    RCP_cold: np.ndarray | None = None,
    RCP_hot_ut: np.ndarray | None = None,
    RCP_cold_ut: np.ndarray | None = None,
) -> ProblemTableUpdateKwargs:
    """Create the balanced composite curve using process and utility streams.

    The balanced hot/cold cascades are the element-wise sums of the process
    and utility cascades. When all four ``RCP_*`` arrays are supplied, the
    summed ``RCP`` columns and a per-row resistance are returned as well.

    Parameters
    ----------
    T_col:
        Temperature column, passed through unchanged under ``"T_col"``.
    H_hot, H_cold:
        Process hot and cold enthalpy cascades.
    H_hot_ut, H_cold_ut:
        Utility hot and cold enthalpy cascades (same shape as the process
        cascades so that they can be added).
    dT_vals:
        Per-row temperature differences. Required whenever all four
        ``RCP_*`` arrays are given.
    RCP_hot, RCP_cold, RCP_hot_ut, RCP_cold_ut:
        Optional ``RCP`` columns for process and utility streams. The extra
        outputs are only produced when none of them is ``None``.

    Returns
    -------
    ProblemTableUpdateKwargs
        ``{"T_col": T_col, "updates": {...}}`` where ``updates`` always holds
        ``PT.H_HOT_BAL`` and ``PT.H_COLD_BAL`` and, when the ``RCP`` inputs
        are complete, also ``PT.RCP_HOT_BAL``, ``PT.RCP_COLD_BAL``,
        ``PT.R_HOT_BAL`` and ``PT.R_COLD_BAL``.

    Raises
    ------
    ValueError
        If all ``RCP_*`` arrays are supplied but ``dT_vals`` is ``None``.
    """
    H_hot_bal = H_hot + H_hot_ut
    H_cold_bal = H_cold + H_cold_ut
    res = {
        PT.H_HOT_BAL: H_hot_bal,
        PT.H_COLD_BAL: H_cold_bal,
    }

    rcp_inputs = (RCP_hot, RCP_cold, RCP_hot_ut, RCP_cold_ut)
    if all(arr is not None for arr in rcp_inputs):
        if dT_vals is None:
            # Without this check NumPy fails later with an opaque TypeError
            # when multiplying an array by None.
            raise ValueError(
                "dT_vals is required when RCP_hot, RCP_cold, RCP_hot_ut and "
                "RCP_cold_ut are all provided."
            )
        RCP_hot_bal = RCP_hot + RCP_hot_ut
        RCP_cold_bal = RCP_cold + RCP_cold_ut
        res.update(
            {
                PT.RCP_HOT_BAL: RCP_hot_bal,
                PT.RCP_COLD_BAL: RCP_cold_bal,
                PT.R_HOT_BAL: _balanced_interval_resistance(
                    RCP_hot_bal, dT_vals, H_hot_bal
                ),
                PT.R_COLD_BAL: _balanced_interval_resistance(
                    RCP_cold_bal, dT_vals, H_cold_bal
                ),
            }
        )
    return {"T_col": T_col, "updates": res}


def _balanced_interval_resistance(
    RCP_bal: np.ndarray,
    dT_vals: np.ndarray,
    H_bal: np.ndarray,
) -> np.ndarray:
    """Return ``RCP_bal * dT_vals / dH`` per row, with 0 where ``dH <= tol``."""
    # dH[i] is the drop in the cascade from row i-1 to row i; the first row
    # has no predecessor and is given a zero drop.
    dH_bal = np.insert(H_bal[:-1] - H_bal[1:], 0, 0)
    R_bal = np.zeros_like(dH_bal, dtype=float)
    # Only divide where the enthalpy change is meaningfully positive; the
    # remaining rows keep the zero they were initialised with.
    np.divide(
        RCP_bal * dT_vals,
        dH_bal,
        out=R_bal,
        where=dH_bal > tol,
    )
    return R_bal
[docs]
def get_area_targets(
    T_vals: np.ndarray,
    H_hot_bal: np.ndarray,
    H_cold_bal: np.ndarray,
    R_hot_bal: np.ndarray,
    R_cold_bal: np.ndarray,
) -> float:
    """Estimate heat-transfer area targets with vectorised counter-current logic.

    Parameters
    ----------
    T_vals:
        Temperature column shared by both balanced cascades.
    H_hot_bal, H_cold_bal:
        Balanced hot and cold enthalpy cascades. Their end-to-end spans
        (first entry minus last entry) must agree within ``tol``.
    R_hot_bal, R_cold_bal:
        Per-row resistances for the balanced hot and cold curves, indexed
        like ``T_vals``.

    Returns
    -------
    float
        Sum over all temperature-driving-force segments of
        ``Q_i / (U_i * dt_lm_i)``.

    Raises
    ------
    ValueError
        If the two cascades are not balanced, or if the duty, coefficient
        and LMTD arrays do not share the same shape.
    """
    hot_span = H_hot_bal[0] - H_hot_bal[-1]
    cold_span = H_cold_bal[0] - H_cold_bal[-1]
    if abs(hot_span - cold_span) > tol:
        # Raise an error because heat-flow balance is required for this analysis.
        raise ValueError(
            "The temperature driving force plot requires the inputted "
            "composite curves to be balanced."
        )

    # Shift both cascades so that their last entry is zero. The shift is
    # applied unconditionally: it is a no-op when the last entry is already
    # zero, and it also covers cascades whose first entry is zero but whose
    # last entry is not.
    H_hot_bal = H_hot_bal - H_hot_bal[-1]
    H_cold_bal = H_cold_bal - H_cold_bal[-1]

    Th, Hh = clean_composite_curve_ends(T_vals, H_hot_bal)
    Tc, Hc = clean_composite_curve_ends(T_vals, H_cold_bal)
    tdf = get_temperature_driving_forces(Th, Hh, Tc, Hc)

    dt_lm_i = compute_LMTD_from_dts(
        tdf["delta_T1"],
        tdf["delta_T2"],
    )
    Q_i: np.ndarray = tdf["dh_vals"]
    R_i = _map_interval_resistances_to_tdf(
        T_vals,
        R_hot_bal,
        R_cold_bal,
        tdf["t_h1"],
        tdf["t_h2"],
        tdf["t_c1"],
        tdf["t_c2"],
    )

    # ``np.where`` evaluates ``1.0 / R_i`` for every element, so silence the
    # divide warnings; segments with R_i <= tol fall back to U_i = 1.0.
    with np.errstate(divide="ignore", invalid="ignore"):
        U_i = np.where(R_i > tol, 1.0 / R_i, 1.0)

    if not (Q_i.shape == U_i.shape == dt_lm_i.shape):
        raise ValueError("Shape of heat exchanger area calculation arrays are unequal.")

    area_i = Q_i / (U_i * dt_lm_i)
    return area_i.sum()
[docs]
def get_min_number_hx(
    T_vals: np.ndarray,
    H_hot_bal: np.ndarray,
    H_cold_bal: np.ndarray,
    hot_streams: StreamCollection,
    cold_streams: StreamCollection,
    hot_utilities: StreamCollection,
    cold_utilities: StreamCollection,
    idx: int | None = None,
) -> int:
    """Estimate the minimum number of exchangers using vectorised interval logic.

    Rows where the balanced hot and cold cascades coincide (within ``tol``)
    split the temperature range into segments. For every pair of consecutive
    coinciding rows that has at least one other row between them, the
    process streams and utilities associated with that segment are counted,
    and one unit is subtracted per segment.

    Parameters
    ----------
    T_vals:
        Temperature column aligned with the cascades.
    H_hot_bal, H_cold_bal:
        Balanced hot and cold enthalpy cascades.
    hot_streams, cold_streams:
        Process streams, counted with ``_count_crossing``.
    hot_utilities, cold_utilities:
        Utilities, counted with ``_count_utility_range_container``.
    idx:
        Index forwarded to the counting helpers.

    Returns
    -------
    int
        Total stream/utility count over all segments minus the number of
        segments.
    """
    net_cascade = H_cold_bal - H_hot_bal
    balanced_rows = np.flatnonzero(np.isclose(net_cascade, 0.0, atol=tol)).tolist()

    # Keep only neighbouring balanced rows that enclose at least one row.
    segments = [
        (upper_row, lower_row)
        for upper_row, lower_row in zip(balanced_rows, balanced_rows[1:])
        if lower_row > upper_row + 1
    ]

    stream_total = 0
    for upper_row, lower_row in segments:
        T_high = T_vals[upper_row]
        T_low = T_vals[lower_row]
        stream_total += _count_crossing(T_low, T_high, hot_streams, idx=idx)
        stream_total += _count_crossing(T_low, T_high, cold_streams, idx=idx)
        stream_total += _count_utility_range_container(
            T_low, T_high, hot_utilities, idx=idx
        )
        stream_total += _count_utility_range_container(
            T_low, T_high, cold_utilities, idx=idx
        )

    # One unit fewer than the number of streams in each segment.
    return int(stream_total - len(segments))
################################################################################
# Helper functions
################################################################################
def _map_interval_resistances_to_tdf(
    T_vals: np.ndarray,
    R_hot_bal: np.ndarray,
    R_cold_bal: np.ndarray,
    t_h1: np.ndarray,
    t_h2: np.ndarray,
    t_c1: np.ndarray,
    t_c2: np.ndarray,
) -> np.ndarray:
    """Align total resistances with temperature-driving-force intervals.

    Each pair of consecutive entries in ``T_vals`` defines a temperature
    band ``[T_vals[i + 1], T_vals[i]]``. A driving-force segment ``j`` is
    matched to a band on the hot side when ``t_h1[j]`` is not below the
    band's lower bound and ``t_h2[j]`` is not above its upper bound (both
    within ``tol``); the cold side uses ``t_c1``/``t_c2`` in the same way.
    The resistance of band ``i`` is taken from row ``i + 1`` of
    ``R_hot_bal`` / ``R_cold_bal``.

    Parameters
    ----------
    T_vals:
        Temperature column defining the bands.
    R_hot_bal, R_cold_bal:
        Per-row resistances indexed like ``T_vals``; the first entry is not
        used.
    t_h1, t_h2:
        Hot-side temperatures of the driving-force segments.
    t_c1, t_c2:
        Cold-side temperatures of the driving-force segments.

    Returns
    -------
    np.ndarray
        One value per driving-force segment: the sum of the hot resistances
        of all matching bands plus the sum of the cold resistances of all
        matching bands.
    """
    band_lower = T_vals[1:][:, np.newaxis]
    band_upper = T_vals[:-1][:, np.newaxis]

    def _matched_resistance(
        t_1: np.ndarray, t_2: np.ndarray, R_bal: np.ndarray
    ) -> np.ndarray:
        # Rows are temperature bands, columns are driving-force segments.
        active = (t_1[np.newaxis, :] >= band_lower - tol) & (
            t_2[np.newaxis, :] <= band_upper + tol
        )
        # Broadcasting the resistance column across the mask avoids building
        # a dense matrix; the float cast keeps the result floating point.
        R_column = np.asarray(R_bal, dtype=float)[1:, np.newaxis]
        return (active * R_column).sum(axis=0)

    Rh = _matched_resistance(t_h1, t_h2, R_hot_bal)
    Rc = _matched_resistance(t_c1, t_c2, R_cold_bal)
    return Rh + Rc
def _count_crossing(
    T_low: float,
    T_high: float,
    streams: StreamCollection,
    idx: int | None = None,
):
    """Count process streams intersecting interval ``[T_low, T_high]``.

    A stream is counted when its upper temperature lies inside the
    interval, when its lower temperature lies inside the interval, or when
    it spans the whole interval. All comparisons use the module tolerance
    ``tol``. Temperatures are read from ``t_max_star[idx]`` and
    ``t_min_star[idx]`` of each stream.
    """
    upper_temps = np.array([stream.t_max_star[idx] for stream in streams])
    lower_temps = np.array([stream.t_min_star[idx] for stream in streams])

    # Upper end falls within (T_low, T_high].
    top_inside = (upper_temps > T_low + tol) & (upper_temps <= T_high + tol)
    # Lower end falls within [T_low, T_high).
    bottom_inside = (lower_temps >= T_low - tol) & (lower_temps < T_high - tol)
    # Stream starts below and ends above the interval.
    spans_interval = (lower_temps < T_low - tol) & (upper_temps > T_high + tol)

    return np.sum(top_inside | bottom_inside | spans_interval)
def _count_utility_range_container(
    T_low: float,
    T_high: float,
    utilities: StreamCollection,
    idx: int | None = None,
):
    """Count active utilities lying entirely within ``[T_low, T_high]``.

    A utility is counted when its ``t_min_star[idx]`` is not below
    ``T_low``, its ``t_max_star[idx]`` is not above ``T_high`` (both within
    ``tol``) and its ``heat_flow[idx]`` exceeds ``tol``. Utilities that only
    partly overlap the interval are not counted.

    Returns 0 for an empty collection.
    """
    t_max = np.array([u.t_max_star[idx] for u in utilities])
    t_min = np.array([u.t_min_star[idx] for u in utilities])
    # Build the activity mask as booleans. An integer list turns into a
    # float64 array when the collection is empty, and ``bool & float`` is
    # rejected by NumPy's bitwise_and.
    active = np.array(
        [bool(u.heat_flow[idx] > tol) for u in utilities],
        dtype=bool,
    )
    contained = (t_min >= T_low - tol) & (t_max <= T_high + tol)
    return np.sum(contained & active)