Source code for PyDSS.reports.feeder_losses


from typing import Annotated, Dict
from datetime import timedelta
import math
import os

from pydantic import BaseModel, Field
from pydantic import ConfigDict
from loguru import logger

from PyDSS.reports.reports import ReportBase

[docs] class FeederLossesMetricsModel(BaseModel): """Data model for metrics describing feeder losses""" model_config = ConfigDict(title="FeederLossesMetricsModel", str_strip_whitespace=True, validate_assignment=True, validate_default=True, extra="forbid", use_enum_values=False) total_losses_kwh: Annotated[ float, Field( None, title="total_losses_kwh", description="Total losses in the circuit", )] line_losses_kwh: Annotated[ float, Field( None, title="line_losses_kwh", description="Total line losses", )] transformer_losses_kwh: Annotated[ float, Field( None, title="transformer_losses_kwh", description="Total transformer losses", )] total_load_demand_kwh: Annotated[ float, Field( None, title="total_load_demand_kwh", description="Total power output of loads", )]
[docs] class SimulationFeederLossesMetricsModel(BaseModel): scenarios: Dict[str, FeederLossesMetricsModel] = Field( title="scenarios", description="Feeder losses by PyDSS scenario name", )
def compare_feeder_losses( metrics1: SimulationFeederLossesMetricsModel, metrics2: SimulationFeederLossesMetricsModel, rel_tol=0.000001, ): """Compares the values of two instances of FeederLossesMetricsModel. Returns ------- bool Return True if they match. """ match = True for scenario in metrics1.scenarios: for field in FeederLossesMetricsModel.model_fields: val1 = getattr(metrics1.scenarios[scenario], field) val2 = getattr(metrics2.scenarios[scenario], field) if not math.isclose(val1, val2, rel_tol=rel_tol): logger.error("field=%s mismatch %s : %s", field, val1, val2) match = False return match class FeederLossesReport(ReportBase): """Reports the feeder losses. The report generates a feeder_losses.json output file. """ FILENAME = "feeder_losses.json" NAME = "Feeder Losses" DEFAULTS = { "store_all_time_points": False, } def generate(self, output_dir): resolution = self._get_simulation_resolution() to_kwh = resolution / timedelta(hours=1) assert len(self._results.scenarios) >= 1 scenarios = {} for scenario in self._results.scenarios: inputs = FeederLossesReport.get_inputs_from_defaults(self._settings, self.NAME) if inputs["store_all_time_points"]: scenarios[scenario.name] = self._generate_from_all_time_points(scenario, to_kwh) else: scenarios[scenario.name] = self._generate_from_in_memory_metrics(scenario, to_kwh) model = SimulationFeederLossesMetricsModel(scenarios=scenarios) filename = os.path.join(output_dir, self.FILENAME) with open(filename, "w") as f_out: f_out.write(model.model_dump_json(indent=2)) f_out.write("\n") logger.info("Generated %s", filename) return filename def _generate_from_in_memory_metrics(self, scenario, to_kwh): total_losses_dict = scenario.get_summed_element_total("Circuits", "LossesSum") total_losses = abs(next(iter(total_losses_dict.values()))) / 1000 # OpenDSS reports total losses in Watts line_losses_dict = scenario.get_summed_element_total("Circuits", "LineLossesSum") line_losses = abs(next(iter(line_losses_dict.values()))) transformer_losses = (total_losses - line_losses) total_load_power_dict = scenario.get_summed_element_total("Loads", "PowersSum") total_load_power = 0 for val in total_load_power_dict.values(): total_load_power += val.real # TODO: total losses as a percentage of total load demand? return FeederLossesMetricsModel( total_losses_kwh=total_losses * to_kwh, line_losses_kwh=line_losses * to_kwh, transformer_losses_kwh=transformer_losses * to_kwh, total_load_demand_kwh=total_load_power * to_kwh, ) def _generate_from_all_time_points(self, scenario, to_kwh): df_losses = scenario.get_full_dataframe("Circuits", "Losses") assert len(df_losses.columns) == 1 df_line_losses = scenario.get_full_dataframe("Circuits", "LineLosses") assert len(df_line_losses.columns) == 1 df_loads_powers = scenario.get_full_dataframe("Loads", "Powers") total_losses = abs(df_losses.sum().sum()) / 1000 # OpenDSS reports total losses in Watts line_losses = abs(df_line_losses.sum().sum()) transformer_losses = total_losses - line_losses return FeederLossesMetricsModel( total_losses_kwh=total_losses * to_kwh, line_losses_kwh=line_losses * to_kwh, transformer_losses_kwh=transformer_losses * to_kwh, total_load_demand_kwh=df_loads_powers.sum().sum() * to_kwh, ) @staticmethod def get_required_exports(simulation_config): inputs = FeederLossesReport.get_inputs_from_defaults( simulation_config, FeederLossesReport.NAME ) if inputs["store_all_time_points"]: return { "Circuits": [ { "property": "Losses", "store_values_type": "all", }, { "property": "LineLosses", "store_values_type": "all", } ], "Loads": [ { "property": "Powers", "store_values_type": "all", "data_conversion": "abs_sum", }, ] } return { "Circuits": [ { "property": "Losses", "store_values_type": "sum", "sum_elements": True, }, { "property": "LineLosses", "store_values_type": "sum", "sum_elements": True, } ], "Loads": [ { "property": "Powers", "store_values_type": "sum", "sum_elements": True, "data_conversion": "abs_sum", }, ] } @staticmethod def get_required_scenario_names(): return set()