"""Generate piecewise-linear approximations for non-linear thermodynamic streams."""
from typing import List
import numpy as np
from scipy.optimize import NonlinearConstraint, minimize
from ..lib.schemas.io import NonLinearStream
__all__ = ["get_piecewise_linearisation_for_streams", "get_piecewise_data_points"]
################################################################################
# Public API
################################################################################
[docs]
def get_piecewise_linearisation_for_streams(
streams: List[NonLinearStream],
t_h_data: list,
dt_diff_max: float = 0.1,
) -> np.array:
"""Generate piecewise-linear T-H profiles for non-linear streams."""
if len(streams) != len(t_h_data):
raise ValueError(
"Piecewise linearisation failed due to a different number of "
"streams and temperature-enthalpy datasets."
)
return_data = {"t_h_points": []}
# Create and Linearize stream
for index, s in enumerate(streams):
is_hot_stream = s.t_supply > s.t_target
curve_points = t_h_data[index]
mask_points = get_piecewise_data_points(
curve=curve_points, dt_diff_max=dt_diff_max, is_hot_stream=is_hot_stream
)
return_data["t_h_points"] = mask_points.tolist()
return return_data
[docs]
def get_piecewise_data_points(
curve: list,
is_hot_stream: bool,
dt_diff_max: float = 0.1,
) -> np.array:
"""
Perform piecewise linearisation with the Ramer-Douglas-Peucker algorithm.
:param curve: Numpy array of plot points for th curve
:param dt_diff_max: Maximum allowed temperature differential tolerance
:returns: Numpy array of new curve points
"""
curve = np.array(curve)
try:
return _get_piecewise_breakpoints(
curve=curve, epsilon=dt_diff_max, is_hot_stream=is_hot_stream
)
except FloatingPointError, RuntimeError, ValueError:
try:
return _rdp(
curve=curve,
epsilon=dt_diff_max,
)
except (FloatingPointError, RuntimeError, ValueError) as exc:
raise ValueError("Piecewise linearisation failed.") from exc
################################################################################
# Helper functions
################################################################################
def _rdp(curve: np.array, epsilon: float) -> np.array:
"""
Linearize and simplify a curve using the Ramer-Douglas-Peucker (_rdp) algorithm.
:param curve: Array of points (N, 2).
:param epsilon: Maximum allowed perpendicular distance for simplification
:returns: Simplified array of points.
"""
n = len(curve)
indices = np.ones(n, dtype=bool)
stack = [[0, n - 1]]
while stack:
start, end = stack.pop()
dmax = 0.0
index = start
line_vector = curve[end] - curve[start]
line_length = np.linalg.norm(line_vector)
if line_length == 0:
continue
# Get point with max distance from line
for i in range(start + 1, end):
point_vector = curve[i] - curve[start]
# Use a determinant form to avoid NumPy 2D cross deprecation.
cross_magnitude = (
line_vector[0] * point_vector[1] - line_vector[1] * point_vector[0]
)
distance = abs(cross_magnitude) / line_length
if distance > dmax:
dmax = distance
index = i
# Split if distance > epsilon (dt_diff_max)
if dmax > epsilon:
stack.append([start, index])
stack.append([index, end])
else:
indices[start + 1 : end] = False
return curve[indices]
def _refine_pw_points_for_heating_or_cooling(
curve: np.array, pw_points: np.array, eps_lb: float = 0.0, hot_stream: bool = True
) -> np.array:
"""
Refine a piecewise T-h approximation to preserve hot/cold stream integrity.
:param curve: Array of points (N, 2).
:param eps_lb: Maximum allowed hot or cold stream violation.
:param hot_stream: True if the stream is hot, False if the stream is cold.
:returns: Simplified array of points, maximum error.
"""
# Ensure the data is a numpy array
curve = np.flipud(curve)
pw_points = np.flipud(pw_points)
# Remove the first and last points because they are fixed
# Convert 2d array to 1d array
x0 = pw_points[1:-1].flatten()
# Get arguments for the optimisation
args = {
"first_point": pw_points[0],
"last_point": pw_points[-1],
}
def delta_pw_and_data(x, args):
"""Return the difference between the data and the piecewise points."""
# Reshape so the first half are x values and the second half are y values.
new_pw_points = np.vstack(
(args["first_point"], x.reshape(-1, 2), args["last_point"])
)
# Interpolate the piecewise points to the original data points
int_points = np.interp(curve[:, 0], new_pw_points[:, 0], new_pw_points[:, 1])
# Find the difference between the data and the piecewise points
return int_points - curve[:, 1]
# Define the constraints for the optimisation
if hot_stream:
def con(x):
return np.max(delta_pw_and_data(x, args))
nlc = NonlinearConstraint(con, -np.inf, eps_lb)
else:
def con(x):
return np.min(delta_pw_and_data(x, args))
nlc = NonlinearConstraint(con, -eps_lb, np.inf)
def obj(x, args):
"""Return the L2 norm between the data and the piecewise points."""
return np.sum(np.square(delta_pw_and_data(x, args)))
# Perform the optimisation
res = minimize(fun=obj, x0=x0, constraints=nlc, args=args, method="SLSQP", tol=1e-6)
refined_pw_points = np.vstack(
(args["first_point"], res.x.reshape(-1, 2), args["last_point"])
)
return np.flipud(refined_pw_points), np.max(np.abs(delta_pw_and_data(res.x, args)))
def _get_piecewise_breakpoints(
curve: np.array, epsilon: float, is_hot_stream: bool = True
) -> np.array:
"""
Get piecewise breakpoints using RDP plus an integrity refinement step.
:param curve: Array of points (N, 2).
:param epsilon: Maximum allowed perpendicular distance for simplification.
:param hot_stream: True if the stream is hot, False if the stream is cold.
:returns: Simplified array of breakpoints that define the piecewise linearisation.
"""
for _ in range(10):
pw_points = _rdp(curve, epsilon=epsilon)
if len(pw_points) > 10:
pw_points, max_err = _refine_pw_points_for_heating_or_cooling(
curve, pw_points, epsilon / 10, is_hot_stream
)
else:
break
if max_err > epsilon:
epsilon = epsilon * 0.9
else:
break
return pw_points