import logging as _logging
import pathlib as _pl
import time as _time
from collections import abc as _abc
from concurrent.futures import ProcessPoolExecutor, as_completed
from typing import List, Optional, Sequence, Union
import matplotlib.pyplot as _plt
import pandas as _pd
from pytrnsys_process import config as conf
from pytrnsys_process import log, util
from pytrnsys_process.process import data_structures as ds
from pytrnsys_process.process import process_sim as ps
# [docs]
class UnableToProcessSimulationError(Exception):
    """Signal that a simulation folder could not be turned into a result.

    ``process_single_simulation`` raises this when the requested simulation
    is absent from the processed batch.
    """
# pylint: disable=too-many-locals
def _process_batch(
    sim_folders: list[_pl.Path],
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    results_folder: _pl.Path,
    parallel: bool = False,
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Run the processing scenario(s) over a list of simulation folders.

    Shared engine behind the sequential and the parallel entry points. Each
    folder is handed to ``_process_simulation``; successes are recorded via
    ``_handle_simulation_result`` and any exception is recorded via
    ``_handle_simulation_error`` so that one failing simulation does not
    abort the batch. Afterwards the per-simulation scalar data is
    concatenated, a summary is logged and the elapsed time is reported.

    Parameters
    __________
    sim_folders:
        Simulation folders to process.

    processing_scenario:
        Single callable or sequence of callables applied to each simulation.

    results_folder:
        Folder used to obtain the main logger; its POSIX path is stored on
        the returned ``SimulationsData``.

    parallel:
        If true, folders are submitted to a ``ProcessPoolExecutor`` and
        results are collected in completion order; otherwise folders are
        processed one after another in the given order.

    max_workers:
        Forwarded to ``ProcessPoolExecutor`` when ``parallel`` is true.

    Returns
    _______
    SimulationsData holding every simulation that was processed without
    raising.

    Note
    ____
    Internal helper. Use process_single_simulation,
    process_whole_result_set or process_whole_result_set_parallel instead.
    """
    started_at = _time.time()
    outcome = ds.ProcessingResults()
    batch_data = ds.SimulationsData(
        path_to_simulations=results_folder.as_posix()
    )
    logger = log.get_main_logger(results_folder)

    if not parallel:
        for folder in sim_folders:
            try:
                logger.info("Processing simulation: %s", folder.name)
                processed = _process_simulation(folder, processing_scenario)
                _handle_simulation_result(processed, outcome, batch_data)
            except Exception as exc:  # pylint: disable=broad-except
                _handle_simulation_error(exc, folder, outcome, logger)
    else:
        with ProcessPoolExecutor(max_workers=max_workers) as pool:
            # Remember which folder each future belongs to so that a failure
            # can be attributed to the right simulation.
            folder_by_future = {}
            for folder in sim_folders:
                logger.info(
                    "Submitting simulation folder for processing: %s",
                    folder.name,
                )
                pending = pool.submit(
                    _process_simulation, folder, processing_scenario
                )
                folder_by_future[pending] = folder

            for finished in as_completed(folder_by_future):
                try:
                    _handle_simulation_result(
                        finished.result(), outcome, batch_data
                    )
                except Exception as exc:  # pylint: disable=broad-except
                    _handle_simulation_error(
                        exc, folder_by_future[finished], outcome, logger
                    )

    batch_data = _concat_scalar(batch_data)
    _log_processing_results(outcome, logger)

    elapsed = _time.time() - started_at
    mode_label = "Parallel" if parallel else "Total"
    logger.info("%s execution time: %.2f seconds", mode_label, elapsed)

    return batch_data
def _handle_simulation_result(
    result: tuple[ds.Simulation, List[str]],
    results: ds.ProcessingResults,
    simulations_data: ds.SimulationsData,
) -> None:
    """Record one processed simulation in the batch bookkeeping.

    Parameters
    __________
    result: Tuple of (simulation, failed_scenarios)
    results: ProcessingResults whose counters are updated
    simulations_data: SimulationsData receiving the simulation, keyed by the
        last component of ``simulation.path``
    """
    simulation, failed_scenarios = result
    results.processed_count += 1

    sim_name = _pl.Path(simulation.path).name
    simulations_data.simulations[sim_name] = simulation

    # Only simulations with at least one failed scenario get an entry.
    if not failed_scenarios:
        return
    results.failed_scenarios[sim_name] = failed_scenarios
def _handle_simulation_error(
    error: Exception,
    sim_folder: _pl.Path,
    results: ds.ProcessingResults,
    main_logger: _logging.Logger,
) -> None:
    """Record a simulation that raised while being processed.

    Parameters
    __________
    error: The exception that occurred
    sim_folder: Path to the simulation folder
    results: ProcessingResults to update
    main_logger: Logger that receives the error message and traceback
    """
    failed_name = sim_folder.name
    results.error_count += 1
    results.failed_simulations.append(failed_name)

    message = str(error)
    main_logger.error(
        "Failed to process simulation in %s: %s",
        sim_folder,
        message,
        exc_info=True,
    )
# [docs]
def process_single_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.Simulation:
    """Process one simulation folder with the given processing scenario(s).

    Parameters
    __________
    sim_folder:
        Path to the simulation folder to process

    processing_scenarios:
        Single callable or sequence of callables that implement
        the processing logic for a simulation. Each callable should take a
        Simulation object as its only parameter.

    Returns
    _______
    Simulation object containing the processed data

    Raises
    ______
    UnableToProcessSimulationError: If the simulation is missing from the
        batch result, i.e. processing it failed.

    Example
    _______
    >>> import pathlib as _pl
    >>> from pytrnsys_process import api
    >>> from pytrnsys_process.process import data_structures
    ...
    >>> def processing_step_1(sim: data_structures.Simulation):
    ...     # Process simulation data
    ...     pass
    >>> results = api.process_single_simulation(
    ...     _pl.Path("path/to/simulation"),
    ...     processing_step_1
    ... )
    """
    main_logger = log.get_main_logger(sim_folder)
    log.initialize_logs(sim_folder)
    main_logger.info("Starting processing of simulation %s", sim_folder)

    # Reuse the batch machinery with a one-element batch; the parent folder
    # plays the role of the results folder.
    batch = _process_batch(
        [sim_folder], processing_scenarios, sim_folder.parent
    )

    try:
        simulation = batch.simulations[sim_folder.name]
    except KeyError as missing:
        raise UnableToProcessSimulationError(
            f"Failed to process simulation in {sim_folder}"
        ) from missing
    return simulation
# [docs]
def process_whole_result_set(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory sequentially.

    Every sub-directory of ``results_folder`` is treated as a simulation
    folder and processed one at a time, in sorted path order, with the
    provided processing scenario(s). The collected data is written to a
    pickle file inside ``results_folder`` before being returned.

    Parameters
    __________
    results_folder:
        Path to the directory containing simulation folders.
        Each subfolder should contain valid simulation data files.

    processing_scenario:
        Single callable or sequence of callables that implement
        the processing logic for each simulation. Each callable should take a
        Simulation object as its only parameter and modify it in place.

    Returns
    _______
    SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
        Holds every simulation that could be processed, together with the
        scalar data concatenated across those simulations.

    Raises
    ______
    ValueError: If results_folder doesn't exist or is not a directory
    Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
    >>> import pathlib as _pl
    >>> from pytrnsys_process import api
    ...
    >>> def processing_step_1(sim):
    ...     # Process simulation data
    ...     pass
    >>> def processing_step_2(sim):
    ...     # Process simulation data
    ...     pass
    >>> results = api.process_whole_result_set(
    ...     _pl.Path("path/to/results"),
    ...     [processing_step_1, processing_step_2]
    ... )
    """
    _validate_folder(results_folder)

    main_logger = log.get_main_logger(results_folder)
    log.initialize_logs(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s", results_folder
    )

    # iterdir() yields entries in arbitrary, filesystem-dependent order.
    # Sorting keeps the processing order -- and with it the order of the
    # collected simulations and scalar rows -- reproducible across runs.
    sim_folders = sorted(
        sim_folder
        for sim_folder in results_folder.iterdir()
        if sim_folder.is_dir()
    )

    simulations_data = _process_batch(
        sim_folders, processing_scenario, results_folder
    )

    util.save_to_pickle(
        simulations_data,
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value,
    )
    return simulations_data
# [docs]
def process_whole_result_set_parallel(
    results_folder: _pl.Path,
    processing_scenario: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
    max_workers: int | None = None,
) -> ds.SimulationsData:
    """Process all simulation folders in a results directory in parallel.

    Every sub-directory of ``results_folder`` is submitted to a
    ProcessPoolExecutor so that several simulations are processed
    concurrently. The collected data is written to a pickle file inside
    ``results_folder`` before being returned.

    Parameters
    __________
    results_folder:
        Path to the directory containing simulation folders.
        Each subfolder should contain valid simulation data files.

    processing_scenario:
        Single callable or sequence of callables that implement
        the processing logic for each simulation. Each callable should take a
        Simulation object as its only parameter.

    max_workers:
        Maximum number of worker processes to use; passed through to the
        ProcessPoolExecutor. ``None`` lets the executor choose its default.

    Returns
    _______
    SimulationsData: :class:`pytrnsys_process.api.SimulationsData`
        Holds every simulation that could be processed, together with the
        scalar data concatenated across those simulations.

    Raises
    ______
    ValueError: If results_folder doesn't exist or is not a directory
    Exception: Individual simulation failures are logged but not re-raised

    Example
    _______
    >>> import pathlib as _pl
    >>> from pytrnsys_process import api
    ...
    >>> def processing_step_1(sim):
    ...     # Process simulation data
    ...     pass
    >>> def processing_step_2(sim):
    ...     # Process simulation data
    ...     pass
    >>> results = api.process_whole_result_set_parallel(
    ...     _pl.Path("path/to/results"),
    ...     [processing_step_1, processing_step_2]
    ... )
    """
    _validate_folder(results_folder)

    log.initialize_logs(results_folder)
    main_logger = log.get_main_logger(results_folder)
    main_logger.info(
        "Starting batch processing of simulations in %s with parallel execution",
        results_folder,
    )

    # Only directories count as simulation folders; plain files are skipped.
    candidate_folders = []
    for entry in results_folder.iterdir():
        if entry.is_dir():
            candidate_folders.append(entry)

    collected = _process_batch(
        candidate_folders,
        processing_scenario,
        results_folder,
        parallel=True,
        max_workers=max_workers,
    )

    pickle_target = (
        results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
    )
    util.save_to_pickle(collected, pickle_target)
    return collected
# [docs]
def do_comparison(
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    simulations_data: Optional[ds.SimulationsData] = None,
    results_folder: Optional[_pl.Path] = None,
) -> None:
    """Execute comparison scenarios on processed simulation results.

    When ``simulations_data`` is not supplied it is obtained from
    ``results_folder``: the pickle written by the batch-processing functions
    is loaded if present, otherwise the folder is processed in parallel with
    an empty processing scenario.

    Parameters
    __________
    comparison_scenario:
        Single callable or sequence of callables that implement
        the comparison logic. Each callable should take a SimulationsData
        object as its only parameter.

    simulations_data:
        Optional SimulationsData object containing the processed
        simulation data to be compared.

    results_folder:
        Optional Path to the directory containing simulation results.
        Used if simulations_data is not provided.

    Raises
    ______
    ValueError: If neither simulations_data nor results_folder is provided

    Example
    __________
    >>> from pytrnsys_process import api
    ...
    >>> def comparison_step(simulations_data):
    ...     # Compare simulation results
    ...     pass
    ...
    >>> api.do_comparison(comparison_step, simulations_data=processed_results)
    """
    # Test for None explicitly so that a caller-supplied object is never
    # discarded just because it happens to evaluate as falsy.
    if simulations_data is None:
        if not results_folder:
            raise ValueError(
                "Either simulations_data or results_folder must be provided to perform comparison"
            )
        # Use the same file-name constant the batch functions save under, so
        # the reader and the writers cannot drift apart.
        path_to_simulations_data = (
            results_folder / conf.FileNames.SIMULATIONS_DATA_PICKLE_FILE.value
        )
        if path_to_simulations_data.exists():
            # NOTE(review): this loads a pickle from disk -- only point
            # results_folder at data from a trusted source.
            simulations_data = util.load_simulations_data_from_pickle(
                path_to_simulations_data
            )
        else:
            simulations_data = process_whole_result_set_parallel(
                results_folder, []
            )

    main_logger = log.get_main_logger(
        _pl.Path(simulations_data.path_to_simulations)
    )
    _process_comparisons(simulations_data, comparison_scenario, main_logger)
    # Close any matplotlib figures the comparison steps left open.
    _plt.close("all")
def _process_comparisons(
    simulations_data: ds.SimulationsData,
    comparison_scenario: Union[
        _abc.Callable[[ds.SimulationsData], None],
        _abc.Sequence[_abc.Callable[[ds.SimulationsData], None]],
    ],
    main_logger: _logging.Logger,
):
    """Run every comparison step, logging failures instead of raising.

    Parameters
    __________
    simulations_data: Object handed unchanged to each comparison step
    comparison_scenario: Single callable or sequence of callables to run
    main_logger: Logger that receives an error entry (with traceback) for
        each step that raises
    """
    # Normalise a lone callable to a one-element list for uniform handling.
    if callable(comparison_scenario):
        steps = [comparison_scenario]
    else:
        steps = comparison_scenario

    for comparison_step in steps:
        try:
            comparison_step(simulations_data)
        except Exception as exc:  # pylint: disable=broad-except
            step_name = getattr(
                comparison_step, "__name__", str(comparison_step)
            )
            main_logger.error(
                "Scenario %s failed for comparison: %s ",
                step_name,
                str(exc),
                exc_info=True,
            )
def _concat_scalar(simulation_data: ds.SimulationsData) -> ds.SimulationsData:
    """Combine the per-simulation scalar frames into ``simulation_data.scalar``.

    Simulations whose ``scalar`` frame is empty are skipped. If nothing is
    left to combine, ``simulation_data.scalar`` is not touched. The same
    object that was passed in is returned.
    """
    sim_names = []
    scalar_frames = []
    for sim_name, sim in simulation_data.simulations.items():
        frame = sim.scalar
        if frame.empty:
            continue
        sim_names.append(sim_name)
        scalar_frames.append(frame)

    if not scalar_frames:
        return simulation_data

    # Keying by simulation name adds an outer index level; dropping level 1
    # (each frame's own index) leaves the simulation name as the row label.
    stacked = _pd.concat(scalar_frames, keys=sim_names)
    simulation_data.scalar = stacked.droplevel(1)
    return simulation_data
def _validate_folder(folder: _pl.Path) -> None:
    """Ensure ``folder`` is an existing directory.

    Raises
    ______
    ValueError: If the path does not exist, or exists but is not a directory
    """
    if not folder.exists():
        problem = f"Folder does not exist: {folder}"
    elif not folder.is_dir():
        problem = f"Path is not a directory: {folder}"
    else:
        return
    raise ValueError(problem)
def _process_simulation(
    sim_folder: _pl.Path,
    processing_scenarios: Union[
        _abc.Callable[[ds.Simulation], None],
        Sequence[_abc.Callable[[ds.Simulation], None]],
    ],
) -> tuple[ds.Simulation, List[str]]:
    """Load or build one simulation and run every scenario on it.

    The simulation is read from its pickle file when that file exists and
    ``conf.global_settings.reader.force_reread_prt`` is not set; otherwise
    it is built from the files in ``sim_folder`` and then pickled. Each
    scenario is run in turn; a scenario that raises is logged and recorded,
    and the remaining scenarios still run.

    Parameters
    __________
    sim_folder: Path to the simulation folder
    processing_scenarios: Single callable or sequence of callables, each
        called with the simulation as its only argument

    Returns
    _______
    Tuple of (simulation, failed_scenarios), where failed_scenarios lists
    the names of the scenarios that raised.
    """
    sim_logger = log.get_simulation_logger(sim_folder)
    sim_logger.info("Starting simulation processing")
    sim_pickle_file = sim_folder / conf.FileNames.SIMULATION_PICKLE_FILE.value
    simulation: ds.Simulation
    if (
        sim_pickle_file.exists()
        and not conf.global_settings.reader.force_reread_prt
    ):
        sim_logger.info("Loading simulation from pickle file")
        simulation = util.load_simulation_from_pickle(
            sim_pickle_file, sim_logger
        )
    else:
        sim_logger.info("Processing simulation from raw files")
        sim_files = util.get_files([sim_folder])
        simulation = ps.process_sim(sim_files, sim_folder)
        util.save_to_pickle(simulation, sim_pickle_file, sim_logger)

    failed_scenarios: List[str] = []

    # Convert single scenario to list for uniform handling
    scenarios = (
        [processing_scenarios]
        if callable(processing_scenarios)
        else processing_scenarios
    )

    for scenario in scenarios:
        # Resolve the name before entering the try block: the except handler
        # reads it, so it must always be bound to the current scenario
        # (never unbound, never left over from the previous iteration).
        scenario_name = getattr(scenario, "__name__", str(scenario))
        try:
            sim_logger.info("Running scenario: %s", scenario_name)
            scenario(simulation)
            sim_logger.info(
                "Successfully completed scenario: %s", scenario_name
            )
        except Exception as e:  # pylint: disable=broad-except
            failed_scenarios.append(scenario_name)
            sim_logger.error(
                "Scenario %s failed: %s",
                scenario_name,
                str(e),
                exc_info=True,
            )

    if failed_scenarios:
        sim_logger.warning(
            "Simulation completed with %d failed scenarios",
            len(failed_scenarios),
        )
    else:
        sim_logger.info("Simulation completed successfully")

    # Close any matplotlib figures the scenarios left open.
    _plt.close("all")
    return simulation, failed_scenarios
def _log_processing_results(
    results: ds.ProcessingResults, main_logger: _logging.Logger
) -> None:
    """Write the batch summary to the main logger.

    Logs the processed/failed counts, then -- at warning level -- the failed
    simulations (when ``error_count`` is positive) and the failed scenarios
    grouped by simulation (when any were recorded).
    """
    heavy_rule = "=" * 80
    light_rule = "-" * 80
    warn = main_logger.warning

    main_logger.info(heavy_rule)
    main_logger.info("BATCH PROCESSING SUMMARY")
    main_logger.info(light_rule)
    main_logger.info(
        "Total simulations processed: %d | Failed: %d",
        results.processed_count,
        results.error_count,
    )

    if results.error_count > 0:
        warn("Some simulations failed to process. Check the log for details.")
        warn("Failed simulations:")
        for failed_sim in results.failed_simulations:
            warn(" • %s", failed_sim)

    if results.failed_scenarios:
        warn("Failed scenarios by simulation:")
        for sim_name, scenario_names in results.failed_scenarios.items():
            # Entries with an empty scenario list produce no output.
            if not scenario_names:
                continue
            warn(" • %s:", sim_name)
            for scenario_name in scenario_names:
                warn(" - %s", scenario_name)

    main_logger.info(heavy_rule)