"""Unit-aware scalar and discrete-state value wrapper powered by Pint quantities."""
from __future__ import annotations
import re
from collections.abc import Mapping
from typing import Any
import numpy as np
from pint import UnitRegistry
from pint.errors import DimensionalityError
ureg = UnitRegistry()
try:
ureg.define("USD = [currency]")
except Exception:
pass
try:
ureg.define("NZD = [currency]")
except Exception:
pass
Q_ = ureg.Quantity # type: ignore
_SERIALIZED_SCALAR_KEYS = {"value", "unit"}
_SERIALIZED_STATEFUL_KEYS = {"values", "state_ids", "weights", "unit"}
def _is_value_with_unit(data: Any) -> bool:
"""Return ``True`` for objects that look like ``ValueWithUnit`` instances."""
return hasattr(data, "value") and hasattr(data, "unit")
def _is_bool_like(data: Any) -> bool:
"""Return ``True`` for bool scalars including numpy bools."""
return isinstance(data, (bool, np.bool_))
[docs]
class Value:
"""Thin wrapper around a Pint ``Quantity`` with serialization helpers."""
def __init__(self, data=None, unit: str = None):
"""Create a scalar or stateful value from ``data`` and an optional ``unit``."""
quantity, weights = self._coerce_input(data, unit)
self._set_storage(quantity)
self._weights = weights
@property
def value(self):
"""Return the scalar magnitude or per-state magnitudes for stateful values."""
if not self._is_stateful():
return self._quantity.magnitude[0]
return self._quantity.magnitude.copy()
@value.setter
def value(self, data):
"""Set the scalar magnitude or per-state magnitudes in-place."""
if self._is_stateful():
magnitudes = self._coerce_magnitude_array(
data,
expected_len=len(self.state_values),
allow_scalar_broadcast=True,
label="value",
)
else:
if _is_bool_like(data):
raise TypeError("Boolean values are not supported.")
magnitudes = np.asarray([data], dtype=float)
self._set_storage(Q_(magnitudes, self._quantity.units))
@property
def min(self) -> "Value":
"""Return the minimum stored magnitude as a scalar ``Value``."""
return self._summary_value(np.min(self._quantity.magnitude))
@property
def max(self) -> "Value":
"""Return the maximum stored magnitude as a scalar ``Value``."""
return self._summary_value(np.max(self._quantity.magnitude))
@property
def mean(self) -> "Value":
"""Return the arithmetic mean stored magnitude as a scalar ``Value``."""
return self._summary_value(np.mean(self._quantity.magnitude))
@property
def weighted_mean(self) -> "Value":
"""Return the weighted mean stored magnitude as a scalar ``Value``."""
return self._summary_value(
np.average(self._quantity.magnitude, weights=self.weights)
)
@property
def median(self) -> "Value":
"""Return the median stored magnitude as a scalar ``Value``."""
return self._summary_value(np.median(self._quantity.magnitude))
@property
def state_values(self) -> np.ndarray:
"""Return the raw numpy magnitudes for each stored state."""
return self._quantity.magnitude.copy()
@property
def weights(self) -> np.ndarray:
"""Return optional passive state weights carried with this value."""
return self._weights
@property
def num_states(self) -> int:
"""Return the number of stored magnitudes."""
return len(self._quantity.magnitude)
@property
def unit(self):
"""Return the unit in a human-friendly compact representation."""
return self._format_units(self._quantity.units)
@unit.setter
def unit(self, unit_str):
"""Convert the stored quantity to ``unit_str`` in-place."""
self._set_storage(self._quantity.to(self._normalise_unit_input(unit_str)))
[docs]
def to(self, new_unit: str) -> "Value":
"""Return a copy converted to ``new_unit``."""
new_value = self._from_quantity(
self._quantity.to(self._normalise_unit_input(new_unit)),
)
new_value._weights = self._weights.copy() if self._weights is not None else None
return new_value
def __getitem__(self, idx):
"""Return one selected state as an independent ``Value``."""
if idx is None or not self._is_stateful():
return Value(self)
if isinstance(idx, slice):
subset = self._quantity.magnitude[idx]
result = Value(subset, unit=self.unit)
if self._weights is not None:
result._weights = np.asarray(self._weights[idx], dtype=float).reshape(
-1
)
return result
resolved_idx = self._resolve_state_index(idx)
return self._from_quantity(
Q_(
np.asarray([self._quantity.magnitude[resolved_idx]], dtype=float),
self._quantity.units,
)
)
def __iter__(self):
if not self._is_stateful():
raise TypeError("Scalar Value is not iterable.")
return iter(self._quantity.magnitude)
def __setitem__(self, idx, value):
resolved_idx = self._resolve_state_index(idx)
values = self._quantity.magnitude.copy()
values[resolved_idx] = self._coerce_scalar_magnitude(value)
self._set_storage(Q_(values, self._quantity.units))
def __len__(self):
"""Return the number of states stored."""
return len(self._quantity.magnitude)
def __str__(self):
return f"{self.value} {self.unit}"
def __repr__(self):
if not self._is_stateful():
return (
f"Value({self.value}, "
f"{repr(self._serialise_units(self._quantity.units))})"
)
return (
"Value("
f"values={self.state_values.tolist()}, "
f"unit={self._serialise_units(self._quantity.units)!r})"
)
def __float__(self):
if self._is_stateful():
raise TypeError("Cannot convert stateful Value to float.")
return float(self._quantity.magnitude[0])
def __int__(self):
if self._is_stateful():
raise TypeError("Cannot convert stateful Value to int.")
return int(self._quantity.magnitude[0])
def __round__(self, ndigits=None):
if self._is_stateful():
raise TypeError("Cannot round stateful Value.")
return round(self._quantity.magnitude[0], ndigits)
def __array__(self, dtype=None):
if self._is_stateful():
return np.asarray(self.state_values, dtype=dtype)
return np.asarray(float(self), dtype=dtype)
def __format__(self, format_spec):
return format(float(self), format_spec)
def __abs__(self):
return abs(float(self))
def __neg__(self):
return -float(self)
def __pos__(self):
return +float(self)
def __eq__(self, other):
try:
if self._is_numeric_scalar(other):
return np.all(self._quantity.magnitude == other)
if isinstance(other, Value):
return np.all(self.to(other.unit).value == other.value)
return False
except DimensionalityError, TypeError, ValueError:
return False
def __lt__(self, other):
try:
if self._is_numeric_scalar(other):
return np.all(self._quantity.magnitude < other)
if isinstance(other, Value):
return np.all(self.to(other.unit).value < other.value)
return False
except DimensionalityError, TypeError, ValueError:
return False
def __le__(self, other):
try:
if self._is_numeric_scalar(other):
return np.all(self._quantity.magnitude <= other)
if isinstance(other, Value):
return np.all(self.to(other.unit).value <= other.value)
return False
except DimensionalityError, TypeError, ValueError:
return False
def __gt__(self, other):
try:
if self._is_numeric_scalar(other):
return np.all(self._quantity.magnitude > other)
if isinstance(other, Value):
return np.all(self.to(other.unit).value > other.value)
return False
except DimensionalityError, TypeError, ValueError:
return False
def __ge__(self, other):
try:
if self._is_numeric_scalar(other):
return np.all(self._quantity.magnitude >= other)
if isinstance(other, Value):
return np.all(self.to(other.unit).value >= other.value)
return False
except DimensionalityError, TypeError, ValueError:
return False
def __add__(self, other):
if self._is_numeric_scalar(other):
return self._from_quantity(self._quantity + Q_(other, self._quantity.units))
return self._binary_operation(other, lambda left, right: left + right)
def __radd__(self, other):
if self._is_numeric_scalar(other):
return self._from_quantity(Q_(other, self._quantity.units) + self._quantity)
return self._binary_operation(
other, lambda left, right: left + right, reverse=True
)
def __sub__(self, other):
if self._is_numeric_scalar(other):
return self._from_quantity(self._quantity - Q_(other, self._quantity.units))
return self._binary_operation(other, lambda left, right: left - right)
def __rsub__(self, other):
if self._is_numeric_scalar(other):
return self._from_quantity(Q_(other, self._quantity.units) - self._quantity)
return self._binary_operation(
other, lambda left, right: left - right, reverse=True
)
def __mul__(self, other):
return self._binary_operation(other, lambda left, right: left * right)
def __rmul__(self, other):
return self._binary_operation(
other, lambda left, right: left * right, reverse=True
)
def __truediv__(self, other):
return self._binary_operation(other, lambda left, right: left / right)
def __rtruediv__(self, other):
return self._binary_operation(
other, lambda left, right: left / right, reverse=True
)
def _from_quantity(self, qty):
"""Build a new ``Value`` instance from a Pint quantity."""
instance = type(self).__new__(type(self))
instance._set_storage(qty)
instance._weights = self.weights
return instance
def _binary_operation(self, other, operator, reverse: bool = False):
left, right = (other, self) if reverse else (self, other)
left_qty, right_qty = self._align_operands(left, right)
result = operator(left_qty, right_qty)
return self._from_quantity(result)
def _align_operands(self, left, right):
left_value = left if isinstance(left, Value) else None
right_value = right if isinstance(right, Value) else None
left_qty = left_value._quantity if left_value is not None else Q_(left)
right_qty = right_value._quantity if right_value is not None else Q_(right)
if left_value is not None and right_value is not None:
if left_value._is_stateful() and right_value._is_stateful():
if len(left_value._quantity.magnitude) != len(
right_value._quantity.magnitude
):
raise ValueError(
"Stateful arithmetic requires identical state counts."
)
return left_qty, right_qty
def _resolve_state_index(self, idx: int | str | None) -> int:
if not self._is_stateful():
return 0
if idx is None:
return 0
if isinstance(idx, str):
try:
return int(idx)
except (TypeError, ValueError) as exc:
raise KeyError(idx) from exc
idx = int(idx)
if idx < 0 or idx >= self.num_states:
raise IndexError(idx)
if idx >= self.num_states:
idx = 0
return idx
def _is_stateful(self) -> bool:
return len(self._quantity.magnitude) > 1
def _set_storage(self, quantity) -> None:
magnitudes = np.array(quantity.magnitude, dtype=float, copy=True).reshape(-1)
if magnitudes.size == 0:
raise ValueError("Values cannot be empty.")
self._quantity = Q_(magnitudes, quantity.units)
def _summary_value(self, magnitude: float) -> "Value":
return self._from_quantity(
Q_(np.asarray([float(magnitude)], dtype=float), self._quantity.units)
)
def _coerce_scalar_magnitude(self, value) -> float:
if isinstance(value, Value):
return float(value.to(self.unit).value)
if hasattr(value, "units"):
quantity = Q_(value).to(self._quantity.units)
return float(np.asarray(quantity.magnitude, dtype=float).reshape(-1)[0])
return float(value)
def _coerce_input(
self,
data,
unit: str | None,
) -> tuple[Any, np.ndarray]:
payload = self._normalise_input_object(data)
if isinstance(payload, Value):
quantity = payload._quantity
weights = payload.weights
elif isinstance(payload, Mapping):
quantity, weights = self._coerce_mapping_input(payload, unit)
return quantity, weights
elif hasattr(payload, "units"):
quantity = payload
weights = None
elif self._is_array_like_input(payload):
quantity = self._quantity_from_values(payload, unit)
weights = None
elif _is_value_with_unit(payload):
quantity, weights = self._coerce_object_with_unit(payload, unit)
return quantity, weights
elif payload is None:
quantity = self._quantity_from_scalar(0.0, unit)
weights = None
else:
quantity = self._quantity_from_scalar(payload, unit)
weights = None
quantity = self._coerce_quantity_to_unit(quantity, unit)
return quantity, weights
def _normalise_input_object(self, data):
if hasattr(data, "model_dump") and not isinstance(data, Mapping):
return data.model_dump(mode="python")
return data
def _coerce_mapping_input(
self,
data: Mapping[Any, Any],
unit: str | None,
) -> tuple[Any, np.ndarray]:
if self._is_serialized_stateful_payload(data):
quantity = self._quantity_from_values(
data.get("values"),
data.get("unit") or unit,
)
weights = data.get("weights")
elif self._is_serialized_scalar_payload(data):
quantity = self._missing_or_zero_quantity(
data.get("value"),
data.get("unit") or unit,
)
weights = data.get("weights")
else:
quantity = self._quantity_from_values(list(data.values()), unit)
weights = None
return quantity, weights
def _coerce_object_with_unit(
self,
data,
unit: str | None,
) -> tuple[Any, np.ndarray]:
source_unit = getattr(data, "unit", None) or unit
if hasattr(data, "values"):
quantity = self._quantity_from_values(data.values, source_unit)
weights = getattr(data, "weights", None)
else:
quantity = self._missing_or_zero_quantity(
getattr(data, "value", None), source_unit
)
weights = getattr(data, "weights", None)
weights_arr = self._normalise_weights(
weights,
expected_len=np.asarray(quantity.magnitude, dtype=float).reshape(-1).size,
)
return quantity, weights_arr
def _quantity_from_scalar(self, data, unit: str | None):
if _is_bool_like(data):
raise TypeError("Boolean values are not supported.")
resolved_unit = self._normalise_unit_input(unit)
magnitude = np.asarray([data], dtype=float)
return Q_(magnitude, resolved_unit) if resolved_unit else Q_(magnitude)
def _missing_or_zero_quantity(self, data, unit: str | None):
resolved_unit = self._normalise_unit_input(unit)
magnitude = np.asarray([np.nan if data is None else data], dtype=float)
return Q_(magnitude, resolved_unit) if resolved_unit else Q_(magnitude)
def _quantity_from_values(self, data, unit: str | None):
values_list = list(data)
magnitudes = self._coerce_magnitude_array(
values_list,
expected_len=len(values_list),
label="values",
)
resolved_unit = self._normalise_unit_input(unit)
return Q_(magnitudes, resolved_unit) if resolved_unit else Q_(magnitudes)
def _coerce_quantity_to_unit(self, quantity, unit: str | None):
copied = Q_(
np.asarray(quantity.magnitude, dtype=float).reshape(-1),
quantity.units,
)
if unit is None:
return copied
resolved_unit = self._normalise_unit_input(unit)
try:
return copied.to(resolved_unit)
except DimensionalityError, TypeError, ValueError:
if self._quantity_is_dimensionless(copied) or self._same_dimensionality(
copied, resolved_unit
):
return Q_(
np.asarray(copied.magnitude, dtype=float).reshape(-1), resolved_unit
)
raise
def _coerce_magnitude_array(
self,
data,
*,
expected_len: int,
label: str,
allow_scalar_broadcast: bool = False,
) -> np.ndarray:
if _is_bool_like(data):
raise TypeError("Boolean values are not supported.")
if (
allow_scalar_broadcast
and np.isscalar(data)
and not isinstance(data, (str, bytes))
):
return np.full(expected_len, float(data), dtype=float)
if isinstance(data, (str, bytes)):
values = [data]
else:
try:
values = list(data)
except TypeError as exc:
raise TypeError(
f"{label} must be numeric scalar or 1-D array-like data."
) from exc
if not values:
raise ValueError(f"{label} cannot be empty.")
if any(_is_bool_like(value) for value in values):
raise TypeError("Boolean values are not supported.")
try:
magnitudes = np.asarray(values, dtype=float).reshape(-1)
except (TypeError, ValueError) as exc:
raise TypeError(f"{label} must contain numeric values.") from exc
if len(magnitudes) != expected_len:
raise ValueError(f"{label} length must match the number of states.")
return magnitudes
@staticmethod
def _is_numeric_scalar(other: Any) -> bool:
return isinstance(
other, (int, float, np.integer, np.floating)
) and not _is_bool_like(other)
@staticmethod
def _is_array_like_input(data: Any) -> bool:
if data is None or isinstance(data, (str, bytes, Mapping)):
return False
if _is_bool_like(data) or np.isscalar(data):
return False
try:
list(data)
except TypeError:
return False
return True
@staticmethod
def _is_serialized_scalar_payload(data: Mapping[Any, Any]) -> bool:
return set(data).issubset(_SERIALIZED_SCALAR_KEYS) and "value" in data
@staticmethod
def _is_serialized_stateful_payload(data: Mapping[Any, Any]) -> bool:
return set(data).issubset(_SERIALIZED_STATEFUL_KEYS) and ("values" in data)
def _format_units(self, units) -> str:
return (
format(units, "~").replace("USD", "$").replace("NZD", "$").replace(" ", "")
)
@staticmethod
def _serialise_units(units) -> str:
return (
format(units, "~")
.replace("°", "deg")
.replace("USD", "$")
.replace("NZD", "$")
.replace(" ", "")
)
@staticmethod
def _normalise_unit_input(unit: str | None) -> str | None:
if unit is None:
return None
text = str(unit).strip().replace("$", "USD")
if text in {"C", "°C"}:
return "degC"
if text == "degK":
return "K"
text = re.sub(r"(?<=[A-Za-z])2(?=($|[./*]))", "^2", text)
text = re.sub(r"(?<=[A-Za-z])3(?=($|[./*]))", "^3", text)
text = text.replace(".K", "/K").replace(".degC", "/degC")
return text
@staticmethod
def _quantity_is_dimensionless(quantity) -> bool:
return str(quantity.units) == "dimensionless"
@staticmethod
def _same_dimensionality(quantity, unit: str) -> bool:
try:
return quantity.dimensionality == Q_(1.0, unit).dimensionality
except Exception:
return False
[docs]
def to_dict(self):
"""Serialise the value into a JSON-friendly dictionary."""
if self._is_stateful():
return {
"values": self.state_values.tolist(),
"unit": self._serialise_units(self._quantity.units),
}
if np.isnan(self.value):
return {
"value": None,
"unit": self._serialise_units(self._quantity.units),
}
return {
"value": self.value,
"unit": self._serialise_units(self._quantity.units),
}
[docs]
@classmethod
def from_dict(cls, data):
"""Instantiate from a scalar or stateful serialized mapping."""
if not isinstance(data, Mapping):
raise TypeError("data must be a mapping.")
if cls._is_serialized_stateful_payload(data):
return cls(data)
return cls(data)