… create a basic exporter¶
from loguru import logger
from pydantic import BaseModel
from r2x_core.exporter import BaseExporter
from r2x_core.store import DataStore
from r2x_core.system import System
from r2x_core import DataFile
# Define model configuration
class MyModelConfig(BaseModel):
"""Configuration for MyModel exporter."""
model_year: int
scenario_name: str
export_timeseries: bool = True
# Create exporter class
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def __init__(self, config: MyModelConfig, system: System, data_store: DataStore, **kwargs):
super().__init__(config, system, data_store, **kwargs)
self.model_year = config.model_year
def export(self) -> None:
"""Export system to model format."""
logger.info(f"Exporting to MyModel format for year {self.model_year}")
# Export components
self._export_generators()
self._export_buses()
# Export time series if enabled
if self.config.export_timeseries:
self.export_time_series()
logger.info("Export complete")
def _export_generators(self) -> None:
"""Export generators."""
pass # Implementation
def _export_buses(self) -> None:
"""Export buses."""
pass # Implementation
def export_time_series(self) -> None:
"""Export time series (required abstract method)."""
# Get only time series files from DataStore
ts_files = [
df for df in self.data_store.data_files.values()
if df.is_timeseries
]
for datafile in ts_files:
logger.info(f"Exporting time series to {datafile.file_path}")
# Implementation here
# Use the exporter
config = MyModelConfig(model_year=2030, scenario_name="base")
system = System() # Already populated with components
data_store = DataStore(
data_files={
"generators": DataFile(name="generators", file_path="output/generators.csv"),
},
folder="/path/to/output",
)
exporter = MyModelExporter(config, system, data_store)
exporter.export()
… export components to CSV files¶
from loguru import logger
from r2x_core.exporter import BaseExporter
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def _export_generators(self) -> None:
"""Export generator components to CSV."""
from my_components import Generator
# Get DataFile configuration for generators
gen_file = self.data_store.data_files["generators"]
# Export only Generator components using filter function
self.system.export_components_to_csv(
file_path=gen_file.file_path,
filter_func=lambda c: isinstance(c, Generator),
)
logger.info(f"Exported generators to {gen_file.file_path}")
def _export_high_voltage_buses(self) -> None:
"""Export only high voltage buses."""
from my_components import Bus
bus_file = self.data_store.data_files["hv_buses"]
# Filter by type AND attribute
self.system.export_components_to_csv(
file_path=bus_file.file_path,
filter_func=lambda c: isinstance(c, Bus) and c.voltage > 100,
fields=["name", "voltage", "area"], # Select specific fields
key_mapping={"voltage": "voltage_kv"} # Rename columns
)
logger.info(f"Exported high voltage buses to {bus_file.file_path}")
def _export_all_components(self) -> None:
"""Export all system components to a single CSV."""
all_file = self.data_store.data_files["all_components"]
# Export everything (no filter)
self.system.export_components_to_csv(
file_path=all_file.file_path
)
logger.info(f"Exported all components to {all_file.file_path}")
… export time series data¶
from loguru import logger
from r2x_core import BaseExporter, DataFile
from r2x_core.file_types import TableFile, H5File, ParquetFile
import polars as pl
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def export_time_series(self) -> None:
"""Export time series to files."""
# Get only time series files from DataStore
timeseries_files = [
df for df in self.data_store.data_files.values()
if df.is_timeseries
]
for datafile in timeseries_files:
self._export_timeseries_file(datafile)
logger.info(f"Exported time series to {datafile.file_path}")
def _export_timeseries_file(self, datafile: DataFile) -> None:
"""Export time series based on file type."""
# Get time series data from system
ts_data = self._collect_timeseries_data(datafile.name)
match datafile.file_type:
case TableFile():
ts_data.write_csv(datafile.file_path)
case H5File():
self._write_h5(datafile, ts_data)
case ParquetFile():
ts_data.write_parquet(datafile.file_path)
def _collect_timeseries_data(self, name: str) -> pl.DataFrame:
"""Collect time series data from components."""
# Implementation to gather time series
return pl.DataFrame() # Placeholder
def _write_h5(self, datafile: DataFile, data: pl.DataFrame) -> None:
"""Write data to HDF5."""
import h5py
with h5py.File(datafile.file_path, "w") as f:
for col in data.columns:
f.create_dataset(col, data=data[col].to_numpy())
… handle multi-year time series export¶
from loguru import logger
from r2x_core import BaseExporter, DataFile
import h5py
import polars as pl
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def _export_to_h5(self, datafile: DataFile) -> None:
"""Export multi-year time series to HDF5."""
with h5py.File(datafile.file_path, "w") as f:
# Get time series from system grouped by year
for component in self.system.get_components():
ts_data = self.system.get_time_series(component)
if ts_data is not None:
# Organize by year if multi-year data
if isinstance(ts_data, dict):
# Multi-year: {2020: array, 2021: array, ...}
for year, yearly_data in ts_data.items():
group_path = f"/{year}/{component.name}"
f.create_dataset(group_path, data=yearly_data)
else:
# Single year
f.create_dataset(f"/{component.name}", data=ts_data)
logger.info(f"Exported multi-year time series to {datafile.file_path}")
… validate export configuration¶
from r2x_core.exporter import BaseExporter
from r2x_core.exceptions import ExporterError
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def validate_export(self) -> None:
"""Validate system can be exported."""
# Check required component types exist
required_types = ["Generator", "Bus", "Branch"]
for comp_type in required_types:
components = self.system.get_components_by_type(comp_type)
if not components:
raise ExporterError(f"No {comp_type} components found in system")
# Validate output directory exists
output_dir = self.data_store.folder
if not output_dir.exists():
raise ExporterError(f"Output directory does not exist: {output_dir}")
… use custom export transformations¶
from loguru import logger
from r2x_core import BaseExporter, DataFile
import polars as pl
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def _export_generators(self) -> None:
"""Export generators with custom transformations."""
from my_components import Generator
# Get components as records (list of dicts) using filter
gen_records = self.system.components_to_records(
filter_func=lambda c: isinstance(c, Generator)
)
# Convert to DataFrame for transformations
gen_df = pl.DataFrame(gen_records)
# Apply model-specific transformations
transformed_df = gen_df.with_columns([
# Rename columns
pl.col("max_active_power").alias("PMax_MW"),
pl.col("min_active_power").alias("PMin_MW"),
# Add calculated fields
(pl.col("max_active_power") * 0.95).alias("AvailableCapacity_MW"),
]).select([
# Select and reorder columns for target model
"name",
"bus",
"PMax_MW",
"PMin_MW",
"AvailableCapacity_MW",
])
# Write to configured file
gen_file = self.data_store.data_files["generators"]
transformed_df.write_csv(gen_file.file_path)
logger.info(f"Exported {len(transformed_df)} generators to {gen_file.file_path}")
… export with progress logging¶
from loguru import logger
from r2x_core import BaseExporter
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def export(self) -> None:
"""Export with progress logging."""
logger.info("Starting export process")
# Export components by type
component_types = {
"buses": lambda c: c.__class__.__name__ == "Bus",
"generators": lambda c: c.__class__.__name__ == "Generator",
"branches": lambda c: c.__class__.__name__ == "Branch",
"loads": lambda c: c.__class__.__name__ == "Load",
}
for name, filter_func in component_types.items():
# Count matching components
count = len(self.system.components_to_records(filter_func=filter_func))
logger.info(f"Exporting {count} {name} components")
self._export_component_type(name, filter_func)
# Export time series
if self.config.export_timeseries:
timeseries_files = [
df for df in self.data_store.data_files.values()
if df.is_timeseries
]
logger.info(f"Exporting {len(timeseries_files)} time series files")
for datafile in timeseries_files:
logger.info(f" - {datafile.name}")
self._export_timeseries_file(datafile)
logger.info("Export complete")
def export_time_series(self) -> None:
"""Export time series (required abstract method)."""
# Implementation moved to export() method for this example
pass
def _export_component_type(self, file_key: str, filter_func) -> None:
"""Export specific component type."""
if file_key in self.data_store.data_files:
datafile = self.data_store.data_files[file_key]
self.system.export_components_to_csv(
file_path=datafile.file_path,
filter_func=filter_func,
)
def _export_timeseries_file(self, datafile) -> None:
"""Export time series file."""
pass # Implementation
… handle optional time series files¶
from loguru import logger
from r2x_core import BaseExporter, DataFile
class MyModelExporter(BaseExporter):
"""Exporter for MyModel format."""
def export_time_series(self) -> None:
"""Export time series, handling optional files."""
timeseries_files = [
df for df in self.data_store.data_files.values()
if df.is_timeseries
]
for datafile in timeseries_files:
# Check if this is an optional file
if datafile.is_optional:
# Check if data exists before exporting
if self._has_timeseries_data(datafile.name):
self._export_timeseries_file(datafile)
else:
logger.info(f"Skipping optional file {datafile.name} (no data)")
else:
# Required file - must export
self._export_timeseries_file(datafile)
def _has_timeseries_data(self, name: str) -> bool:
"""Check if time series data exists for this file."""
# Check if any components have time series matching this name
for component in self.system.get_components():
ts_data = self.system.get_time_series(component)
if ts_data is not None and name in str(ts_data):
return True
return False
def _export_timeseries_file(self, datafile: DataFile) -> None:
"""Export time series file."""
pass # Implementation