"""Excel export utilities for OpenPinch targeting outputs."""
import re
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Iterable, Optional, Tuple
import pandas as pd
from ..streamlit_webviewer.web_graphing import problem_table_to_dataframe
from .miscellaneous import get_value
if TYPE_CHECKING:
from ..classes.zone import Zone
from ..lib.schemas.io import TargetOutput
__all__ = ["export_target_summary_to_excel_with_units"]
################################################################################
# Public API
################################################################################
[docs]
def export_target_summary_to_excel_with_units(
target_response: "TargetOutput",
master_zone: "Zone",
out_dir: str = ".",
) -> str:
"""Export solved targets and problem tables to an Excel workbook.
Parameters
----------
target_response:
Structured response returned by the high-level targeting service.
master_zone:
Solved zone hierarchy used to export shifted and real problem tables for
the master zone and all subzones. May be ``None`` when only the summary
sheet is required.
out_dir:
Destination directory for the workbook.
Returns
-------
str
Absolute or relative path to the workbook that was written.
Notes
-----
The workbook currently includes a summary sheet plus one or more problem-
table sheets. Value-with-unit objects are flattened into adjacent
``(value)`` and ``(unit)`` columns for easy review in Excel.
"""
df_summary = build_summary_dataframe(target_response.targets)
out_path = _compose_output_path(
project_name=getattr(target_response, "name", "Project"),
out_dir=out_dir,
)
with pd.ExcelWriter(out_path, engine="openpyxl") as xw:
_write_summary_sheet(df_summary, xw)
_write_problem_tables(master_zone, xw)
return str(out_path)
def build_summary_dataframe(targets) -> pd.DataFrame:
"""Convert ``TargetResults`` objects into a value/unit dataframe."""
rows = []
for target in targets:
rows.append(_make_summary_row(target))
return pd.DataFrame(rows)
################################################################################
# Helpers
################################################################################
def _split_vu(
x: Any,
state_id: str | None = None,
) -> Tuple[Optional[float], Optional[str]]:
"""Return ``(value, unit)`` for either a float or ValueWithUnit/None."""
if x is None:
return None, None
# If it's a pydantic model with attributes
if hasattr(x, "value") and hasattr(x, "unit"):
return x.value, x.unit
if hasattr(x, "unit"):
return get_value(x, state_id=state_id), x.unit
# plain number
try:
return get_value(x, state_id=state_id), None
except TypeError, ValueError:
return None, None
def _autosize_columns(df: pd.DataFrame, ws, start_col: int = 1, header_row: int = 1):
"""Best-effort column width: max(len(header), max len cell)."""
for offset, col in enumerate(df.columns):
i = start_col + offset
max_len = len(str(col))
column_values = df.iloc[:, offset]
for val in column_values:
text = "" if pd.isna(val) else str(val)
max_len = max(max_len, len(text))
ws.column_dimensions[
ws.cell(row=header_row, column=i).column_letter
].width = min(max_len + 2, 40)
def _safe_name(name: str) -> str:
"""Make a filesystem-safe project name (keep letters, numbers, - _ .)."""
name = name.strip()
name = re.sub(r"[\\/:*?\"<>|]+", "_", name) # replace forbidden characters
name = re.sub(r"\s+", "_", name) # spaces -> underscore
return name or "Project"
def _make_summary_row(t) -> dict:
state_id = getattr(t, "state_id", None)
cold_val, cold_unit = _split_vu(
getattr(t.temp_pinch, "cold_temp", None),
state_id=state_id,
)
hot_val, hot_unit = _split_vu(
getattr(t.temp_pinch, "hot_temp", None),
state_id=state_id,
)
Qh_val, Qh_unit = _split_vu(t.Qh, state_id=state_id)
Qc_val, Qc_unit = _split_vu(t.Qc, state_id=state_id)
Qr_val, Qr_unit = _split_vu(t.Qr, state_id=state_id)
deg_val, deg_unit = _split_vu(t.degree_of_integration, state_id=state_id)
base_columns = {
"Target": t.name,
"State ID": state_id,
"Cold Pinch (value)": cold_val,
"Cold Pinch (unit)": cold_unit,
"Hot Pinch (value)": hot_val,
"Hot Pinch (unit)": hot_unit,
"Qh (value)": Qh_val,
"Qh (unit)": Qh_unit,
"Qc (value)": Qc_val,
"Qc (unit)": Qc_unit,
"Qr (value)": Qr_val,
"Qr (unit)": Qr_unit,
"Degree of Integration (value)": deg_val,
"Degree of Integration (unit)": deg_unit,
}
utility_columns = _utility_columns(
t.hot_utilities,
t.cold_utilities,
state_id=state_id,
)
util_cost_val, util_cost_unit = _split_vu(t.utility_cost, state_id=state_id)
area_val, area_unit = _split_vu(t.area, state_id=state_id)
work_val, work_unit = _split_vu(t.work_target, state_id=state_id)
turb_eff_val, turb_eff_unit = _split_vu(
t.turbine_efficiency_target,
state_id=state_id,
)
ex_src_val, ex_src_unit = _split_vu(t.exergy_sources, state_id=state_id)
ex_sink_val, ex_sink_unit = _split_vu(t.exergy_sinks, state_id=state_id)
ex_req_val, ex_req_unit = _split_vu(t.exergy_req_min, state_id=state_id)
ex_des_val, ex_des_unit = _split_vu(t.exergy_des_min, state_id=state_id)
tail_columns = {
"Utility Cost (value)": util_cost_val,
"Utility Cost (unit)": util_cost_unit,
"Area (value)": area_val,
"Area (unit)": area_unit,
"Num Units": t.num_units,
"Capital Cost": t.capital_cost,
"Total Cost": t.total_cost,
"Work Target (value)": work_val,
"Work Target (unit)": work_unit,
"Turbine Eff Target (value)": turb_eff_val,
"Turbine Eff Target (unit)": turb_eff_unit,
"ETE": t.ETE,
"Exergy Sources (value)": ex_src_val,
"Exergy Sources (unit)": ex_src_unit,
"Exergy Sinks (value)": ex_sink_val,
"Exergy Sinks (unit)": ex_sink_unit,
"Exergy Req Min (value)": ex_req_val,
"Exergy Req Min (unit)": ex_req_unit,
"Exergy Des Min (value)": ex_des_val,
"Exergy Des Min (unit)": ex_des_unit,
}
return base_columns | utility_columns | tail_columns
def _utility_columns(
hot_utils: Optional[Iterable],
cold_utils: Optional[Iterable],
*,
state_id: str | None = None,
) -> dict:
"""Return flattened value/unit columns for the provided utilities."""
columns: dict[str, Any] = {}
def emit(utils):
for u in utils or []:
hf_val, hf_unit = _split_vu(u.heat_flow, state_id=state_id)
columns[f"{u.name} (value)"] = hf_val
columns[f"{u.name} (unit)"] = hf_unit
emit(hot_utils)
emit(cold_utils)
return columns
def _compose_output_path(project_name: str, out_dir: str) -> Path:
project = _safe_name(project_name)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"{project}_{timestamp}.xlsx"
output_dir = Path(out_dir)
output_dir.mkdir(parents=True, exist_ok=True)
return output_dir / filename
def _write_summary_sheet(df_summary: pd.DataFrame, writer: pd.ExcelWriter) -> None:
df_summary.to_excel(writer, sheet_name="Summary", index=False)
_autosize_columns(df_summary, writer.sheets["Summary"])
def _write_problem_tables(
master_zone: Optional["Zone"], writer: pd.ExcelWriter
) -> None:
"""Emit shifted and real temperature Problem Tables for every solved zone."""
if master_zone is None:
return
used_sheet_names: set[str] = set()
for zone in _iter_zones(master_zone):
for target_name, target in zone.targets.items():
table_specs = (
(f"{zone.name} - {target_name} (Shifted)", getattr(target, "pt", None)),
(
f"{zone.name} - {target_name} (Real)",
getattr(target, "pt_real", None),
),
)
for sheet_label, table in table_specs:
df = problem_table_to_dataframe(table, round_decimals=2)
if df.empty:
continue
sheet_name = _unique_sheet_name(sheet_label, used_sheet_names)
df.to_excel(
writer,
sheet_name=sheet_name,
index=False,
startcol=0,
startrow=2,
)
ws = writer.sheets[sheet_name]
ws["A1"] = target.name
_autosize_columns(df, ws, start_col=1, header_row=3)
def _iter_zones(zone: "Zone"):
"""Yield ``zone`` and all nested subzones depth-first."""
stack = [zone]
while stack:
current = stack.pop()
yield current
stack.extend(current.subzones.values())
def _unique_sheet_name(base: str, used: set[str]) -> str:
"""Return an Excel-safe, unique sheet name capped at 31 chars."""
cleaned = _sanitize_sheet_name(base)
candidate = cleaned[:31] or "Sheet"
if candidate not in used:
used.add(candidate)
return candidate
for idx in range(2, 1000):
suffix = f" ({idx})"
trimmed = (
candidate[: 31 - len(suffix)]
if len(candidate) + len(suffix) > 31
else candidate
)
alt = f"{trimmed}{suffix}"
if alt not in used:
used.add(alt)
return alt
raise ValueError("Unable to allocate unique sheet name.")
def _sanitize_sheet_name(name: str) -> str:
"""Replace Excel-forbidden sheet-name characters and trailing apostrophes."""
cleaned = re.sub(r"[:/?*\\\[\]]", "_", name).strip().rstrip("'")
return cleaned or "Sheet"