Source code for OpenPinch.classes.zone

"""Zone data structure capturing nested scopes and their thermal targets."""

import warnings
from typing import TYPE_CHECKING, Optional

import numpy as np

from ..lib.config import Configuration
from ..lib.enums import ZT
from ..lib.schemas.targets import BaseTargetModel
from .stream_collection import StreamCollection

if TYPE_CHECKING:
    from .stream import Stream
    from .zone import Zone


class Zone:
    """Hierarchical analysis boundary containing streams, utilities, and targets.

    Zones form the backbone of the in-memory OpenPinch model. Each zone can own
    process streams, utility streams, solved targets, generated graphs, and
    nested child zones. Direct and indirect integration routines progressively
    populate this structure as the analysis moves from local process scopes up
    to site-style aggregation.

    A zone created with a ``parent_zone`` shares the parent's state lookup and
    weights by reference; a root zone derives them from its configuration.
    """

    def __init__(
        self,
        name: str = "Zone",
        type: str = ZT.P.value,
        zone_config: Optional[Configuration] = None,
        parent_zone: "Zone" = None,
    ):
        """Initialise an empty zone with stream, target, and graph containers.

        Args:
            name: Display name of the zone; also used as its path segment.
            type: Zone type value. Defaults to ``ZT.P.value``.
            zone_config: Configuration for this zone. A fresh
                ``Configuration()`` is created when omitted (or falsy).
            parent_zone: Parent zone, if any. Its ``dt_cont_multiplier``,
                state lookup and weights are inherited.
        """
        # === Metadata ===
        self._name = name
        self._type = type
        self._config = zone_config or Configuration()
        self._parent_zone = parent_zone
        # hasattr() is False for None, so root zones start at 1.0.
        self._dt_cont_multiplier = (
            parent_zone.dt_cont_multiplier
            if hasattr(parent_zone, "dt_cont_multiplier")
            else 1.0
        )
        self._active_state_name = None
        self._lock_dt_cont_multiplier = False
        self._active = True

        if parent_zone is not None:
            # Child zones share the parent's state context by reference.
            self._state_ids = parent_zone._state_ids
            self._num_states = (
                len(self._state_ids) if self._state_ids is not None else 1
            )
            self._weights = parent_zone._weights
        else:
            self._state_ids = (
                {state_id: idx for idx, state_id in enumerate(self._config.STATE_IDS)}
                if isinstance(self._config.STATE_IDS, list | tuple)
                else {"0": 0}
            )
            self._num_states = len(self._state_ids)
            configured_weights = self._config.WEIGHTS
            if (
                isinstance(configured_weights, np.ndarray | list)
                and len(configured_weights) == self._num_states
            ):
                self._weights = configured_weights
            else:
                # Always assign a value: missing, wrongly typed, or wrongly
                # sized weights fall back to uniform weights so that
                # ``_weights`` is never left undefined.
                self._weights = np.ones(self._num_states, dtype=float)

        self._subzones = {}
        self._targets = {}
        self._graphs = {}

        # === Streams & Utilities ===
        self._hot_streams: StreamCollection = self._new_stream_collection()
        self._cold_streams: StreamCollection = self._new_stream_collection()
        self._net_hot_streams: StreamCollection = self._new_stream_collection()
        self._net_cold_streams: StreamCollection = self._new_stream_collection()
        self._hot_utilities: StreamCollection = self._new_stream_collection()
        self._cold_utilities: StreamCollection = self._new_stream_collection()

    # === Properties ===
    @property
    def name(self):
        """Display name used when addressing the zone in the hierarchy."""
        return self._name

    @name.setter
    def name(self, value):
        """Set the display name used for this zone and its address."""
        self._name = value

    @property
    def type(self):
        """Zone type value supplied at construction (default ``ZT.P.value``)."""
        return self._type

    @type.setter
    def type(self, value):
        """Set the zone classification used by hierarchical targeting logic."""
        self._type = value

    @property
    def config(self):
        """Configuration object controlling analysis behaviour for this zone."""
        return self._config

    @config.setter
    def config(self, value):
        """Replace the runtime configuration attached to this zone."""
        self._config = value

    @property
    def parent_zone(self):
        """Direct parent zone in the site hierarchy, if any."""
        return self._parent_zone

    @parent_zone.setter
    def parent_zone(self, value):
        """Attach the zone to a different parent in the hierarchy."""
        self._parent_zone = value

    @property
    def active(self) -> bool:
        """Whether the zone participates in the current analysis."""
        return bool(self._active)

    @active.setter
    def active(self, value: bool):
        """Activate or deactivate the zone for subsequent analysis passes."""
        self._active = bool(value)

    @property
    def state_ids(self) -> dict[str, int] | None:
        """Canonical ``state_id -> idx`` lookup for this zone."""
        return self._state_ids

    @property
    def weights(self):
        """Canonical state weights for this zone."""
        return self._weights

    @property
    def num_states(self):
        """Number of distinct states for this zone."""
        return self._num_states

    @property
    def address(self) -> str:
        """Slash-delimited path from the root zone to this zone."""
        if self.parent_zone is None:
            return str(self.name)
        return f"{self.parent_zone.address}/{self.name}"

    @property
    def dt_cont_multiplier(self) -> float:
        """Effective multiplier applied to stream and utility ``dt_cont`` values."""
        return self._dt_cont_multiplier

    @dt_cont_multiplier.setter
    def dt_cont_multiplier(self, value: float):
        """Set the ``dt_cont`` multiplier on this zone and its unlocked subzones.

        The value is pushed to every subzone whose multiplier is not locked,
        stored on this zone, written to all of this zone's streams, and the
        cached targets are cleared.
        """
        for zone in self.subzones.values():
            if not zone._lock_dt_cont_multiplier:
                zone.dt_cont_multiplier = value
        self._dt_cont_multiplier = float(value)
        self.all_streams.set_common_stream_attribute("dt_cont_multiplier", value)
        self._targets.clear()

    @property
    def hot_streams(self):
        """Process streams that release heat within this zone."""
        return self._hot_streams

    @hot_streams.setter
    def hot_streams(self, data):
        """Replace the zone hot-stream collection."""
        self._hot_streams = self._attach_stream_collection(data)

    @property
    def cold_streams(self):
        """Process streams that require heat within this zone."""
        return self._cold_streams

    @cold_streams.setter
    def cold_streams(self, data):
        """Replace the zone cold-stream collection."""
        self._cold_streams = self._attach_stream_collection(data)

    @property
    def net_hot_streams(self):
        """Net hot streams derived from zonal aggregation."""
        return self._net_hot_streams

    @net_hot_streams.setter
    def net_hot_streams(self, data):
        """Replace the aggregated net hot-stream collection."""
        self._net_hot_streams = self._attach_stream_collection(data)

    @property
    def net_cold_streams(self):
        """Net cold streams derived from zonal aggregation."""
        return self._net_cold_streams

    @net_cold_streams.setter
    def net_cold_streams(self, data):
        """Replace the aggregated net cold-stream collection."""
        self._net_cold_streams = self._attach_stream_collection(data)

    @property
    def hot_utilities(self):
        """Hot utility streams assigned to the zone."""
        return self._hot_utilities

    @hot_utilities.setter
    def hot_utilities(self, data):
        """Replace the zone hot-utility collection."""
        self._hot_utilities = self._attach_stream_collection(data)

    @property
    def cold_utilities(self):
        """Cold utility streams assigned to the zone."""
        return self._cold_utilities

    @cold_utilities.setter
    def cold_utilities(self, data):
        """Replace the zone cold-utility collection."""
        self._cold_utilities = self._attach_stream_collection(data)

    @property
    def graphs(self):
        """Graphs generated for this zone."""
        return self._graphs

    @graphs.setter
    def graphs(self, data):
        """Replace the graph payloads cached on this zone."""
        self._graphs = data

    @property
    def subzones(self):
        """Immediate child zones keyed by name."""
        return self._subzones

    @property
    def targets(self):
        """Energy targets keyed by target name."""
        return self._targets

    @property
    def process_streams(self):
        """Combined hot and cold process streams for the zone."""
        return self._hot_streams + self._cold_streams

    @property
    def net_process_streams(self):
        """Combined net hot and net cold process streams for the zone."""
        return self._net_hot_streams + self._net_cold_streams

    @property
    def utility_streams(self):
        """Combined hot and cold utility streams for the zone."""
        return self._hot_utilities + self._cold_utilities

    @property
    def all_streams(self):
        """All process and utility streams defined on the zone."""
        return self.process_streams + self.utility_streams

    def _new_stream_collection(self) -> StreamCollection:
        """Create an empty collection bound to this zone's state context."""
        collection = StreamCollection()
        collection.set_state_context(self._state_ids, self._weights, self._num_states)
        return collection

    def _attach_stream_collection(
        self, collection: StreamCollection
    ) -> StreamCollection:
        """Validate ``collection`` and bind it to this zone's state context.

        Raises:
            TypeError: If ``collection`` is not a ``StreamCollection``.
        """
        if not isinstance(collection, StreamCollection):
            raise TypeError(
                "Zone stream containers must be StreamCollection instances."
            )
        # With no state lookup on the zone the collection keeps its own context.
        if self._state_ids is not None:
            collection.set_state_context(
                self._state_ids, self._weights, self._num_states
            )
        return collection

    def _propagate_state_context(self) -> None:
        """Push this zone's state context to its collections and subzones."""
        for collection in (
            self._hot_streams,
            self._cold_streams,
            self._net_hot_streams,
            self._net_cold_streams,
            self._hot_utilities,
            self._cold_utilities,
        ):
            collection.set_state_context(
                self._state_ids, self._weights, self._num_states
            )
        for subzone in self._subzones.values():
            subzone.set_state_context(self._state_ids, self._weights, self._num_states)

    def set_state_context(
        self,
        state_ids: dict[str, int] | list[str] | tuple[str, ...] | None,
        weights,
        num_states: int | None,
    ) -> None:
        """Set the canonical state lookup owned by this zone and propagate refs.

        Args:
            state_ids: Mapping ``state_id -> idx``, or a sequence of state ids
                that is converted to such a mapping (ids are stringified).
                ``None`` clears the lookup, the weights and the state count.
            weights: State weights. Objects exposing ``copy`` are stored by
                reference (no copy is made); anything else is converted to a
                flat float array. Ignored when ``state_ids`` is ``None``.
            num_states: Number of states to record on the zone.
        """
        if state_ids is None:
            self._state_ids = None
            self._weights = None
            self._num_states = None
        else:
            self._state_ids = (
                state_ids
                if isinstance(state_ids, dict)
                else {str(state_id): idx for idx, state_id in enumerate(state_ids)}
            )
            if weights is None:
                self._weights = None
            elif hasattr(weights, "copy"):
                self._weights = weights
            else:
                self._weights = np.asarray(weights, dtype=float).reshape(-1)
            self._num_states = num_states
        self._propagate_state_context()

    # === Methods ===
    def add_graph(self, name: str, result):
        """Store a graph result under ``name`` for later export or display."""
        self._graphs[name] = result

    def add_zone(self, zone_to_add, sub: bool = True):
        """Add a single zone object keyed by its name.

        If the zone name already exists:
        - If the zone is identical (e.g. same stream and utility objects), skip.
        - If it's different, add it with a suffix like '_1', '_2', etc.

        Args:
            zone_to_add: Zone to register; must expose a string ``name``.
            sub: When true the zone is stored in the subzone mapping,
                otherwise in the targets mapping.

        Raises:
            ValueError: If ``zone_to_add`` has no string ``name`` attribute.
        """
        base_name = getattr(zone_to_add, "name", None)
        if not isinstance(base_name, str):
            raise ValueError(
                "Zone must have a string 'name' attribute, got: "
                f"{type(base_name).__name__}"
            )

        destination = self._subzones if sub else self._targets
        self._add_to_correct_zone_collection(zone_to_add, base_name, destination)

    def _add_to_correct_zone_collection(self, zone_to_add, base_name, loc):
        """Insert ``zone_to_add`` into ``loc``, renaming it on a name clash."""
        existing = loc.get(base_name)
        if existing is None:
            loc[base_name] = zone_to_add
            return

        if self._zone_is_equal(existing, zone_to_add):
            return  # identical, skip adding

        # Add with counter suffix until unique
        counter = 1
        new_name = f"{base_name}_{counter}"
        while new_name in loc:
            counter += 1
            new_name = f"{base_name}_{counter}"
        zone_to_add.name = new_name
        loc[new_name] = zone_to_add

    def add_target(self, target_to_add: BaseTargetModel):
        """Add one target to this zone, keyed by the target's ``type``.

        Objects that are not ``BaseTargetModel`` instances are ignored.
        """
        if isinstance(target_to_add, BaseTargetModel):
            self._targets[target_to_add.type] = target_to_add

    def add_targets(self, targets: Optional[list] = None):
        """Add multiple targets to this zone.

        Args:
            targets: Iterable of targets passed one by one to
                :meth:`add_target`. ``None`` (the default) adds nothing.
        """
        if targets is None:
            return
        for target in targets:
            self.add_target(target)

    def get_subzone(self, loc: str = None) -> Optional["Zone"]:
        """Resolve a slash-delimited zone path relative to this zone.

        A leading segment equal to this zone's own name is dropped before the
        lookup. Returns this zone when ``loc`` is ``None`` or names only this
        zone; emits a warning and returns ``None`` when a segment is missing.
        """
        zone = self
        if loc is None:
            return zone

        loc_address = loc.split("/", 1)
        if loc_address[0] == zone.name:
            loc_address.pop(0)
        if len(loc_address) == 0:
            return zone

        # Split the remaining path into the next segment and the rest.
        loc_address = loc_address[-1].split("/", 1)
        sub = loc_address[0]
        if sub not in zone.subzones:
            warnings.warn(f"Subzone '{loc}' not found.")
            return None

        if len(loc_address) == 1:
            return zone.subzones[sub]
        return zone.subzones[sub].get_subzone(loc_address[-1])

    def calc_utility_cost(self):
        """Calculate and cache the summed ``ut_cost`` of the zone's utilities."""
        self._utility_cost = sum(u.ut_cost for u in self.utility_streams)
        return self._utility_cost

    def _zone_is_equal(self, zone1: "Zone", zone2: "Zone"):
        """Basic equality check between two zones. Customize as needed.

        Two zones are considered equal when their hot/cold process stream and
        hot/cold utility collections all compare equal.
        """
        return (
            zone1._hot_streams == zone2._hot_streams
            and zone1._cold_streams == zone2._cold_streams
            and zone1._hot_utilities == zone2._hot_utilities
            and zone1._cold_utilities == zone2._cold_utilities
        )

    def import_hot_and_cold_streams_from_sub_zones(
        self,
        get_net_streams: bool = False,
        is_n_zone_depth: bool = True,
        is_new_stream_collection: bool = True,
    ):
        """Get referenced hot and cold streams across multiple subzones.

        Args:
            get_net_streams: Collect the subzones' net streams into this
                zone's net collections instead of the plain process streams.
            is_n_zone_depth: Recurse into subzones that themselves have
                subzones before reading their streams.
            is_new_stream_collection: Start from fresh, empty destination
                collections rather than appending to the existing ones.

        Streams are added under the key ``"<subzone name>.<stream name>"``.
        """
        z: Zone
        s: Stream

        if not get_net_streams:
            if is_new_stream_collection:
                self._hot_streams = self._new_stream_collection()
                self._cold_streams = self._new_stream_collection()
            hs_dst = self._hot_streams
            cs_dst = self._cold_streams
        else:
            if is_new_stream_collection:
                self._net_hot_streams = self._new_stream_collection()
                self._net_cold_streams = self._new_stream_collection()
            hs_dst = self._net_hot_streams
            cs_dst = self._net_cold_streams

        for z in self.subzones.values():
            # Populate nested zones first so their collections are current.
            if len(z.subzones) > 0 and is_n_zone_depth:
                z.import_hot_and_cold_streams_from_sub_zones(get_net_streams)

            if not get_net_streams:
                hs_src = z.hot_streams
                cs_src = z.cold_streams
            else:
                hs_src = z.net_hot_streams
                cs_src = z.net_cold_streams

            for s in hs_src:
                key = f"{z.name}.{s.name}"
                hs_dst.add(s, key)

            for s in cs_src:
                key = f"{z.name}.{s.name}"
                cs_dst.add(s, key)

    def get_target_zone(self, zone_name: Optional[str | list]) -> "Zone":
        """Resolve ``zone_name`` to the concrete zone that should receive a target.

        ``None`` or this zone's own name resolves to this zone. Any other value
        is stringified, stripped, and resolved as a slash-delimited path via
        :meth:`get_subzone`, which drops a leading segment equal to this
        zone's name and returns ``None`` (with a warning) when nothing matches.
        """
        if zone_name is None:
            return self

        resolved = str(zone_name).strip()
        if resolved == self.name:
            return self

        # Pass the path as a string: get_subzone() splits it itself, so
        # handing it a pre-split list would fail on ``loc.split``.
        return self.get_subzone(resolved)

    def lock_dt_cont_multiplier(self):
        """Lock the dt_cont_multiplier to prevent further changes."""
        self._lock_dt_cont_multiplier = True
        self.all_streams.set_common_stream_attribute("dt_cont_multiplier_locked", True)