"""High-level convenience wrapper around the OpenPinch targeting service."""
from __future__ import annotations
import math
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from copy import deepcopy
from pathlib import Path
from typing import Any, Callable, Dict, Optional
import pandas as pd
from pydantic import ValidationError, warnings
from ..lib.schemas.io import TargetInput, TargetOutput
from ..lib.schemas.targets import BaseTargetModel
from ..resources import list_sample_cases, read_sample_case
from ..services import data_preprocessing_service
from ..services.input_data_processing._canonicalization import canonical_problem_inputs
from ..streamlit_webviewer.web_graphing import (
render_streamlit_dashboard as _render_streamlit_dashboard,
)
from ..utils.csv_to_json import get_problem_from_csv
from ..utils.export import (
build_summary_dataframe,
export_target_summary_to_excel_with_units,
)
from ..utils.miscellaneous import get_state_index
from ..utils.multiscale_targeting import (
extract_results,
get_targets_for_zone_and_sub_zones,
)
from ..utils.wkbook_to_json import get_problem_from_excel
from ._problem import (
JsonDict,
PathLike,
_LoadedProblemSource,
_PlotAccessorDescriptor,
_ProblemSourceAdapters,
_TargetAccessorDescriptor,
_validate_problem_semantics,
build_graph_payload,
build_problem_summary_frame,
load_problem_source,
prepare_in_memory_problem_source,
)
from ._problem import (
format_schema_validation_error as _format_schema_validation_error,
)
from ._problem import (
locate_summary_row as _locate_summary_row,
)
from .stream_collection import StreamCollection
from .zone import Zone
# Type alias for the targeting service callables accepted by the private
# ``_execute_*`` helpers below.
# NOTE(review): this module invokes services as ``service(zone, runtime_options)``
# (two positional arguments), while the alias declares a single ``Zone``
# parameter -- confirm whether the options mapping should be part of the alias.
ZoneService = Callable[["Zone"], "Zone"]
[docs]
class PinchProblem:
"""Typed orchestrator for loading input data and running targeting.

An instance keeps the raw problem inputs, their validated form, the prepared
root ``Zone`` and the most recently cached ``TargetOutput``. Plotting and
targeting entry points are exposed through the ``plot`` and ``target``
descriptors.
"""
# Output directory used by ``export_excel``; ``None`` until one is supplied.
results_dir: Optional[Path]
# Path reported by the loaded source bundle, if any.
_problem_filepath: Optional[Path]
# Raw problem definition exactly as delivered by the source loader.
_problem_data: Optional[JsonDict | TargetInput]
# Label handed to preprocessing and mirrored onto the root zone's ``name``.
_project_name: str
# Cached targeting output; reset to ``None`` whenever the state is rebuilt.
_results: Optional[TargetOutput]
# Result of ``validate()`` stored by ``_rebuild_problem_state``.
_validated_data: Optional[TargetInput]
# Root zone produced by ``_data_preprocessing``.
_master_zone: Optional["Zone"]
# Source kind reported by the loader; starts out as ``"unknown"``.
_input_source_kind: str
# Extra context forwarded to schema-error formatting and semantic validation.
_validation_context: Optional[dict[str, list[dict[str, Any]]]]
# Descriptor-backed accessors; ``target`` is invoked as ``self.target(...)``.
plot = _PlotAccessorDescriptor()
target = _TargetAccessorDescriptor()
def __init__(
self,
source: (
TargetInput | JsonDict | PathLike | tuple[PathLike, PathLike] | None
) = None,
*,
project_name: Optional[str] = "Site",
) -> None:
self._project_name = project_name
self._input_source_kind = "unknown"
self._validation_context = None
self._problem_filepath = None
self._problem_data = None
self._results = None
self._validated_data = None
self._master_zone = None
self.results_dir = None
if source is not None:
self.load(source=source)
[docs]
def load(
self,
source: (
TargetInput | JsonDict | PathLike | tuple[PathLike, PathLike] | None
) = None,
) -> Optional[Zone]:
"""Load problem inputs from JSON, Excel, CSV, or an in-memory object."""
if source is None:
if self.problem_filepath is None:
return None
source = Path(self.problem_filepath)
loaded_source = load_problem_source(
source,
current_project_name=self._project_name,
adapters=self._problem_source_adapters(),
)
self._apply_loaded_source(loaded_source)
return self._rebuild_problem_state()
def _run_targeting_for_zone_and_subzones(
    self,
    zone: Optional[Zone] = None,
    direct_service_func: Optional[ZoneService] = None,
    indirect_service_func: Optional[ZoneService] = None,
    options: Optional[dict[str, Any]] = None,
    sid: str = None,
) -> TargetOutput:
    """Target ``zone`` together with its sub-zones and cache the output.

    Parameters
    ----------
    zone:
        Zone to analyse. Anything that is not a ``Zone`` instance is replaced
        by the execution master zone.
    direct_service_func, indirect_service_func:
        Service callables forwarded to ``get_targets_for_zone_and_sub_zones``.
    options:
        Runtime options; state selection is resolved from them.
    sid:
        Accepted for signature compatibility. The value is replaced by the
        state id resolved from ``options`` before it is used.

    Returns
    -------
    The validated ``TargetOutput`` extracted from the analysed zone, which is
    also stored as the cached results.
    """
    target_zone = (
        zone if isinstance(zone, Zone) else self._build_execution_master_zone()
    )
    runtime_options, sid = self._resolve_runtime_state_options(
        options, zone=target_zone
    )
    get_targets_for_zone_and_sub_zones(
        zone=target_zone,
        direct_service_func=direct_service_func,
        indirect_service_func=indirect_service_func,
        args=runtime_options,
    )
    extracted = extract_results(target_zone, state_id=sid)
    self._results = TargetOutput.model_validate(extracted)
    return self._results
def _execute_targeting(
    self,
    *,
    target_id: str,
    application_zone: Optional[str | Zone],
    options: Optional[dict[str, Any]],
    include_subzones: bool,
    direct_service_func: Optional[ZoneService] = None,
    indirect_service_func: Optional[ZoneService] = None,
    sid: str = None,
) -> BaseTargetModel:
    """Run one targeting family on a zone and return the requested target.

    Parameters
    ----------
    target_id:
        Key looked up in ``zone.targets`` once the services have run.
    application_zone:
        Zone, sub-zone name, or ``None`` for the master zone.
    options:
        Runtime options used to resolve the active state.
    include_subzones:
        When true, delegate to ``_run_targeting_for_zone_and_subzones``;
        otherwise call the supplied services on the selected zone only.
    direct_service_func, indirect_service_func:
        Optional service callables, invoked in that order.
    sid:
        Replaced by the state id resolved from ``options``.

    Raises
    ------
    RuntimeError
        If ``target_id`` is missing from the zone's targets afterwards.
    """
    root_zone = self._build_execution_master_zone()
    runtime_options, sid = self._resolve_runtime_state_options(
        options, zone=root_zone
    )
    zone = self._resolve_target_zone(application_zone, master_zone=root_zone)

    if not include_subzones:
        for service in (direct_service_func, indirect_service_func):
            if service is not None:
                service(zone, runtime_options)
    else:
        self._run_targeting_for_zone_and_subzones(
            zone=zone,
            direct_service_func=direct_service_func,
            indirect_service_func=indirect_service_func,
            options=runtime_options,
            sid=sid,
        )

    # The cached results always describe the whole master zone, even when only
    # a sub-zone was targeted.
    refreshed = extract_results(root_zone, state_id=sid)
    self._results = TargetOutput.model_validate(refreshed)

    try:
        requested_target = zone.targets[target_id]
    except KeyError as exc:
        raise RuntimeError(
            f"Targeting did not produce target {target_id!r} "
            f"for zone {zone.name!r}."
        ) from exc
    return requested_target
def _execute_cogeneration_targeting(
    self,
    *,
    application_zone: Optional[str | Zone],
    options: Optional[dict[str, Any]],
    include_subzones: bool,
    service_func: Optional[ZoneService] = None,
    sid: str = None,
) -> BaseTargetModel:
    """Run cogeneration targeting and return the target chosen at runtime.

    The target key is read from the zone's
    ``_selected_cogeneration_target_type`` attribute after the service ran.

    Parameters
    ----------
    application_zone:
        Zone, sub-zone name, or ``None`` for the master zone.
    options:
        Runtime options used to resolve the active state.
    include_subzones:
        When true, the service is passed as the direct service to
        ``_run_targeting_for_zone_and_subzones``; otherwise it is called on
        the selected zone only.
    service_func:
        Optional cogeneration service callable.
    sid:
        Replaced by the state id resolved from ``options``.

    Raises
    ------
    RuntimeError
        If no string target type was recorded on the zone, or the recorded
        type is missing from ``zone.targets``.
    """
    root_zone = self._build_execution_master_zone()
    runtime_options, sid = self._resolve_runtime_state_options(
        options, zone=root_zone
    )
    zone = self._resolve_target_zone(application_zone, master_zone=root_zone)

    if not include_subzones:
        if service_func is not None:
            service_func(zone, runtime_options)
    else:
        self._run_targeting_for_zone_and_subzones(
            zone=zone,
            direct_service_func=service_func,
            options=runtime_options,
            sid=sid,
        )

    refreshed = extract_results(root_zone, state_id=sid)
    self._results = TargetOutput.model_validate(refreshed)

    chosen_type = getattr(zone, "_selected_cogeneration_target_type", None)
    if not isinstance(chosen_type, str):
        raise RuntimeError(
            "Cogeneration did not select a compatible target "
            f"for zone {zone.name!r}."
        )
    try:
        chosen_target = zone.targets[chosen_type]
    except KeyError as exc:
        raise RuntimeError(
            "Cogeneration selected target "
            f"{chosen_type!r} for zone {zone.name!r}, "
            "but that target was not available on the zone."
        ) from exc
    return chosen_target
def _resolve_target_zone(
self,
application_zone: Optional[str] = None,
*,
master_zone: Optional["Zone"] = None,
) -> "Zone":
selected_master_zone = master_zone or self._master_zone
if selected_master_zone is None:
raise RuntimeError("Load problem source data first before targeting.")
if isinstance(application_zone, Zone):
return application_zone
if application_zone is None:
return selected_master_zone
return selected_master_zone.get_subzone(application_zone)
def _build_execution_master_zone(self) -> "Zone":
if self._problem_data is None and self._master_zone is None:
raise RuntimeError("No input loaded. Call load(...) first.")
if self._master_zone is None:
self.load(self._problem_data)
return self._master_zone
@property
def state_ids(self) -> dict[str, int]:
"""Return the canonical ``state_id -> idx`` lookup for the loaded problem."""
master_zone = self._require_prepared_root_zone()
return master_zone.state_ids
[docs]
def target_all_states(
self,
*,
parallel: bool | str = False,
max_workers: int | None = None,
preserve_cached_results: bool = True,
) -> dict[str, TargetOutput]:
"""Run default targeting once per canonical state id.
Parameters
----------
parallel:
``False`` runs serially. ``True`` and ``"process"`` use a process pool,
while ``"thread"`` uses a thread pool which is suitable for no-GIL
Python builds.
max_workers:
Optional executor worker limit for parallel runs.
preserve_cached_results:
Restore the original ``results`` cache after the batch run when ``True``.
"""
state_ids = list(self.state_ids.keys())
if not state_ids:
raise ValueError("This problem has no canonical state_ids to target.")
previous_results = self._results
try:
if parallel in (False, None):
results_by_requested_state = {
state_id: self._solve_target_for_state(state_id)
for state_id in state_ids
}
return self._order_state_results(
state_ids=state_ids,
results_by_requested_state=results_by_requested_state,
)
return self._target_all_states_parallel(
state_ids=state_ids,
backend="thread" if parallel == "thread" else "process",
max_workers=max_workers,
)
finally:
if preserve_cached_results:
self._results = previous_results
def _solve_target_for_state(self, state_id: str) -> TargetOutput:
    """Target one state and return a freshly validated copy of the output."""
    solved = self.target(state_id=state_id)
    # Round-trip through a plain dump so the returned object is a new
    # ``TargetOutput`` built from the dumped data.
    dumped = solved.model_dump(mode="python")
    return TargetOutput.model_validate(dumped)
def _resolve_runtime_state_options(
    self,
    options: Optional[dict[str, Any]],
    *,
    zone: "Zone",
) -> tuple[dict[str, Any], str | None]:
    """Copy ``options`` and pin the resolved state selection onto the copy.

    Parameters
    ----------
    options:
        Caller-supplied options; ``None`` is treated as an empty mapping. The
        mapping itself is never mutated.
    zone:
        Zone whose ``state_ids`` are used to resolve the state.

    Returns
    -------
    ``(runtime_options, sid)`` where ``runtime_options`` always carries the
    resolved ``"idx"`` and, when ``sid`` is not ``None``, ``"state_id"``.
    """
    runtime_options = dict(options or {})
    idx, sid = get_state_index(state_ids=zone.state_ids, args=runtime_options)
    runtime_options["idx"] = idx
    if sid is not None:
        runtime_options["state_id"] = sid
    return runtime_options, sid
def _state_result_key(
self,
result: TargetOutput,
*,
requested_state_id: str,
) -> str:
return (
str(result.state_id) if result.state_id is not None else requested_state_id
)
def _order_state_results(
self,
*,
state_ids: list[str],
results_by_requested_state: dict[str, TargetOutput],
) -> dict[str, TargetOutput]:
ordered_results: dict[str, TargetOutput] = {}
for requested_state_id in state_ids:
result = results_by_requested_state[requested_state_id]
ordered_results[
self._state_result_key(
result,
requested_state_id=requested_state_id,
)
] = result
return ordered_results
def _target_all_states_parallel(
    self,
    *,
    state_ids: list[str],
    backend: str,
    max_workers: int | None,
) -> dict[str, TargetOutput]:
    """Solve every state in an executor and return the ordered results.

    Parameters
    ----------
    state_ids:
        State ids to solve; also defines the order of the returned mapping.
    backend:
        ``"thread"`` selects ``ThreadPoolExecutor``; any other value selects
        ``ProcessPoolExecutor``.
    max_workers:
        Forwarded to the executor constructor.
    """
    # Workers rebuild the problem from the canonical inputs instead of sharing
    # this instance.
    problem_inputs = self.canonical_problem_json()

    if backend == "thread":
        pool_factory = ThreadPoolExecutor
    else:
        pool_factory = ProcessPoolExecutor

    collected: dict[str, TargetOutput] = {}
    with pool_factory(max_workers=max_workers) as pool:
        pending = {}
        for state_id in state_ids:
            job = pool.submit(
                _solve_default_target_for_state,
                problem_inputs,
                self.project_name,
                state_id,
            )
            pending[job] = state_id
        for finished in as_completed(pending):
            collected[pending[finished]] = TargetOutput.model_validate(
                finished.result()
            )

    return self._order_state_results(
        state_ids=state_ids,
        results_by_requested_state=collected,
    )
[docs]
def validate(self) -> TargetInput:
"""Validate the currently loaded problem data without running targeting."""
if self._problem_data is None:
raise RuntimeError("No input loaded. Call load(...) first.")
try:
input_data = TargetInput.model_validate(self._problem_data)
except ValidationError as exc:
raise ValueError(
_format_schema_validation_error(
exc,
problem_data=self._problem_data,
context=self._validation_context or {},
)
) from exc
_validate_problem_semantics(
input_data,
context=self._validation_context or {},
)
return input_data
[docs]
def summary_frame(self, *, detailed: bool = False) -> pd.DataFrame:
    """Return the target summary as a pandas DataFrame.

    Cached results are used when present; otherwise ``target()`` is called
    first. ``detailed=True`` builds the frame from ``results.targets`` with
    ``build_summary_dataframe``; the default uses
    ``build_problem_summary_frame``.
    """
    results = self._results
    if results is None:
        results = self.target()
    if not detailed:
        return build_problem_summary_frame(results, detailed=False)
    return build_summary_dataframe(results.targets)
[docs]
def export_excel(self, results_dir: Optional[PathLike] = None) -> Path:
"""Export the solved target summary and problem tables to an Excel file."""
if results_dir is not None:
self.results_dir = Path(results_dir)
if self.results_dir is None:
raise ValueError("No results_dir set. Provide a path to export results.")
if self._results is None:
self.target()
output_path = export_target_summary_to_excel_with_units(
target_response=self._results,
master_zone=self._master_zone,
out_dir=self.results_dir,
)
return Path(output_path)
[docs]
def compare_to(
    self,
    other_problem: "PinchProblem",
    *,
    target_name: Optional[str] = None,
    base_label: str = "Base case",
    other_label: str = "Scenario",
) -> pd.DataFrame:
    """Tabulate the compact summaries of two problems side by side.

    Parameters
    ----------
    other_problem:
        Problem compared against this one.
    target_name:
        Summary row to compare. When omitted, the other problem is matched on
        the ``"Target"`` value of the row located for this problem.
    base_label, other_label:
        Index labels for this problem's row and the other problem's row.

    Returns
    -------
    A frame with one row per problem plus a ``"Change"`` row holding
    ``other - base``, and a leading ``"Target"`` column.
    """
    base_row = None
    base_frame = self.summary_frame()
    other_frame = other_problem.summary_frame()

    base_row = _locate_summary_row(base_frame, target_name=target_name)
    other_row = _locate_summary_row(
        other_frame,
        target_name=target_name or str(base_row["Target"]),
    )

    columns = [
        "Hot Utility Target",
        "Cold Utility Target",
        "Heat Recovery",
        "Hot Pinch",
        "Cold Pinch",
    ]
    labelled_rows = ((base_row, base_label), (other_row, other_label))
    comparison = pd.DataFrame(
        [
            pd.Series({col: row.get(col) for col in columns}, name=label)
            for row, label in labelled_rows
        ]
    )

    difference = comparison.loc[other_label] - comparison.loc[base_label]
    comparison.loc["Change"] = difference

    comparison.insert(0, "Target", str(base_row["Target"]))
    comparison.loc["Change", "Target"] = str(base_row["Target"])
    return comparison
def _data_preprocessing(self) -> "Zone":
    """Build the root zone from the validated inputs.

    Raises
    ------
    ValueError
        If the validated data is not a ``TargetInput`` or the project name is
        not a string.
    """
    validated = self._validated_data
    label = self._project_name
    if not (isinstance(validated, TargetInput) and isinstance(label, str)):
        raise ValueError("No validated data load. Try ``load(source)``.")
    return data_preprocessing_service(
        input_data=validated,
        project_name=label,
    )
@property
def problem_filepath(self) -> Optional[Path]:
"""Return the filepath of the problem that was loaded or supplied."""
return self._problem_filepath
@property
def problem_data(self) -> Optional[TargetInput | JsonDict]:
"""Return the raw problem definition that was loaded or supplied."""
return self._problem_data
@property
def results(self) -> Optional[TargetOutput]:
"""Return the cached targeting results, if targeting has been executed."""
return self._results
@property
def master_zone(self) -> Optional["Zone"]:
"""Return the prepared root zone after a successful ``load()`` pass."""
return self._master_zone
@property
def hot_streams(self) -> StreamCollection:
"""Hot process streams on the root analysis zone."""
return self._require_prepared_root_zone().hot_streams
@property
def cold_streams(self) -> StreamCollection:
"""Cold process streams on the root analysis zone."""
return self._require_prepared_root_zone().cold_streams
@property
def hot_utilities(self) -> StreamCollection:
"""Hot utility streams on the root analysis zone."""
return self._require_prepared_root_zone().hot_utilities
@property
def cold_utilities(self) -> StreamCollection:
"""Cold utility streams on the root analysis zone."""
return self._require_prepared_root_zone().cold_utilities
@property
def project_name(self) -> str:
"""Return the project label used for the root zone and exports."""
return self._project_name
@project_name.setter
def project_name(self, value: str):
"""Update the root project label and mirror it onto the loaded root zone."""
self._project_name = value
if isinstance(self._master_zone, Zone):
self._master_zone.name = value
[docs]
@classmethod
def from_json(cls, data: JsonDict) -> "PinchProblem":
    """Create a problem from an in-memory mapping.

    The mapping is passed through ``prepare_in_memory_problem_source`` with
    source kind ``"in_memory"`` and applied to a fresh instance. The prepared
    analysis state is not built here.
    """
    instance = cls()
    prepared_source = prepare_in_memory_problem_source(
        data, source_kind="in_memory"
    )
    instance._apply_loaded_source(prepared_source)
    return instance
[docs]
def to_problem_json(self, *, canonical: bool = False) -> JsonDict:
"""Return the currently loaded problem inputs."""
if self._problem_data is None:
raise RuntimeError(
"No problem_data available. Did you call load(...) or from_json(...)?"
)
if canonical:
return self._canonical_problem_inputs()
return self._problem_data
[docs]
def canonical_problem_json(self) -> JsonDict:
"""Return canonical mutable problem inputs with an explicit zone tree."""
return self.to_problem_json(canonical=True)
[docs]
def set_dt_cont_multiplier(
self,
value: float,
*,
zone_name: Optional[str] = None,
) -> Zone:
"""Update one zone-tree multiplier and rebuild the prepared analysis state."""
resolved_value = float(value)
if not math.isfinite(resolved_value) or resolved_value < 0.0:
warnings.warn(
"dt_cont_multiplier must be a finite non-negative value. "
"Used default value of 1.0 instead.",
UserWarning,
)
resolved_value = 1.0
self._master_zone.get_subzone(zone_name).dt_cont_multiplier = resolved_value
self._results = None # Clear cached results since multipliers have changed
return self._master_zone
[docs]
def update_options(
self,
options: Dict[str, Any],
*,
replace: bool = False,
) -> Zone:
"""Update the problem options in-place and rebuild the analysis state."""
if not isinstance(options, dict):
raise TypeError("options must be provided as a dict.")
problem_inputs = self.canonical_problem_json()
current_options = problem_inputs.get("options") or {}
problem_inputs["options"] = (
deepcopy(options)
if replace
else {**deepcopy(current_options), **deepcopy(options)}
)
self._replace_problem_inputs(problem_inputs)
return self._master_zone
def _canonical_problem_inputs(self) -> JsonDict:
"""Return canonical mutable problem inputs with an explicit zone tree."""
validated = self.validate()
return canonical_problem_inputs(validated, project_name=self.project_name)
def __repr__(self) -> str:
"""Return a compact summary of the source and cached result state."""
src = (
str(self._problem_filepath)
if self._problem_filepath is not None
else "<in-memory or CSV tuple>"
)
export = str(self.results_dir) if self.results_dir is not None else "<unset>"
has_results = "yes" if self._results is not None else "no"
return f"PinchProblem(source={src}, export={export}, results={has_results})"
[docs]
def show_dashboard(
self,
*,
zone: Optional["Zone"] = None,
graph_payload: Optional[Dict[str, Any]] = None,
page_title: Optional[str] = "OpenPinch Dashboard",
value_rounding: int = 2,
) -> None:
"""Launch the Streamlit dashboard for the analysed problem."""
active_zone = zone or self._master_zone
if active_zone is None:
raise RuntimeError(
"No analysed zone is available. Run target() before rendering."
)
payload = graph_payload
if payload is None and self._results is not None:
payload = build_graph_payload(self._results)
_render_streamlit_dashboard(
active_zone,
graph_payload=payload,
page_title=page_title,
value_rounding=value_rounding,
)
def _refresh_results_from_master_zone(self) -> TargetOutput:
if self._master_zone is None:
raise RuntimeError("No analysed zone is available. Run target() first.")
self._results = TargetOutput.model_validate(extract_results(self._master_zone))
return self._results
def _require_prepared_root_zone(self) -> Zone:
"""Return the prepared root zone, rebuilding it lazily when possible."""
if self._master_zone is None:
if self._problem_data is None:
raise RuntimeError("No input loaded. Call load(...) first.")
return self._rebuild_problem_state()
return self._master_zone
def _problem_source_adapters(self) -> _ProblemSourceAdapters:
    """Bundle the module-level loader functions for the source loader.

    The names are looked up on every call, so replacing the module symbols
    (for example in tests) takes effect on the next call.
    """
    adapter_functions = {
        "get_problem_from_excel": get_problem_from_excel,
        "get_problem_from_csv": get_problem_from_csv,
        "list_sample_cases": list_sample_cases,
        "read_sample_case": read_sample_case,
    }
    return _ProblemSourceAdapters(**adapter_functions)
def _apply_loaded_source(self, loaded_source: _LoadedProblemSource) -> None:
"""Apply one normalized source bundle to this problem instance."""
self._problem_data = loaded_source.input_data
self._input_source_kind = loaded_source.source_kind
self._validation_context = loaded_source.validation_context
self._problem_filepath = loaded_source.problem_filepath
if loaded_source.project_name is not None:
self._project_name = loaded_source.project_name
def _rebuild_problem_state(self) -> Zone:
"""Revalidate, reconstruct the zone tree, and clear cached results."""
self._validated_data = self.validate()
self._master_zone = self._data_preprocessing()
self._results = None
return self._master_zone
def _replace_problem_inputs(self, problem_inputs: JsonDict) -> Zone:
    """Swap in new problem inputs and rebuild the analysis state.

    The stored filepath is kept as it was before the replacement. The current
    source kind is reused, with ``"target_input"`` as the fallback when it is
    empty.
    """
    filepath_before = self._problem_filepath
    source_kind = self._input_source_kind or "target_input"
    replacement = prepare_in_memory_problem_source(
        problem_inputs,
        source_kind=source_kind,
    )
    self._apply_loaded_source(replacement)
    # Applying the bundle overwrites the filepath; put the original back.
    self._problem_filepath = filepath_before
    return self._rebuild_problem_state()
def _solve_default_target_for_state(
    problem_inputs: JsonDict,
    project_name: str,
    state_id: str,
) -> dict[str, Any]:
    """Solve one state on a freshly built problem and return a plain dump.

    Module-level worker submitted to the executors by
    ``PinchProblem._target_all_states_parallel``.
    """
    worker_problem = PinchProblem(source=problem_inputs, project_name=project_name)
    solved = worker_problem.target(state_id=state_id)
    return solved.model_dump(mode="python")