Source code for OpenPinch.services.heat_pump_integration.common.shared

"""Shared helpers for heat pump and refrigeration targeting."""

from typing import Any, Callable, Tuple

import numpy as np
from CoolProp.CoolProp import PropsSI

try:
    import plotly.graph_objects as go
except ImportError as exc:  # pragma: no cover - optional dependency guard
    go = None
    _PLOTLY_IMPORT_ERROR = exc
else:
    _PLOTLY_IMPORT_ERROR = None

from ....classes.stream import Stream
from ....classes.stream_collection import StreamCollection
from ....lib.config import tol
from ....lib.enums import PT
from ....lib.schemas.hpr import (
    HeatPumpTargetInputs,
    HPRBackendResult,
    HPRParsedState,
    HPRThermoArtifacts,
)
from ....utils.blackbox_minimisers import multiminima
from ....utils.miscellaneous import (
    clean_composite_curve_ends,
    delta_vals,
    g_ineq_penalty,
)
from ...common.problem_table_analysis import (
    create_problem_table_with_t_int,
    get_process_heat_cascade,
    get_utility_heat_cascade,
)

__all__ = [
    "PropsSI",
    "solve_hpr_placement",
    "create_stream_collection_of_background_profile",
    "get_Q_vals_at_T_hpr_from_bckgrd_profile",
    "compute_entropic_mean_temperature",
    "calc_carnot_heat_pump_cop",
    "calc_carnot_heat_engine_eta",
    "_append_unspecified_final_cascade_cooling_duty",
    "_get_hpr_cascade",
    "validate_vapour_hp_refrigerant_ls",
    "_get_carnot_hpr_cycle_cascade_profile",
    "get_carnot_hpr_cycle_streams",
    "_build_latent_streams",
    "get_ambient_air_stream",
    "calc_hpr_obj",
    "evaluate_carnot_hpr_result",
    "evaluate_vapour_hpr_result",
    "plot_multi_hp_profiles_from_results",
    "get_process_heat_cascade",
    "get_utility_heat_cascade",
    "g_ineq_penalty",
    "tol",
]


[docs] def plot_multi_hp_profiles_from_results( T_hot: np.ndarray = None, H_hot: np.ndarray = None, T_cold: np.ndarray = None, H_cold: np.ndarray = None, hpr_hot_streams: StreamCollection = None, hpr_cold_streams: StreamCollection = None, idx: int = 0, title: str = None, ) -> "go.Figure": # type: ignore """Plot background source/sink profiles alongside solved HPR cycle streams.""" go = _require_plotly() fig = go.Figure() if T_hot is not None and H_hot is not None: T_hot, H_hot = clean_composite_curve_ends(T_hot, H_hot) fig.add_trace( go.Scatter( x=H_hot, y=T_hot, mode="lines", name="Sink", line={"color": "red", "width": 2}, ) ) if T_cold is not None and H_cold is not None: T_cold, H_cold = clean_composite_curve_ends(T_cold, H_cold) fig.add_trace( go.Scatter( x=H_cold, y=T_cold, mode="lines", name="Source", line={"color": "blue", "width": 2}, ) ) if hpr_hot_streams is not None and hpr_cold_streams is not None: T_hpr_arr, H_hpr_hot, H_hpr_cold = _get_hpr_cascade( hpr_hot_streams, hpr_cold_streams, idx=idx, ) T_hpr_hot, H_hpr_hot = clean_composite_curve_ends(T_hpr_arr, H_hpr_hot) T_hpr_cold, H_hpr_cold = clean_composite_curve_ends(T_hpr_arr, H_hpr_cold) fig.add_trace( go.Scatter( x=H_hpr_hot, y=T_hpr_hot, mode="lines", name="Condenser", line={"color": "darkred", "width": 1.8, "dash": "dash"}, ) ) fig.add_trace( go.Scatter( x=H_hpr_cold, y=T_hpr_cold, mode="lines", name="Evaporator", line={"color": "darkblue", "width": 1.8, "dash": "dash"}, ) ) fig.update_layout( title=title, xaxis_title="Heat Flow / kW", yaxis_title="Temperature / degC", template="plotly_white", ) fig.update_xaxes(showgrid=True, gridcolor="rgba(0, 0, 0, 0.2)", zeroline=True) fig.update_yaxes(showgrid=True, gridcolor="rgba(0, 0, 0, 0.2)") fig.add_vline(x=0.0, line_color="black", line_width=2) return fig
def _require_plotly(): if _PLOTLY_IMPORT_ERROR is not None: raise ImportError( "Plotly is required for HPR profile plotting. " "Install it directly or reinstall OpenPinch with " "'pip install openpinch[notebook]' or 'pip install openpinch[dashboard]'." ) from _PLOTLY_IMPORT_ERROR return go def _verify_x0_ls(x0_ls: list | float | None) -> np.ndarray | None: if x0_ls is None: return None if isinstance(x0_ls, (int, float)): x0_ls = [x0_ls] x0_arr = np.asarray(x0_ls, dtype=np.float64) if x0_arr.size == 0: return None if x0_arr.ndim == 0: return x0_arr.reshape(1, 1) if x0_arr.ndim == 1: return x0_arr.reshape(1, -1) return x0_arr
[docs] def solve_hpr_placement( f_obj: Callable, x0_ls: list | float, bnds: list, args: HeatPumpTargetInputs, ) -> HPRBackendResult: """Run the configured multistart optimiser and post-process the best result.""" local_minima_x = multiminima( func=f_obj, func_kwargs=args, x0_ls=_verify_x0_ls(x0_ls), bounds=bnds, optimiser_handle=args.bb_minimiser, opt_kwargs={"n_runs": max(1, int(args.max_multi_start))}, ) if local_minima_x.size == 0: raise ValueError( "Heat pump and refrigeration targeting " f"({args.hpr_type}) failed to return any local minima." ) result = f_obj(local_minima_x[0], args, debug=args.debug) if not isinstance(result, HPRBackendResult): raise TypeError( "Heat pump and refrigeration objective functions must return " "HPRBackendResult." ) if not result.success: raise ValueError( "Heat pump and refrigeration targeting " f"({args.hpr_type}) failed to return an optimal result." ) return result.with_updates( success=True, amb_streams=get_ambient_air_stream(result.Q_amb_hot, result.Q_amb_cold, args), )
def _normalise_positive_scalar(value: Any) -> float: arr = np.asarray(value, dtype=float) if arr.size == 0: return 0.0 return max(float(arr.reshape(-1)[0]), 0.0) def _build_hpr_accounting( *, work: float, Q_ext_heat: float, Q_ext_cold: float, args: HeatPumpTargetInputs, penalty_terms: Any = None, penalise_external_cold_when_refrigerating: bool = False, ) -> tuple[float, float, float, float]: """Standardize external-utility, penalty, and objective semantics.""" Q_ext_heat = _normalise_positive_scalar(Q_ext_heat) Q_ext_cold = _normalise_positive_scalar(Q_ext_cold) penalty_terms = ( np.asarray([], dtype=float) if penalty_terms is None else np.asarray(penalty_terms, dtype=float).reshape(-1) ) positive_penalty_terms = np.maximum(penalty_terms, 0.0) eta_penalty = float(getattr(args, "eta_penalty", 0.01)) rho_penalty = float(getattr(args, "rho_penalty", 10.0)) penalty = ( float( g_ineq_penalty( positive_penalty_terms, eta=eta_penalty, rho=rho_penalty, form="square", ) ) if positive_penalty_terms.size else 0.0 ) if penalise_external_cold_when_refrigerating and not getattr( args, "is_heat_pumping", True ): penalty += float(g_ineq_penalty(g=Q_ext_cold, rho=rho_penalty, form="square")) obj = float( calc_hpr_obj( work=work, Q_ext_heat=Q_ext_heat, Q_ext_cold=Q_ext_cold, Q_hpr_target=args.Q_hpr_target, heat_to_power_ratio=args.heat_to_power_ratio, cold_to_power_ratio=args.cold_to_power_ratio, penalty=penalty, ) ) return Q_ext_heat, Q_ext_cold, penalty, obj
[docs] def evaluate_carnot_hpr_result( *, args: HeatPumpTargetInputs, state: HPRParsedState, w_net: float, w_hpr: float | list | np.ndarray, Q_cond_total: np.ndarray, Q_evap_total: np.ndarray, w_he: float | list | np.ndarray | None = None, heat_recovery: float | list | np.ndarray | None = None, cop_h: float | list | np.ndarray | None = None, eta_he: float | list | np.ndarray | None = None, Q_cond: np.ndarray | None = None, Q_evap: np.ndarray | None = None, Q_cond_he: np.ndarray | None = None, Q_evap_he: np.ndarray | None = None, penalty_terms: Any = None, debug: bool = False, ) -> HPRBackendResult: """Shared Carnot-family accounting, plotting, and result assembly.""" H_cold_with_amb = args.H_cold + args.z_amb_cold * state.Q_amb_cold H_hot_with_amb = args.H_hot + args.z_amb_hot * state.Q_amb_hot Q_ext_heat, Q_ext_cold, penalty, obj = _build_hpr_accounting( work=float(w_net), Q_ext_heat=np.abs(H_cold_with_amb[0]) - np.asarray(Q_cond_total, dtype=float).sum(), Q_ext_cold=np.abs(H_hot_with_amb[-1]) - np.asarray(Q_evap_total, dtype=float).sum(), args=args, penalty_terms=penalty_terms, penalise_external_cold_when_refrigerating=True, ) hpr_streams = get_carnot_hpr_cycle_streams( state.T_cond, Q_cond_total, state.T_evap, Q_evap_total, args, ) debug_figure = None if debug: debug_figure = plot_multi_hp_profiles_from_results( args.T_hot, H_hot_with_amb, args.T_cold, H_cold_with_amb, hpr_streams.get_hot_utility_streams(), hpr_streams.get_cold_utility_streams(), title=( f"Obj {obj:.5f} = {(float(w_net) / args.Q_hpr_target):.5f} + " f"{(Q_ext_heat / args.Q_hpr_target):.5f} + " f"{(Q_ext_cold / args.Q_hpr_target):.5f} + " f"{(penalty / args.Q_hpr_target):.5f}" ), idx=args.idx, ) return HPRBackendResult( obj=obj, utility_tot=float(w_net + Q_ext_heat + Q_ext_cold), w_net=float(w_net), w_hpr=w_hpr, w_he=w_he, heat_recovery=heat_recovery, Q_ext_heat=Q_ext_heat, Q_ext_cold=Q_ext_cold, Q_amb_hot=state.Q_amb_hot, Q_amb_cold=state.Q_amb_cold, cop_h=cop_h, eta_he=eta_he, T_cond=state.T_cond, T_evap=state.T_evap, Q_cond=Q_cond_total if Q_cond is None else Q_cond, Q_evap=Q_evap_total if Q_evap is None else Q_evap, Q_cond_he=Q_cond_he, Q_evap_he=Q_evap_he, artifacts=HPRThermoArtifacts( hpr_streams=hpr_streams, debug_figure=debug_figure ), )
[docs] def evaluate_vapour_hpr_result( *, args: HeatPumpTargetInputs, state: HPRParsedState, work: float, work_arr: float | list | np.ndarray, Q_heat: np.ndarray, Q_cool: np.ndarray, cop_h: float, hpr_streams: StreamCollection, model: Any = None, penalty_terms: Any = None, dT_subcool: np.ndarray | None = None, dT_superheat: np.ndarray | None = None, debug: bool = False, ) -> HPRBackendResult: """Shared simulated-vapour accounting, plotting, and result assembly.""" H_hot_with_amb = args.H_hot + args.z_amb_hot * state.Q_amb_hot H_cold_with_amb = args.H_cold + args.z_amb_cold * state.Q_amb_cold pt_cond = get_process_heat_cascade( hot_streams=hpr_streams.get_hot_utility_streams(), cold_streams=args.bckgrd_cold_streams + get_ambient_air_stream(Q_amb_cold=state.Q_amb_cold, args=args), is_shifted=True, idx=args.idx, ) pt_evap = get_process_heat_cascade( hot_streams=args.bckgrd_hot_streams + get_ambient_air_stream(Q_amb_hot=state.Q_amb_hot, args=args), cold_streams=hpr_streams.get_cold_utility_streams(), is_shifted=True, idx=args.idx, ) Q_ext_heat, Q_ext_cold, penalty, obj = _build_hpr_accounting( work=float(work), Q_ext_heat=pt_cond[PT.H_NET][0], Q_ext_cold=pt_evap[PT.H_NET][-1], args=args, penalty_terms=np.concatenate( [ np.array( [pt_cond[PT.H_NET][-1], pt_evap[PT.H_NET][0]], dtype=float, ), np.asarray( [] if penalty_terms is None else penalty_terms, dtype=float ).reshape(-1), ] ), penalise_external_cold_when_refrigerating=True, ) debug_figure = None if debug: debug_figure = plot_multi_hp_profiles_from_results( args.T_hot, H_hot_with_amb, args.T_cold, H_cold_with_amb, hpr_streams.get_hot_utility_streams(), hpr_streams.get_cold_utility_streams(), title=( f"Obj {obj:.5f} = {(float(work) / args.Q_hpr_target):.5f} + " f"{(Q_ext_heat / args.Q_hpr_target):.5f} + " f"{(Q_ext_cold / args.Q_hpr_target):.5f} + " f"{(penalty / args.Q_hpr_target):.5f}" ), idx=args.idx, ) return HPRBackendResult( obj=obj, utility_tot=float(work + Q_ext_heat + Q_ext_cold), w_net=float(work), w_hpr=work_arr, Q_ext_heat=Q_ext_heat, Q_ext_cold=Q_ext_cold, Q_amb_hot=state.Q_amb_hot, Q_amb_cold=state.Q_amb_cold, cop_h=float(cop_h), T_cond=state.T_cond, T_evap=state.T_evap, dT_subcool=dT_subcool, dT_superheat=dT_superheat, Q_heat=Q_heat, Q_cool=Q_cool, artifacts=HPRThermoArtifacts( hpr_streams=hpr_streams, model=model, debug_figure=debug_figure, ), )
[docs] def create_stream_collection_of_background_profile( T_vals: np.ndarray, H_vals: np.ndarray, ) -> StreamCollection: """Convert a temperature-enthalpy profile into piecewise stream segments.""" s = StreamCollection() T_vals = np.asarray(T_vals) H_vals = np.abs(np.asarray(H_vals)) if delta_vals(T_vals).min() < tol: raise ValueError("Infeasible temperature interval detected in _store_TSP_data") dh_vals = delta_vals(H_vals) dh_indices = np.argwhere(np.abs(dh_vals) > tol).flatten() for i in dh_indices: if dh_vals[i] > tol: s.add( Stream( t_supply=T_vals[i + 1], t_target=T_vals[i], heat_flow=dh_vals[i], ) ) elif -dh_vals[i] > tol: s.add( Stream( t_supply=T_vals[i], t_target=T_vals[i + 1], heat_flow=-dh_vals[i], ) ) return s
[docs] def get_Q_vals_at_T_hpr_from_bckgrd_profile( T_hpr: np.ndarray, T_vals: np.ndarray, H_vals: np.ndarray, *, is_cond: bool = True, ) -> np.ndarray: """Read stage duties from a background profile at proposed HPR temperatures.""" H_less_origin = np.interp(T_hpr, T_vals[::-1], H_vals[::-1]) H = ( np.concatenate((H_less_origin, np.array([0.0]))) if is_cond else np.concatenate((np.array([0.0]), H_less_origin)) ) temp = np.roll(H, -1) temp[-1] = 0 Q = H - temp Q_hx = Q[:-1] return np.where(Q_hx > 0.0, Q_hx, 0.0)
[docs] def compute_entropic_mean_temperature( T_arr: np.ndarray | list, Q_arr: np.ndarray | list, *, input_T_units: str = "C", ) -> float: """Return the entropic mean temperature for a distributed heat load.""" T_arr = np.asarray(T_arr, dtype=float) Q_arr = np.asarray(Q_arr, dtype=float) unit_offset = 273.15 if input_T_units == "C" else 0 if T_arr.var() < tol: return T_arr[0] + unit_offset S_tot = (Q_arr / (T_arr + unit_offset)).sum() return Q_arr.sum() / S_tot if S_tot > 0 else (T_arr.mean() + unit_offset)
[docs] def calc_carnot_heat_pump_cop( T_h: float | np.ndarray, T_l: float | np.ndarray, eta_ii: float, ) -> float | np.ndarray: """Compute a Carnot-based heating COP with a second-law efficiency factor.""" T_h_arr = np.asarray(T_h, dtype=float) T_l_arr = np.asarray(T_l, dtype=float) delta_T = T_h_arr - T_l_arr with np.errstate(divide="ignore", invalid="ignore"): cop = np.where( delta_T > 0.0, (T_l_arr / delta_T) * eta_ii + 1.0, np.inf, ) return cop.item() if cop.ndim == 0 else cop
[docs] def calc_carnot_heat_engine_eta( T_h: float | np.ndarray, T_l: float | np.ndarray, eta_ii: float, ) -> float | np.ndarray: """Compute a Carnot-based heat-engine efficiency with a second-law factor.""" T_h_arr = np.asarray(T_h, dtype=float) T_l_arr = np.asarray(T_l, dtype=float) eta = np.where( T_h_arr > T_l_arr, (1 - T_l_arr / T_h_arr) * eta_ii, 0.0, ) return eta.item() if eta.ndim == 0 else eta
[docs] def validate_vapour_hp_refrigerant_ls( num_stages: int, args: HeatPumpTargetInputs, ) -> list: """Return one refrigerant name per vapour-compression stage.""" if len(args.refrigerant_ls) > 0: if args.do_refrigerant_sort: refrigerants = [ ref for ref, _ in sorted( ((ref, PropsSI("Tcrit", ref)) for ref in args.refrigerant_ls), key=lambda x: x[1], reverse=True, ) ] else: refrigerants = args.refrigerant_ls if num_stages <= len(refrigerants): return refrigerants[:num_stages] padding = [refrigerants[-1]] * (num_stages - len(refrigerants)) return refrigerants + padding return ["water" for _ in range(num_stages)]
[docs] def get_carnot_hpr_cycle_streams( T_cond: np.ndarray, Q_cond: np.ndarray, T_evap: np.ndarray, Q_evap: np.ndarray, args: HeatPumpTargetInputs, ) -> StreamCollection: """Build one combined HPR utility stream collection for Carnot-cycle summaries.""" dt_phase_change = float(getattr(args, "dt_phase_change", 1.0)) return _build_latent_streams( T_cond, dt_phase_change, Q_cond, is_hot=True ) + _build_latent_streams(T_evap, dt_phase_change, Q_evap, is_hot=False)
[docs] def get_ambient_air_stream( Q_amb_hot: float = 0.0, Q_amb_cold: float = 0.0, args: HeatPumpTargetInputs = None, ) -> StreamCollection: """Build ambient-air exchange streams implied by the solved HPR result.""" sc = StreamCollection() if Q_amb_hot > tol: sc += _build_latent_streams( T_ls=np.array([args.T_env]), dT_phase_change=args.dt_phase_change, Q_ls=np.array([Q_amb_hot]), dt_cont=args.dt_env_cont, is_hot=True, is_process_stream=True, prefix="AIR", ) if Q_amb_cold > tol: sc += _build_latent_streams( T_ls=np.array([args.T_env]), dT_phase_change=args.dt_phase_change, Q_ls=np.array([Q_amb_cold]), dt_cont=args.dt_env_cont, is_hot=False, is_process_stream=True, prefix="AIR", ) return sc
[docs] def calc_hpr_obj( work: float, Q_ext_heat: float, Q_ext_cold: float, Q_hpr_target: float, heat_to_power_ratio: float = 1.0, cold_to_power_ratio: float = 0.0, penalty: float = 0.0, ) -> float: """Return the scalar screening objective used by HPR placement solvers.""" return ( work + (Q_ext_heat * heat_to_power_ratio) + (Q_ext_cold * cold_to_power_ratio) + penalty ) / Q_hpr_target
################################################################################ # Helper Functions ################################################################################ def _build_latent_streams( T_ls: np.ndarray, dT_phase_change: float, Q_ls: np.ndarray, *, dt_cont: float = 0.0, is_hot: bool = True, is_process_stream: bool = False, prefix: str = "HP", ) -> StreamCollection: if len(T_ls) > 1: T_ls, Q_ls = _get_carnot_hpr_cycle_cascade_profile( T_ls.tolist(), Q_ls.tolist(), dT_phase_change, is_hot ) sc = StreamCollection() for i in range(len(Q_ls)): sc.add( Stream( name=f"{prefix}_H{i + 1}" if is_hot else f"{prefix}_C{i + 1}", t_supply=T_ls[i] if is_hot else T_ls[i] - dT_phase_change, t_target=T_ls[i] - dT_phase_change if is_hot else T_ls[i], heat_flow=Q_ls[i], dt_cont=dt_cont, is_process_stream=is_process_stream, ) ) return sc def _get_carnot_hpr_cycle_cascade_profile( T_hpr: list, Q_hpr: list, dT_phase_change: float, is_hot: bool, i: int = None, ) -> Tuple[np.ndarray, np.ndarray]: inc = 1 if is_hot else -1 if i is None: i = 0 if is_hot else len(T_hpr) i_range = range(i, len(T_hpr) - 1) if is_hot else reversed(range(1, i)) for i in i_range: if abs(T_hpr[i] - T_hpr[i + inc]) < dT_phase_change: T_hpr.pop(i + inc) Q_hpr[i] += Q_hpr[i + inc] Q_hpr.pop(i + inc) T_hpr, Q_hpr = _get_carnot_hpr_cycle_cascade_profile( T_hpr, Q_hpr, dT_phase_change, is_hot, i, ) break return T_hpr, Q_hpr def _append_unspecified_final_cascade_cooling_duty(Q_cool: np.ndarray) -> np.ndarray: if Q_cool.size == 0: return np.array(np.nan) return np.concatenate([Q_cool, np.array([np.nan])]) def _get_hpr_cascade( hot_streams: StreamCollection, cold_streams: StreamCollection, *, idx: int = None, ) -> Tuple[np.ndarray, np.ndarray, np.ndarray]: idx = idx or 0 pt = create_problem_table_with_t_int( streams=hot_streams + cold_streams, is_shifted=False, idx=idx, ) pt.update( **get_utility_heat_cascade( pt[PT.T], hot_streams, cold_streams, is_shifted=False, idx=idx, ) ) return pt[PT.T], pt[PT.H_HOT_UT], pt[PT.H_COLD_UT]