Skip to content

Reference manual

Simulation components

Base class for the simulator

Source code in pypsse\simulator.py
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
class Simulator:
    """Base class for the simulator.

    Wraps a PSS/e instance: loads a PyPSSE project, builds bus subsystems,
    optionally attaches a HELICS co-simulation interface, and drives the
    time-stepped simulation loop.
    """

    # Lifecycle flag queried through `status()`.
    _status: SimulationStatus = SimulationStatus.NOT_INITIALIZED

    def __init__(
        self,
        settings: SimulationSettings,
        export_settings: Union[ExportFileOptions, None] = None,
        psse_version: str = PSSE_VERSIONS.PSSE35.value,
    ):
        """Load a valid PyPSSE project and set up the simulation.

        Args:
            settings (SimulationSettings): simulation settings
            export_settings (Union[ExportFileOptions, None]): export settings;
                when None, they are loaded from EXPORTS_SETTINGS_FILENAME in
                the project directory
            psse_version (str, optional): name of the PSS/e version module to
                import. Defaults to PSSE_VERSIONS.PSSE35.value.
        """

        self._status = SimulationStatus.STARTING_INSTANCE
        self.settings = settings

        logger.debug(f"Instantiating psse version {psse_version}")
        # Importing the version-specific module sets up the PSS/e environment
        # so that `psspy` and `dyntools` become importable below.
        __import__(psse_version, fromlist=[""])  # noqa: F401

        import dyntools
        import psspy

        logger.debug("Imported psspy successfully")

        ierr = psspy.psseinit(N_BUS)
        assert ierr == 0, f"Error code: {ierr}"
        logger.debug("PSSE initialized successfully")
        if export_settings is None:
            # No export settings supplied - fall back to the file shipped
            # with the project.
            export_settings_path = (
                Path(self.settings.simulation.project_path)
                / EXPORTS_SETTINGS_FILENAME
            )
            assert (
                export_settings_path.exists()
            ), f"{export_settings_path} does not exist"
            export_settings = toml.load(export_settings_path)
            export_settings = ExportFileOptions(**export_settings)

        self.export_settings = export_settings
        logger.debug("Starting PSSE instance")

        self.dyntools = dyntools
        self.psse = psspy

        self.start_simulation()
        self.init()
        self._status = SimulationStatus.INITIALIZATION_COMPLETE

    @classmethod
    def from_setting_files(
        cls, simulation_settiings_file: Path, export_Settings_file: Path = None
    ):
        """Build a `Simulator` from TOML settings files.

        Args:
            simulation_settiings_file (Path): path to the simulation settings file
            export_Settings_file (Path, optional): path to the export settings
                file; when omitted, EXPORTS_SETTINGS_FILENAME next to the
                simulation settings file is used
        """
        simulation_settiings = toml.load(simulation_settiings_file)
        if export_Settings_file:
            export_Settings = toml.load(export_Settings_file)
        else:
            # Default: export settings live alongside the simulation settings.
            export_Settings = toml.load(
                simulation_settiings_file.parent / EXPORTS_SETTINGS_FILENAME
            )

        simulation_settiings = SimulationSettings(**simulation_settiings)
        export_Settings = ExportFileOptions(**export_Settings)
        return cls(simulation_settiings, export_Settings)

    def dump_settings(
        self,
        dest_dir: Path,
        simulation_file: str = SIMULATION_SETTINGS_FILENAME,
        export_file: str = EXPORTS_SETTINGS_FILENAME,
    ):
        """Dumps simulation and export settings to a provided path.

        Args:
            dest_dir (Path): directory where settings are dumped
            simulation_file (str, optional): simulation filename. Defaults to SIMULATION_SETTINGS_FILENAME.
            export_file (str, optional): export setting filename. Defaults to EXPORTS_SETTINGS_FILENAME.
        """

        # model_dump_json() already returns a JSON string; write it directly.
        # Passing it through json.dump() would double-encode the payload
        # (the file would contain one quoted JSON string, not an object).
        settings_json = self.settings.model_dump_json()
        (dest_dir / simulation_file).write_text(settings_json)

        # Bug fix: previously the simulation settings were dumped to BOTH
        # files; the export file must contain the export settings.
        export_settings_json = self.export_settings.model_dump_json()
        (dest_dir / export_file).write_text(export_settings_json)

    def start_simulation(self):
        """Starts a loaded simulation.

        Loads the case (SAV) or raw (RAW) file, builds bus subsystems, the
        simulation controller, contingencies, the optional HELICS interface,
        the optional GIC network graph, and the result container.

        Raises:
            Exception: Please pass a RAW or SAV file in the settings dictionary
        """

        self.hi = None
        self.simStartTime = time.time()

        # Prefer a saved case; fall back to a raw file.
        if self.settings.simulation.case_study.exists():
            self.psse.case(str(self.settings.simulation.case_study))
        elif self.settings.simulation.raw_file.exists():
            self.psse.read(0, str(self.settings.simulation.raw_file))
        else:
            msg = "Please pass a RAW or SAV file in the settings dictionary"
            raise Exception(msg)

        logger.info(
            f"Trying to read a file >>{self.settings.simulation.case_study}"
        )
        self.raw_data = rd.Reader(self.psse)
        (
            self.bus_subsystems,
            self.all_subsysten_buses,
        ) = self.define_bus_subsystems()

        # Restrict exports either to user-defined subsystems or to every bus
        # in the model.
        if self.export_settings.defined_subsystems_only:
            valid_buses = self.all_subsysten_buses
        else:
            valid_buses = self.raw_data.buses

        self.sim = sc.sim_controller(
            self.psse,
            self.dyntools,
            self.settings,
            self.export_settings,
            valid_buses,
            self.raw_data,
        )

        self.contingencies = self.build_contingencies()

        if self.settings.helics and self.settings.helics.cosimulation_mode:
            # NOTE(review): this branch is an intentional no-op placeholder
            # for dynamic/snap co-simulation modes - confirm intent.
            if self.settings.simulation.simulation_mode in [
                SimulationModes.DYNAMIC,
                SimulationModes.SNAP,
            ]:
                ...
            self.hi = HelicsInterface(
                self.psse,
                self.sim,
                self.settings,
                self.export_settings,
                self.bus_subsystems,
            )
            self.publications = self.hi.register_publications(
                self.bus_subsystems
            )
            if self.settings.helics.create_subscriptions:
                self.subscriptions = self.hi.register_subscriptions()

        if self.settings.simulation.gic_file:
            self.network_graph = self.parse_gic_file()
            self.bus_ids = self.network_graph.nodes.keys()
        else:
            self.network_graph = None

        self.results = Container(self.settings, self.export_settings)
        self.exp_vars = self.results.get_export_variables()
        # Controls whether `run()` advances simulation time on each step.
        self.inc_time = True

    def init(self):
        """Initializes the model, profiles, and HELICS execution mode."""

        self.sim.init(self.bus_subsystems)

        if self.settings.simulation.use_profile_manager:
            self.pm = ProfileManager(self.sim, self.settings)
            self.pm.setup_profiles()
        if self.settings.helics and self.settings.helics.cosimulation_mode:
            self.hi.enter_execution_mode()

    def parse_gic_file(self) -> Graph:
        """Parses the GIC file (if included in the project).

        Returns:
            Graph: Networkx graph representation for the model
        """

        gicdata = gp.GICParser(self.settings)
        return gicdata.psse_graph

    def define_bus_subsystems(self) -> (dict, list):
        """Defines bus subsystems in the loaded PSSE model.

        Raises:
            LookupError: Failed to create bus subsystem chosen buses.
            ValueError: Number of subsystems cannot be more than 12. See PSSE documentation
            RuntimeError: Failed to add buses to bus subsystem

        Returns:
            dict: mapping of bus subsystems to buses
            list: list of all buses across subsystems (as strings)
        """

        bus_subsystems_dict = {}
        bus_subsystems = self.get_bus_indices()
        # Valid bus subsystem IDs range from 0 to 11 (PSSE documentation).
        if len(bus_subsystems) > MAX_PSSE_BUSSYSTEMS:
            msg = "Number of subsystems can not be more than 12. See PSSE documentation"
            raise ValueError(msg)

        all_subsysten_buses = []
        for i, buses in enumerate(bus_subsystems):
            if not buses:
                # Empty column / subsystem definition - skip it.
                continue

            all_subsysten_buses.extend(buses)
            ierr = self.psse.bsysinit(i)
            if ierr:
                msg = "Failed to create bus subsystem chosen buses."
                raise LookupError(msg)
            else:
                logger.debug(f'Bus subsystem "{i}" created')

            ierr = self.psse.bsys(sid=i, numbus=len(buses), buses=buses)
            if ierr:
                msg = "Failed to add buses to bus subsystem."
                raise RuntimeError(msg)
            else:
                bus_subsystems_dict[i] = buses
                logger.debug(f'Buses {buses} added to subsystem "{i}"')
        # Downstream consumers expect string bus identifiers.
        all_subsysten_buses = [str(x) for x in all_subsysten_buses]
        return bus_subsystems_dict, all_subsysten_buses

    def get_bus_indices(self) -> BusSubsystems:
        """Returns bus indices for bus subsystems.

        Returns:
            BusSubsystems: Bus subsystem model
        """

        if self.settings.bus_subsystems.from_file:
            # Each CSV column defines one subsystem; NaN cells pad shorter
            # columns and are dropped.
            bus_file = self.settings.bus_subsystems.bus_file
            bus_info = pd.read_csv(bus_file, index_col=None)
            bus_info = bus_info.values
            _, n_cols = bus_info.shape
            bus_data = []
            for col in range(n_cols):
                data = [int(x) for x in bus_info[:, col] if not np.isnan(x)]
                bus_data.append(data)
        else:
            bus_data = self.settings.bus_subsystems.bus_subsystem_list
        return bus_data

    def run(self):
        """Launches the simulation and exports results when it completes."""

        self._status = SimulationStatus.RUNNING_SIMULATION
        if self.sim.initialization_complete:
            if self.settings.plots and self.settings.plots.enable_dynamic_plots:
                bokeh_server_proc = subprocess.Popen(
                    ["bokeh", "serve"], stdout=subprocess.PIPE
                )  # noqa: S603,S607
            else:
                bokeh_server_proc = None

            logger.debug(
                f"Running dynamic simulation for time {self.settings.simulation.simulation_time.total_seconds()} sec"
            )
            total_simulation_time = (
                self.settings.simulation.simulation_time.total_seconds()
            )
            t = 0
            while True:
                self.step(t)
                # Only advance time when the last step was granted (HELICS
                # may withhold the time grant; see `step()`).
                if self.inc_time:
                    t += (
                        self.settings.simulation.simulation_step_resolution.total_seconds()
                    )
                if t >= total_simulation_time:
                    break

            self.psse.pssehalt_2()
            if not self.export_settings.export_results_using_channels:
                self.results.export_results()
            else:
                self.sim.export()

            if bokeh_server_proc is not None:
                bokeh_server_proc.terminate()
        else:
            logger.error(
                "Run init() command to initialize models before running the simulation"
            )
        # NOTE(review): assigning a plain string breaks `status()` (which
        # reads `_status.value`); should be a SimulationStatus member - TODO
        # confirm the correct enum member and fix.
        self._status = "Simulation complete"

    def get_bus_ids(self) -> list:
        """Returns bus IDs.

        Returns:
            list: array of bus numbers
        """

        # -1 selects all buses; "NUMBER" requests the bus number attribute.
        ierr, iarray = self.psse.abusint(-1, 1, "NUMBER")
        assert ierr == 0, f"Error code: {ierr}"
        return iarray

    def step(self, t: float) -> dict:
        """Steps through a single simulation time step. Called iteratively to
        increment the simulation.

        Args:
            t (float): time step for the simulation

        Returns:
            dict: results from the current timestep
        """

        self.update_contingencies(t)
        if self.settings.simulation.use_profile_manager:
            self.pm.update()
        ctime = time.time() - self.simStartTime
        logger.debug(
            f"Simulation time: {t} seconds\nRun time: {ctime}\npsse time: {self.sim.get_time()}"
        )
        if self.settings.helics and self.settings.helics.cosimulation_mode:
            if self.settings.helics.create_subscriptions:
                self.update_subscriptions()
                logger.debug(f"Time requested: {t}")
                # The broker may grant less time than requested; in that case
                # `inc_time` is False and the step is re-solved below.
                self.inc_time, helics_time = self.update_federate_time(t)
                logger.debug(f"Time granted: {helics_time}")

        if self.inc_time:
            self.sim.step(t)
        else:
            self.sim.resolve_step()

        if self.settings.helics and self.settings.helics.cosimulation_mode:
            self.publish_data()

        curr_results = self.update_result_container(t)
        return curr_results

    def update_result_container(self, t: float) -> dict:
        """Updates the result container with results from the current time step.

        Args:
            t (float): simulation time in seconds

        Returns:
            dict: simulation results from the current time step
        """

        if self.export_settings.defined_subsystems_only:
            curr_results = self.sim.read_subsystems(
                self.exp_vars, self.all_subsysten_buses
            )
        else:
            curr_results = self.sim.read_subsystems(
                self.exp_vars, self.raw_data.buses
            )

        if not USING_NAERM:
            if not self.export_settings.export_results_using_channels:
                self.results.update(
                    curr_results,
                    t,
                    self.sim.get_time(),
                    self.sim.has_converged(),
                )
        return curr_results

    def update_subscriptions(self):
        """Updates subscriptions (co-simulation mode only)."""

        self.hi.subscribe()

    def update_federate_time(self, t: float) -> (float, float):
        """Makes a time request to the HELICS broker (co-simulation mode only).

        Args:
            t (float): simulation time in seconds

        Returns:
            float: flag indicating whether the requested time was granted
            float: current simulation time in seconds
        """

        inc_time, curr_time = self.hi.request_time(t)
        return inc_time, curr_time

    def publish_data(self):
        """Updates publications (co-simulation mode only)."""

        self.hi.publish()

    def get_results(self, params: Union[ExportAssetTypes, dict]) -> dict:
        """Returns queried simulation results.

        Args:
            params (Union[ExportAssetTypes, dict]): selection of asset types /
                variables to export

        Returns:
            dict: simulation results
        """

        self._status = SimulationStatus.STARTING_RESULT_EXPORT
        self.exp_vars = self.results.update_export_variables(params)
        curr_results = (
            self.sim.read_subsystems(self.exp_vars, self.all_subsysten_buses)
            if self.export_settings.defined_subsystems_only
            else self.sim.read_subsystems(self.exp_vars, self.raw_data.buses)
        )
        self._status = SimulationStatus.RESULT_EXPORT_COMPLETE
        return curr_results

    def status(self) -> SimulationStatus:
        """Returns the current simulation status.

        Returns:
            SimulationStatus: value of the simulator's state enum
        """
        return self._status.value

    def build_contingencies(
        self,
    ) -> List[
        Union[
            BusTripObject,
            BusFaultObject,
            LineTripObject,
            LineFaultObject,
            MachineTripObject,
        ]
    ]:
        """Builds user-defined contingencies.

        Returns:
            List[Union[BusFault, LineFault, LineTrip, BusTrip, MachineTrip]]: List of contingencies
        """

        contingencies = c.build_contingencies(self.psse, self.settings)
        return contingencies

    def inject_contingencies_external(self, contigencies: Contingencies):
        """Inject external contingencies.

        Args:
            contigencies (Contingencies): contingencies object
        """
        contingencies = c.build_contingencies(self.psse, contigencies)
        self.contingencies.extend(contingencies)

    def update_contingencies(self, t: float):
        """Updates contingencies during the simulation run.

        Args:
            t (float): simulation time in seconds
        """

        for contingency in self.contingencies:
            contingency.update(t)

    def force_psse_halt(self):
        """Forces cleanup of the PSS/e instance by halting it explicitly."""
        ierr = self.psse.pssehalt_2()
        assert ierr == 0, f"failed to halt PSSE. Error code - {ierr}"

    def __del__(self):
        # Best-effort cleanup; never let destructor errors propagate.
        try:
            self.force_psse_halt()
        except Exception:
            pass

__init__(settings, export_settings=None, psse_version=PSSE_VERSIONS.PSSE35.value)

Load a valid PyPSSE project and sets up simulation

Parameters:

Name Type Description Default
settings SimulationSettings

simulation settings

required
export_settings Union[ExportFileOptions, None]

export settings

None
psse_path Union[str, Path]

Path to python environment within the PSS/e install directory

required
Source code in pypsse\simulator.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
def __init__(
    self,
    settings: SimulationSettings,
    export_settings: Union[ExportFileOptions, None] = None,
    psse_version: str = PSSE_VERSIONS.PSSE35.value,
):
    """Load a valid PyPSSE project and set up the simulation.

    Args:
        settings (SimulationSettings): simulation settings
        export_settings (Union[ExportFileOptions, None]): export settings;
            when None, they are loaded from EXPORTS_SETTINGS_FILENAME in the
            project directory
        psse_version (str, optional): name of the PSS/e version module to
            import. Defaults to PSSE_VERSIONS.PSSE35.value.
    """

    self._status = SimulationStatus.STARTING_INSTANCE
    self.settings = settings

    logger.debug(f"Instantiating psse version {psse_version}")
    # Importing the version-specific module sets up the PSS/e environment so
    # that `psspy` and `dyntools` become importable below.
    __import__(psse_version, fromlist=[""])  # noqa: F401

    import dyntools
    import psspy
    logger.debug(f"Imported psspy suscessfully")

    # Initialize PSS/e with a maximum bus count of N_BUS.
    ierr = psspy.psseinit(N_BUS)
    assert ierr == 0, f"Error code: {ierr}"
    logger.debug(f"PSSE initialized sucessfully suscessfully")
    if export_settings is None:
        # No export settings supplied - fall back to the project's file.
        export_settings_path = (
            Path(self.settings.simulation.project_path)
            / EXPORTS_SETTINGS_FILENAME
        )
        assert (
            export_settings_path.exists()
        ), f"{export_settings_path} does nor exist"
        export_settings = toml.load(export_settings_path)
        export_settings = ExportFileOptions(**export_settings)

    self.export_settings = export_settings
    # NOTE(review): log_path is computed but never used here - confirm intent.
    log_path = os.path.join(
        self.settings.simulation.project_path, LOGS_FOLDER
    )
    logger.debug("Starting PSSE instance")

    self.dyntools = dyntools
    self.psse = psspy

    self.start_simulation()
    self.init()
    self._status = SimulationStatus.INITIALIZATION_COMPLETE

build_contingencies()

Builds user-defined contingencies

Returns:

Type Description
List[Union[BusTripObject, BusFaultObject, LineTripObject, LineFaultObject, MachineTripObject]]

List[Union[BusFault, LineFault, LineTrip, BusTrip, MachineTrip]]: List of contingencies

Source code in pypsse\simulator.py
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
def build_contingencies(
    self,
) -> List[
    Union[
        BusTripObject,
        BusFaultObject,
        LineTripObject,
        LineFaultObject,
        MachineTripObject,
    ]
]:
    """Builds user-defined contingencies from the simulation settings.

    Returns:
        List[Union[BusFault, LineFault, LineTrip, BusTrip, MachineTrip]]: List of contingencies
    """

    # Delegates construction to the contingencies helper module.
    contingencies = c.build_contingencies(self.psse, self.settings)
    return contingencies

define_bus_subsystems()

Defines a bussystem in the loaded PSSE model

Raises:

Type Description
LookupError

Failed to create bus subsystem chosen buses.

ValueError

Number of subsystems can not be more that 12. See PSSE documentation

RuntimeError

Failed to add buses to bus subsystem

Returns:

Name Type Description
dict (dict, list)

mapping of bus subsystems to buses

list (dict, list)

List of bus subsystems

Source code in pypsse\simulator.py
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
def define_bus_subsystems(self) -> (dict, list):
    """Defines bus subsystems in the loaded PSSE model.

    Raises:
        LookupError: Failed to create bus subsystem chosen buses.
        ValueError: Number of subsystems cannot be more than 12. See PSSE documentation
        RuntimeError: Failed to add buses to bus subsystem

    Returns:
        dict: mapping of bus subsystems to buses
        list: list of all buses across subsystems (as strings)
    """

    bus_subsystems_dict = {}
    bus_subsystems = self.get_bus_indices()
    # valid bus subsystem ID. Valid bus subsystem IDs range from 0 to 11 (PSSE documentation)
    if len(bus_subsystems) > MAX_PSSE_BUSSYSTEMS:
        msg = "Number of subsystems can not be more that 12. See PSSE documentation"
        raise ValueError(msg)

    all_subsysten_buses = []
    for i, buses in enumerate(bus_subsystems):
        if not buses:
            # Empty subsystem definition - skip it.
            continue

        all_subsysten_buses.extend(buses)
        ierr = self.psse.bsysinit(i)
        if ierr:
            msg = "Failed to create bus subsystem chosen buses."
            raise LookupError(msg)
        else:
            logger.debug(f'Bus subsystem "{i}" created')

        ierr = self.psse.bsys(sid=i, numbus=len(buses), buses=buses)
        if ierr:
            msg = "Failed to add buses to bus subsystem."
            raise RuntimeError(msg)
        else:
            bus_subsystems_dict[i] = buses
            logger.debug(f'Buses {buses} added to subsystem "{i}"')
    # Downstream consumers expect string bus identifiers.
    all_subsysten_buses = [str(x) for x in all_subsysten_buses]
    return bus_subsystems_dict, all_subsysten_buses

dump_settings(dest_dir, simulation_file=SIMULATION_SETTINGS_FILENAME, export_file=EXPORTS_SETTINGS_FILENAME)

Dumps simulation settings to a provided path

Parameters:

Name Type Description Default
dest_dir Path

Directory where settins are dumped

required
simulation_file str

simulation filename. Defaults to SIMULATION_SETTINGS_FILENAME.

SIMULATION_SETTINGS_FILENAME
export_file str

export setting filename. Defaults to EXPORTS_SETTINGS_FILENAME.

EXPORTS_SETTINGS_FILENAME
Source code in pypsse\simulator.py
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
def dump_settings(
    self,
    dest_dir: Path,
    simulation_file: str = SIMULATION_SETTINGS_FILENAME,
    export_file: str = EXPORTS_SETTINGS_FILENAME,
):
    """Dumps simulation and export settings to a provided path.

    Args:
        dest_dir (Path): directory where settings are dumped
        simulation_file (str, optional): simulation filename. Defaults to SIMULATION_SETTINGS_FILENAME.
        export_file (str, optional): export setting filename. Defaults to EXPORTS_SETTINGS_FILENAME.
    """

    # model_dump_json() already returns a JSON string; write it directly.
    # Passing it through json.dump() would double-encode the payload (the
    # file would contain one quoted JSON string, not an object).
    settings_json = self.settings.model_dump_json()
    (dest_dir / simulation_file).write_text(settings_json)

    # Bug fix: previously the simulation settings were dumped to BOTH files;
    # the export file must contain the export settings.
    export_settings_json = self.export_settings.model_dump_json()
    (dest_dir / export_file).write_text(export_settings_json)

force_psse_halt()

Forces cleanup of the PSS/e import

Source code in pypsse\simulator.py
520
521
522
523
def force_psse_halt(self):
    """Forces cleanup of the PSS/e instance by halting it explicitly."""
    # pssehalt_2 returns a non-zero error code on failure.
    ierr = self.psse.pssehalt_2()
    assert ierr == 0, f"failed to halt PSSE. Error code - {ierr}"

from_setting_files(simulation_settiings_file, export_Settings_file=None) classmethod

build 'Simulator' from toml settings files

Parameters:

Name Type Description Default
simulation_settiings_file Path

simulation settings

required
export_Settings_file Path

export settings

None
Source code in pypsse\simulator.py
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
@classmethod
def from_setting_files(
    cls, simulation_settiings_file: Path, export_Settings_file: Path = None
):
    """Build a `Simulator` from TOML settings files.

    Args:
        simulation_settiings_file (Path): path to the simulation settings file
        export_Settings_file (Path, optional): path to the export settings
            file; when omitted, EXPORTS_SETTINGS_FILENAME next to the
            simulation settings file is used
    """
    simulation_settiings = toml.load(simulation_settiings_file)
    if export_Settings_file:
        export_Settings = toml.load(export_Settings_file)
    else:
        # Default: export settings live alongside the simulation settings.
        export_Settings = toml.load(
            simulation_settiings_file.parent / EXPORTS_SETTINGS_FILENAME
        )

    simulation_settiings = SimulationSettings(**simulation_settiings)
    export_Settings = ExportFileOptions(**export_Settings)
    return cls(simulation_settiings, export_Settings)

get_bus_ids()

Returns bus IDs

Returns:

Name Type Description
list list

Array of bus numbers

Source code in pypsse\simulator.py
354
355
356
357
358
359
360
361
362
363
def get_bus_ids(self) -> list:
    """Returns bus IDs.

    Returns:
        list: array of bus numbers
    """

    # -1 selects all buses; "NUMBER" requests the bus number attribute.
    ierr, iarray = self.psse.abusint(-1, 1, "NUMBER")
    assert ierr == 0, f"Error code: {ierr}"
    return iarray

get_bus_indices()

Returns bus indices for bus subsystems

Returns:

Name Type Description
BusSubsystems BusSubsystems

Bus subsystem model

Source code in pypsse\simulator.py
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
def get_bus_indices(self) -> BusSubsystems:
    """Returns bus indices for bus subsystems.

    Returns:
        BusSubsystems: Bus subsystem model
    """

    if self.settings.bus_subsystems.from_file:
        # Each CSV column defines one subsystem; NaN cells pad shorter
        # columns and are dropped before the integer conversion.
        bus_file = self.settings.bus_subsystems.bus_file
        bus_info = pd.read_csv(bus_file, index_col=None)
        bus_info = bus_info.values
        _, n_cols = bus_info.shape
        bus_data = []
        for col in range(n_cols):
            data = [int(x) for x in bus_info[:, col] if not np.isnan(x)]
            bus_data.append(data)
    else:
        bus_data = self.settings.bus_subsystems.bus_subsystem_list
    return bus_data

get_results(params)

Returns queried simulation results

Parameters:

Name Type Description Default
params Union[ExportAssetTypes, dict]

description

required

Returns:

Name Type Description
dict dict

simulation results

Source code in pypsse\simulator.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def get_results(self, params: Union[ExportAssetTypes, dict]) -> dict:
    """Returns queried simulation results.

    Args:
        params (Union[ExportAssetTypes, dict]): selection of asset types /
            variables to export

    Returns:
        dict: simulation results
    """

    self._status = SimulationStatus.STARTING_RESULT_EXPORT
    self.exp_vars = self.results.update_export_variables(params)
    # Read either only the user-defined subsystem buses or all model buses.
    curr_results = (
        self.sim.read_subsystems(self.exp_vars, self.all_subsysten_buses)
        if self.export_settings.defined_subsystems_only
        else self.sim.read_subsystems(self.exp_vars, self.raw_data.buses)
    )
    self._status = SimulationStatus.RESULT_EXPORT_COMPLETE
    return curr_results

init()

Initializes the model

Source code in pypsse\simulator.py
228
229
230
231
232
233
234
235
236
237
def init(self):
    """Initializes the model, profiles, and HELICS execution mode."""

    self.sim.init(self.bus_subsystems)

    if self.settings.simulation.use_profile_manager:
        self.pm = ProfileManager(self.sim, self.settings)
        self.pm.setup_profiles()
    # In co-simulation mode the federate must enter execution mode before
    # the first time request.
    if self.settings.helics and self.settings.helics.cosimulation_mode:
        self.hi.enter_execution_mode()

inject_contingencies_external(contigencies)

Inject external contingencies.

Parameters:

Name Type Description Default
contigencies Contingencies

Contigencies Object

required
Source code in pypsse\simulator.py
501
502
503
504
505
506
507
508
def inject_contingencies_external(self, contigencies: Contingencies):
    """Inject external contingencies.

    Args:
        contigencies (Contingencies): contingencies object
    """
    # Newly built contingencies are appended to the existing list.
    contingencies = c.build_contingencies(self.psse, contigencies)
    self.contingencies.extend(contingencies)

parse_gic_file()

Parses the GIC file (if included in the project)

Returns:

Name Type Description
Graph Graph

Networkx graph representation for the model

Source code in pypsse\simulator.py
239
240
241
242
243
244
245
246
247
def parse_gic_file(self) -> Graph:
    """Parses the GIC file (if included in the project).

    Returns:
        Graph: Networkx graph representation for the model
    """

    gicdata = gp.GICParser(self.settings)
    return gicdata.psse_graph

publish_data()

Updates publications (co-simulation mode only)

Source code in pypsse\simulator.py
448
449
450
451
def publish_data(self):
    """Updates publications (co-simulation mode only)."""

    self.hi.publish()

run()

Launches the simulation

Source code in pypsse\simulator.py
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def run(self):
    """Launches the simulation.

    Iterates the simulation clock from 0 to the configured total simulation
    time, calling step() once per resolution step; the clock only advances
    when ``self.inc_time`` is true. Optionally starts a bokeh server for live
    plotting, halts PSSE, and exports results when the loop finishes.
    Requires init() to have been called first.
    """

    self._status = SimulationStatus.RUNNING_SIMULATION
    if self.sim.initialization_complete:
        if self.settings.plots and self.settings.plots.enable_dynamic_plots:
            # NOTE(review): stdout is PIPEd but never drained; a chatty bokeh
            # process could stall on a full pipe buffer -- confirm acceptable.
            bokeh_server_proc = subprocess.Popen(
                ["bokeh", "serve"], stdout=subprocess.PIPE
            )  # noqa: S603,S607
        else:
            bokeh_server_proc = None

        logger.debug(
            f"Running dynamic simulation for time {self.settings.simulation.simulation_time.total_seconds()} sec"
        )
        total_simulation_time = (
            self.settings.simulation.simulation_time.total_seconds()
        )
        t = 0
        while True:
            self.step(t)
            # Time advances only when the step was granted (co-simulation may
            # hold the clock and re-solve the same step).
            if self.inc_time:
                t += (
                    self.settings.simulation.simulation_step_resolution.total_seconds()
                )
            if t >= total_simulation_time:
                break

        self.psse.pssehalt_2()
        if not self.export_settings.export_results_using_channels:
            self.results.export_results()
        else:
            self.sim.export()

        if bokeh_server_proc is not None:
            bokeh_server_proc.terminate()
    else:
        logger.error(
            "Run init() command to initialize models before running the simulation"
        )
    # BUG FIX: previously this assigned the raw string "Simulation complete",
    # but status() returns self._status.value, which fails on a plain str
    # (every other assignment in this class uses SimulationStatus members).
    # TODO confirm the enum member name matches the models module.
    self._status = SimulationStatus.SIMULATION_COMPLETE

start_simulation()

Starts a loaded simulation

Raises:

Type Description
Exception

Please pass a RAW or SAV file in the settings dictionary

Source code in pypsse\simulator.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
def start_simulation(self):
    """Starts a loaded simulation

    Raises:
        Exception: Please pass a RAW or SAV file in the settings dictionary
    """

    self.hi = None  # HELICS interface; created below only in co-simulation mode
    self.simStartTime = time.time()

    # Load the network model: prefer a saved case (.sav), fall back to a RAW file.
    if self.settings.simulation.case_study.exists():
        self.psse.case(str(self.settings.simulation.case_study))
    elif self.settings.simulation.raw_file.exists():
        self.psse.read(0, str(self.settings.simulation.raw_file))
    else:
        msg = "Please pass a RAW or SAV file in the settings dictionary"
        raise Exception(msg)

    logger.info(
        f"Trying to read a file >>{self.settings.simulation.case_study}"
    )
    self.raw_data = rd.Reader(self.psse)
    (
        self.bus_subsystems,
        self.all_subsysten_buses,
    ) = self.define_bus_subsystems()

    # Restrict exports to the defined subsystems when requested; otherwise
    # export every bus read from the model.
    if self.export_settings.defined_subsystems_only:
        valid_buses = self.all_subsysten_buses
    else:
        valid_buses = self.raw_data.buses

    self.sim = sc.sim_controller(
        self.psse,
        self.dyntools,
        self.settings,
        self.export_settings,
        valid_buses,
        self.raw_data,
    )

    self.contingencies = self.build_contingencies()

    if self.settings.helics and self.settings.helics.cosimulation_mode:
        if self.settings.simulation.simulation_mode in [
            SimulationModes.DYNAMIC,
            SimulationModes.SNAP,
        ]:
            # NOTE(review): intentionally a no-op -- dynamic/snap modes appear
            # to need no extra setup here; confirm this is not an unfinished stub.
            ...
        self.hi = HelicsInterface(
            self.psse,
            self.sim,
            self.settings,
            self.export_settings,
            self.bus_subsystems,
        )
        self.publications = self.hi.register_publications(
            self.bus_subsystems
        )
        if self.settings.helics.create_subscriptions:
            self.subscriptions = self.hi.register_subscriptions()

    # Optional GIC file: build a networkx graph of the model and keep its node ids.
    if self.settings.simulation.gic_file:
        self.network_graph = self.parse_gic_file()
        self.bus_ids = self.network_graph.nodes.keys()
    else:
        self.network_graph = None

    self.results = Container(self.settings, self.export_settings)
    self.exp_vars = self.results.get_export_variables()
    self.inc_time = True  # clock advances freely until a HELICS grant says otherwise

status()

returns current simulation status

Returns:

Name Type Description
SimulationStatus SimulationStatus

state of the simulator

Source code in pypsse\simulator.py
473
474
475
476
477
478
479
def status(self) -> str:
    """returns current simulation status

    Returns:
        str: value of the current ``SimulationStatus`` member
        (``self._status.value``), i.e. a plain string, not the enum itself.
    """
    return self._status.value

step(t)

Steps through a single simulation time step. Is called iteratively to increment the simulation

Parameters:

Name Type Description Default
t float

time step for the simulation

required

Returns:

Name Type Description
dict dict

results from the current timestep

Source code in pypsse\simulator.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
def step(self, t: float) -> dict:
    """Steps through a single simulation time step. Is called iteratively to increment the simulation

    Args:
        t (float): time step for the simulation

    Returns:
        dict: results from the current timestep
    """

    # Apply any contingencies scheduled for this simulation time.
    self.update_contingencies(t)
    if self.settings.simulation.use_profile_manager:
        # Push the next profile values into the model.
        self.pm.update()
    ctime = time.time() - self.simStartTime
    logger.debug(
        f"Simulation time: {t} seconds\nRun time: {ctime}\npsse time: {self.sim.get_time()}"
    )
    if self.settings.helics and self.settings.helics.cosimulation_mode:
        if self.settings.helics.create_subscriptions:
            # Pull subscribed values, then request time t from the broker.
            # inc_time presumably goes False when the grant is below the
            # request, forcing a re-solve below -- confirm against HelicsInterface.
            self.update_subscriptions()
            logger.debug(f"Time requested: {t}")
            self.inc_time, helics_time = self.update_federate_time(t)
            logger.debug(f"Time granted: {helics_time}")

    # Advance the solver, or re-solve the current step without advancing.
    if self.inc_time:
        self.sim.step(t)
    else:
        self.sim.resolve_step()

    if self.settings.helics and self.settings.helics.cosimulation_mode:
        # Publish the newly computed state to the other federates.
        self.publish_data()

    curr_results = self.update_result_container(t)
    return curr_results

update_contingencies(t)

Updates contingencies during the simulation run

Parameters:

Name Type Description Default
t float

simulation time in seconds

required
Source code in pypsse\simulator.py
510
511
512
513
514
515
516
517
518
def update_contingencies(self, t: float):
    """Advance every registered contingency to simulation time ``t``.

    Args:
        t (float): simulation time in seconds
    """

    for item in self.contingencies:
        item.update(t)

update_federate_time(t)

Makes a time request to the HELICS broker (co-simulation mode only)

Parameters:

Name Type Description Default
t float

simulation time in seconds

required

Returns:

Name Type Description
float (float, float)

requested time in seconds

float (float, float)

current simualtion time in seconds

Source code in pypsse\simulator.py
434
435
436
437
438
439
440
441
442
443
444
445
446
def update_federate_time(self, t: float) -> "tuple[bool, float]":
    """Makes a time request to the HELICS broker (co-simulation mode only).

    Args:
        t (float): simulation time in seconds

    Returns:
        bool: flag indicating whether the simulation clock may advance
            (callers store this in ``self.inc_time``)
        float: time granted by the broker, in seconds
    """
    # BUG FIX: the annotation was the tuple literal ``(float, float)``, which
    # is not a valid type, and the docstring described the first element as a
    # "requested time" when it is the advance flag consumed by step()/run().
    inc_time, curr_time = self.hi.request_time(t)
    return inc_time, curr_time

update_result_container(t)

Updates the result container with results from the current time step

Parameters:

Name Type Description Default
t float

simulation time in seconds

required

Returns:

Name Type Description
dict dict

simulation results from the current time step

Source code in pypsse\simulator.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def update_result_container(self, t: float) -> dict:
    """Read the current system state and push it into the result container.

    Args:
        t (float): simulation time in seconds

    Returns:
        dict: simulation results from the current time step
    """

    # Read either only the defined subsystems or every bus in the model.
    if self.export_settings.defined_subsystems_only:
        buses = self.all_subsysten_buses
    else:
        buses = self.raw_data.buses
    curr_results = self.sim.read_subsystems(self.exp_vars, buses)

    # Channel-based export bypasses the in-memory container entirely.
    if not USING_NAERM:
        if not self.export_settings.export_results_using_channels:
            self.results.update(
                curr_results,
                t,
                self.sim.get_time(),
                self.sim.has_converged(),
            )
    return curr_results

update_subscriptions()

Updates subscriptions (co-simulation mode only)

Source code in pypsse\simulator.py
429
430
431
432
def update_subscriptions(self):
    """Pull the latest values from all registered HELICS subscriptions
    (co-simulation mode only)."""
    self.hi.subscribe()

This class defines the structure of a PyPSSE project

Source code in pypsse\project.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class Project:
    "This class defines the structure of a PyPSSE project"

    def __init__(self):
        # Package directory; the default settings templates live under it.
        self.basepath = Path(__file__).parent

    def create(
        self,
        parent_path: Path,
        project_name: str,
        psse_folder: Path,
        simulation_settings_file: Path,
        export_settings_file: Path,
        profile_store_file: Path,
        profile_mapping_file: Path,
        overwrite: bool = True,
        autofill: bool = True,
    ):
        """The method creates a new PyPSSE project

        Args:
            parent_path (Path): path to new pypsse project
            project_name (str): project name
            psse_folder (Path): path to an existing PSSE project folder whose files are copied into the new project
            simulation_settings_file (Path): simulation settings toml file path
            export_settings_file (Path): export settings toml file path
            profile_store_file (Path): path to a valid Profiles.hdf5 file (Contains profiles for time series simulations)
            profile_mapping_file (Path): path to a valid Profile_mapping.toml file (used to map profile to PSSE elements)
            overwrite (bool, optional): Overwrite the project if it already exists. Defaults to True.
            autofill (bool, optional): Attempt to auto-fill settings (verify manually that the settings file is correct). Defaults to True.
        """
        # NOTE: the original docstring had the overwrite/autofill descriptions
        # swapped; they are corrected above.

        self.project_path = Path(parent_path) / project_name

        # Start from the packaged default export settings, then overlay any
        # user-provided export settings file.
        exports_dict = toml.load(self.basepath / DEFAULTS_FOLDER / EXPORTS_SETTINGS_FILENAME)
        export_settings = ExportFileOptions(**exports_dict)

        if export_settings_file:
            export_settings_file = Path(export_settings_file)
            assert export_settings_file.exists(), f"Export file '{export_settings_file}' does not exist"
            new_export_settings = toml.load(export_settings_file)
            export_settings.update(**new_export_settings)

        # Same pattern for simulation settings: defaults first, user overlay second.
        sim_setting_dict = toml.load(self.basepath / DEFAULTS_FOLDER / SIMULATION_SETTINGS_FILENAME)
        sim_setting_dict["simulation"]["project_path"] = str(self.project_path)
        simulation_settings = SimulationSettings(**sim_setting_dict)

        if simulation_settings_file:
            simulation_settings_file = Path(simulation_settings_file)
            assert simulation_settings_file.exists(), f"Export file '{simulation_settings_file}' does not exist"
            sim_setting_dict = toml.load(simulation_settings_file)
            simulation_settings.update(**sim_setting_dict)

        simulation_settings.simulation.project_path = Path(parent_path) / project_name

        self.project = ProjectDefination(
            project_name=project_name,
            simulation_settings=simulation_settings,
            export_settings=export_settings,
            overwrite=overwrite,
            autofill=autofill,
        )

        self._create_folders()

        # Copy an existing PSSE project in and auto-fill settings from it.
        if psse_folder:
            psse_files = self._copy_psse_project_files(psse_folder)
            if autofill and psse_files:
                self._autofill_settings(psse_files, profile_store_file, profile_mapping_file)

        self._update_export_files()
        self._write_setting_files()

    def _update_export_files(self):
        """sets up export file paths"""
        self.project.simulation_settings.export.out_file = DEFAULT_OUT_FILE
        self.project.simulation_settings.export.outx_file = DEFAULT_OUTX_FILE
        self.project.simulation_settings.export.log_file = DEFAULT_LOG_FILE
        self.project.simulation_settings.export.excel_file = DEFAULT_EXCEL_FILE
        self.project.simulation_settings.export.coordinate_file = DEFAULT_COORDINATES_FILE
        self.project.simulation_settings.export.networkx_graph_file = DEFAULT_GRAPH_FILE

    def _psse_project_file_dict(self, path: Path) -> dict:
        """Creates a mapping of all project files in a folder (recursively)

        Args:
            path (Path): path (folder) to existing psse project

        Returns:
            dict: mapping of lower-cased file extension to list of file names
        """

        file_dict = {}
        for _, _, files in os.walk(path):
            for file in files:
                # BUG FIX: the original used `_, ext = file.split(".")`, which
                # raises ValueError for names without exactly one dot, and it
                # tested membership with the un-lowered extension while storing
                # the lowered one, so mixed-case extensions (e.g. ".SAV" vs
                # ".sav") clobbered earlier entries. Normalize once instead.
                ext = Path(file).suffix.lower().lstrip(".")
                if not ext:
                    continue  # skip extension-less files
                file_dict.setdefault(ext, []).append(file)
        return file_dict

    def _copy_psse_project_files(self, psse_folder: Path) -> dict:
        """Copies psse files to a new project

        Args:
            psse_folder (Path): path (folder) to existing psse project

        Raises:
            FileExistsError: raised if provided project path does not exist

        Returns:
            dict: mapping of file types to paths
        """

        psse_folder = Path(psse_folder)
        if psse_folder.exists():
            new_path = self.project_path / CASESTUDY_FOLDER
            # NOTE(review): distutils' copy_tree is deprecated; consider
            # shutil.copytree(..., dirs_exist_ok=True) in a future change.
            copy_tree(str(psse_folder.absolute()), str(new_path.absolute()))
            psse_files = self._psse_project_file_dict(new_path)
        else:
            msg = f"PSSE project path does not exist. ({psse_folder}) {os.getcwd()}"
            raise FileExistsError(msg)
        return psse_files

    def _write_setting_files(self):
        """serializes the simulation and export settings files for the new project"""
        sim_file_path = self.project_path / SIMULATION_SETTINGS_FILENAME
        with open(sim_file_path, "w") as f:
            # Round-trip through JSON so pydantic types become plain values.
            toml.dump(json.loads(self.project.simulation_settings.model_dump_json()), f)
            logger.info(f"writing file : {sim_file_path!s}")

        export_file_path = self.project_path / EXPORTS_SETTINGS_FILENAME
        with open(export_file_path, "w") as f:
            toml.dump(json.loads(self.project.export_settings.model_dump_json()), f)
            logger.info(f"writing file : {export_file_path!s}")

    def _create_folders(self):
        """Creates folder structure for a new project. Older project can be over-written

        Raises:
            FileExistsError: raised if the folder exists and overwrite is disabled
        """

        for folder in self.project.project_folders:
            project_folder = self.project_path / folder.value
            if project_folder.exists() and not self.project.overwrite:
                msg = "Project folder already exists. Set overwrite=true to overwrite existing projects"
                raise FileExistsError(msg)
            elif not project_folder.exists():
                project_folder.mkdir(parents=True, exist_ok=True)
                logger.info(f"folder created: {project_folder!s}")

    def _autofill_settings(self, psse_files: dict, profile_store_file: Path, profile_mapping_file: Path):
        """The method auto populates fields for a new PyPSSE project

        Args:
            psse_files (dict): mapping of file type to file path
            profile_store_file (Path): path to profile store file (hdf5)
            profile_mapping_file (Path): path to profile mapping file (toml)
        """

        # Map each known PSSE file type onto its simulation setting.
        self._update_setting("sav", "case_study", psse_files)
        self._update_setting("raw", "raw_file", psse_files)
        self._update_setting("snp", "snp_file", psse_files)
        self._update_setting("dyr", "dyr_file", psse_files)
        self._update_setting("gic", "gic_file", psse_files)
        self._update_setting("rwm", "rwm_file", psse_files)

        if "dll" in psse_files:
            self.project.simulation_settings.simulation.user_models = psse_files["dll"]
            logger.info(f"user_models={psse_files['dll']}")
        else:
            logger.info("No DLL files found in project path")

        if "idv" in psse_files:
            self.project.simulation_settings.simulation.setup_files = psse_files["idv"]
            logger.info(
                f"setup_files={psse_files['idv']}"
                f"\nSequence of IDV setup files is important. Manually change in TOML file if needed"
            )
        else:
            logger.info("No IDV files found in project path")

        store_path = self.project_path / PROFILES_FOLDER
        if profile_store_file and Path(profile_store_file).exists():
            assert Path(profile_store_file).suffix.lower() == ".hdf5", "Store file should be a valid hdf5 file"
            copy(profile_store_file, store_path)
        else:
            # Instantiating the manager creates an empty profile store.
            ProfileManager(None, self.project.simulation_settings)

        if profile_mapping_file and Path(profile_mapping_file).exists():
            assert Path(profile_mapping_file).suffix.lower() == ".toml", "Profile mapping should be a valid toml file"
            copy(profile_mapping_file, store_path)
            self.project.simulation_settings.simulation.use_profile_manager = True
        else:
            # TODO: auto generate mapping file from bus subsystem files
            with open(os.path.join(store_path, DEFAULT_PROFILE_MAPPING_FILENAME), "w") as _:
                pass  # create an empty mapping file as a template
            self.project.simulation_settings.simulation.use_profile_manager = False

        self._create_default_sub_file()

    def _create_default_sub_file(self):
        """Method creates a subscription file for the HELICS interface"""

        subscription_fields = [x.value for x in SubscriptionFileRequiredColumns]
        # Header-only CSV acting as a template for users to fill in.
        data = pd.DataFrame({}, columns=list(subscription_fields))
        data.to_csv(self.project_path / DEFAULT_SUBSCRIPTION_FILENAME, index=False)
        logger.info("Creating subscription template")
        self.project.simulation_settings.simulation.subscriptions_file = DEFAULT_SUBSCRIPTION_FILENAME

    def _update_setting(self, f_type: str, key: str, psse_files: dict):
        """updates settings for the new project

        Args:
            f_type (str): file type
            key (str): simulation setting to update
            psse_files (dict): mapping of file type to file path
        """
        if f_type in psse_files:
            relevent_files = psse_files[f_type]
            # Use the first match; warn if the choice was ambiguous.
            setattr(self.project.simulation_settings.simulation, key, relevent_files[0])
            logger.info(f"Settings:{key}={relevent_files[0]}")
            if len(relevent_files) > 1:
                logger.warning(
                    f"More than one file with extension {f_type} exist."
                    f"\nFiles found: {relevent_files}"
                    f"\nManually update the settings toml file"
                )
        else:
            logger.warning(f"No file with extension '{f_type}'")

create(parent_path, project_name, psse_folder, simulation_settings_file, export_settings_file, profile_store_file, profile_mapping_file, overwrite=True, autofill=True)

The methods creates a new PyPSSE project

Parameters:

Name Type Description Default
parent_path Path

path to new pypsse project

required
project_name str

project name

required
psse_folder Path

description

required
simulation_settings_file Path

simulation settings toml file path

required
export_settings_file Path

export settings toml file path

required
profile_store_file Path

path to a valid Profiles.hdf5 file (Contains profiles for time series simulations)

required
profile_mapping_file Path

path to a valid Profile_mapping.toml file (used to map profile to PSSE elements)

required
overwrite bool

Overwrite the project if it already exists. Defaults to True.

True
autofill bool

Attempt to auto-fill settings (verify manually that the settings file is correct). Defaults to True.

True
Source code in pypsse\project.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def create(
    self,
    parent_path: Path,
    project_name: str,
    psse_folder: Path,
    simulation_settings_file: Path,
    export_settings_file: Path,
    profile_store_file: Path,
    profile_mapping_file: Path,
    overwrite: bool = True,
    autofill: bool = True,
):
    """The method creates a new PyPSSE project

    Args:
        parent_path (Path): path to new pypsse project
        project_name (str): project name
        psse_folder (Path): path to an existing PSSE project folder whose files are copied into the new project
        simulation_settings_file (Path): simulation settings toml file path
        export_settings_file (Path): export settings toml file path
        profile_store_file (Path): path to a valid Profiles.hdf5 file (Contains profiles for time series simulations)
        profile_mapping_file (Path): path to a valid Profile_mapping.toml file (used to map profile to PSSE elements)
        overwrite (bool, optional): Overwrite the project if it already exists. Defaults to True.
        autofill (bool, optional): Attempt to auto-fill settings (verify manually that the settings file is correct). Defaults to True.
    """

    self.project_path = Path(parent_path) / project_name

    # Load packaged default export settings, then overlay the user's file.
    exports_dict = toml.load(self.basepath / DEFAULTS_FOLDER / EXPORTS_SETTINGS_FILENAME)
    export_settings = ExportFileOptions(**exports_dict)

    if export_settings_file:
        export_settings_file = Path(export_settings_file)
        assert export_settings_file.exists(), f"Export file '{export_settings_file}' does not exist"
        new_export_settings = toml.load(export_settings_file)
        export_settings.update(**new_export_settings)

    # Same pattern for simulation settings: defaults first, user overlay second.
    sim_setting_dict = toml.load(self.basepath / DEFAULTS_FOLDER / SIMULATION_SETTINGS_FILENAME)
    sim_setting_dict["simulation"]["project_path"] = str(self.project_path)
    simulation_settings = SimulationSettings(**sim_setting_dict)

    if simulation_settings_file:
        simulation_settings_file = Path(simulation_settings_file)
        assert simulation_settings_file.exists(), f"Export file '{simulation_settings_file}' does not exist"
        sim_setting_dict = toml.load(simulation_settings_file)
        simulation_settings.update(**sim_setting_dict)

    simulation_settings.simulation.project_path = Path(parent_path) / project_name

    self.project = ProjectDefination(
        project_name=project_name,
        simulation_settings=simulation_settings,
        export_settings=export_settings,
        overwrite=overwrite,
        autofill=autofill,
    )

    self._create_folders()

    # Copy an existing PSSE project in and auto-fill settings from its files.
    if psse_folder:
        psse_files = self._copy_psse_project_files(psse_folder)
        if autofill and psse_files:
            self._autofill_settings(psse_files, profile_store_file, profile_mapping_file)

    self._update_export_files()
    self._write_setting_files()

Class definition for the simulation result container

Source code in pypsse\result_container.py
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
class Container:
    "Class definition for the simulation result container"

    # String values of the bulk (write-at-end) and streamed (write-per-step)
    # file formats.
    BULK_WRITE_MODES = [m.value for m in BulkWriteModes]
    STREAMED_WRITE_MODES = [m.value for m in StreamedWriteModes]

    def __init__(self, settings: SimulationSettings, export_settings: ExportFileOptions):
        """Sets up the result container object

        Args:
            settings (SimulationSettings): simulation settings for the project
            export_settings (ExportFileOptions): export settings (asset classes and variables to export)
        """

        export__list = [m.value for m in ModelTypes]
        self.export_path = settings.simulation.project_path / EXPORTS_FOLDER
        self.export_settings = export_settings
        self.settings = settings
        self.results = {}
        self.export_vars = {}
        # Seed result keys ("<mapped class>_<variable>") and the
        # export-variable map from the export settings.
        for class_name in export__list:
            mapped_name = MAPPED_CLASS_NAMES[class_name.lower()]
            variables = getattr(export_settings, class_name.lower())
            if variables:
                for variable in variables:
                    self.results[f"{mapped_name}_{variable.value}"] = None
                    if mapped_name not in self.export_vars:
                        self.export_vars[mapped_name] = []
                    self.export_vars[mapped_name].append(variable.value)

        time_steps = int(
            self.settings.simulation.simulation_time.total_seconds()
            / self.settings.simulation.simulation_step_resolution.total_seconds()
        )
        # Streamed formats get a dedicated incremental writer; bulk formats
        # accumulate in self.results and are flushed by export_results().
        if self.export_settings.file_format not in self.BULK_WRITE_MODES:
            self.dataWriter = DataWriter(
                self.export_path,
                export_settings.file_format.value,
                time_steps,
                self.export_settings.filename_prefix,
            )

    def update_export_variables(self, params: Union[ExportAssetTypes, dict]) -> dict:
        """Rebuilds the export-variable map from `params` or, when `params`
        is falsy, from the stored export settings.

        Args:
            params (Union[ExportAssetTypes, dict]): requested asset classes/variables

        Returns:
            dict: mapping of export variables to values
        """
        export__list = [m.value for m in ModelTypes]
        self.results = {}
        self.export_vars = {}
        if params:
            if isinstance(params, ExportAssetTypes):
                class_assets = params
            else:
                class_assets = ExportAssetTypes(**params)
        else:
            class_assets = self.export_settings

        inv_map = {v: k for k, v in MAPPED_CLASS_NAMES.items()}

        for class_name in export__list:
            attr = inv_map[class_name]
            # BUG FIX: the original evaluated `inv_map[class_name] in params`
            # unconditionally, which raises TypeError when params is None or
            # an ExportAssetTypes model. Filter on key presence only when
            # params is actually a dict.
            if isinstance(params, dict) and attr not in params:
                continue
            variables = getattr(class_assets, attr)
            if variables:
                for variable in variables:
                    self.results[f"{class_name}_{variable.value}"] = None
                    if class_name not in self.export_vars:
                        self.export_vars[class_name] = []
                    self.export_vars[class_name].append(variable.value)

        return self.export_vars

    def get_export_variables(self) -> dict:
        """Returns the currently configured export-variable mapping

        Returns:
            dict: mapping of export variables to values
        """

        return self.export_vars

    def update(self, bus_data: dict, _, time: datetime.datetime, has_converged: bool):
        """Updates the results container

        Args:
            bus_data (dict): mapping of variables to values
            _ (_type_): unused (kept for call compatibility)
            time (datetime.datetime): simulation time
            has_converged (bool): flag showing if simulation converged
        """

        if self.export_settings.file_format not in self.BULK_WRITE_MODES:
            # Streamed formats: hand the step straight to the writer.
            self.dataWriter.write(time, bus_data, has_converged)
        else:
            # Bulk formats: append each variable's values to its DataFrame.
            for variable_name, _ in bus_data.items():
                if not isinstance(self.results[f"{variable_name}"], pd.DataFrame):
                    self.results[f"{variable_name}"] = pd.DataFrame(bus_data[variable_name], index=[0])
                else:
                    df1 = self.results[f"{variable_name}"]
                    df2 = pd.DataFrame.from_dict([bus_data[variable_name]])
                    concatenated = pd.concat([df1, df2])
                    self.results[f"{variable_name}"] = concatenated
        logger.debug("result container updated")

    def export_results(self):
        """exports all results stored to an external file"""

        if self.export_settings.file_format in self.BULK_WRITE_MODES:
            for df_name, df in self.results.items():
                # NOTE(review): file_format is interpolated directly into the
                # file name -- presumably a str-valued enum; confirm it renders
                # as "csv"/"pkl" and not "BulkWriteModes.csv".
                export_path = (
                    self.settings.simulation.project_path
                    / EXPORTS_FOLDER
                    / f'{df_name}.{self.export_settings.file_format}'
                )
                if self.export_settings.file_format == BulkWriteModes.CSV:
                    # Entries that never received data remain None; only real
                    # DataFrames are written for CSV.
                    if isinstance(df, pd.DataFrame):
                        df.to_csv(export_path)
                elif self.export_settings.file_format == BulkWriteModes.PKL:
                    df.to_pickle(export_path)
                logger.info(f"results exported to {export_path}")

__init__(settings, export_settings)

Sets up the result container object

Parameters:

Name Type Description Default
settings SimulationSettings

description

required
export_settings ExportAssetTypes

description

required
Source code in pypsse\result_container.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
def __init__(self, settings: SimulationSettings, export_settings: ExportFileOptions):
    """Sets up the result container object

    Args:
        settings (SimulationSettings): simulation settings for the project
        export_settings (ExportFileOptions): export settings (asset classes and variables to export)
    """

    export__list = [m.value for m in ModelTypes]
    # Results are written under <project>/<exports folder>.
    self.export_path = settings.simulation.project_path / EXPORTS_FOLDER
    self.export_settings = export_settings
    self.settings = settings
    self.results = {}
    self.export_vars = {}
    # Seed result keys ("<mapped class>_<variable>") and the export-variable
    # map from the export settings.
    for class_name in export__list:
        mapped_name = MAPPED_CLASS_NAMES[class_name.lower()]
        variables = getattr(export_settings, class_name.lower())
        if variables:
            for variable in variables:
                self.results[f"{mapped_name}_{variable.value}"] = None
                if mapped_name not in self.export_vars:
                    self.export_vars[mapped_name] = []
                self.export_vars[mapped_name].append(variable.value)

    # Expected number of steps, used to pre-size the streamed writer.
    time_steps = int(
        self.settings.simulation.simulation_time.total_seconds()
        / self.settings.simulation.simulation_step_resolution.total_seconds()
    )
    # Streamed (non-bulk) formats write incrementally via DataWriter;
    # bulk formats accumulate in self.results and export at the end.
    if self.export_settings.file_format not in self.BULK_WRITE_MODES:
        self.dataWriter = DataWriter(self.export_path, export_settings.file_format.value, time_steps,  self.export_settings.filename_prefix)

export_results()

exports all results stored to an external file

Source code in pypsse\result_container.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
def export_results(self):
    """Exports all stored results to external files, one file per result dataframe.

    Only applies to bulk write modes (csv / pkl); streaming formats are
    written incrementally by the data writer during the simulation.
    """

    if self.export_settings.file_format in self.BULK_WRITE_MODES:
        for df_name, df in self.results.items():
            # Use the enum's value so the suffix is e.g. ".csv" rather than
            # the enum's repr (matches how the DataWriter is constructed).
            export_path = (
                self.settings.simulation.project_path
                / EXPORTS_FOLDER
                / f'{df_name}.{self.export_settings.file_format.value}'
            )
            if self.export_settings.file_format == BulkWriteModes.CSV:
                if isinstance(df, pd.DataFrame):
                    df.to_csv(export_path)
            elif self.export_settings.file_format == BulkWriteModes.PKL:
                # Guard like the CSV branch: results entries start as None and
                # stay None if update() never ran for that variable.
                if isinstance(df, pd.DataFrame):
                    df.to_pickle(export_path)
            logger.info(f"results exported to {export_path}")

get_export_variables()

Queries and return results from the current timestep

Returns:

Name Type Description
dict dict

mapping of export variables to values

Source code in pypsse\result_container.py
85
86
87
88
89
90
91
92
def get_export_variables(self) -> dict:
    """Return the variables currently registered for export.

    Returns:
        dict: mapping of asset class names to lists of exported variable names
    """

    export_map = self.export_vars
    return export_map

update(bus_data, _, time, has_converged)

Updates the results container

Parameters:

Name Type Description Default
bus_data dict

mapping of variables to values

required
_ _type_

description

required
time datetime

simulation time

required
has_converged bool

flag showing if simulation converged

required
Source code in pypsse\result_container.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def update(self, bus_data: dict, _, time: datetime.datetime, has_converged: bool):
    """Updates the results container with data for one time step.

    Args:
        bus_data (dict): mapping of variables to values
        _ (_type_): unused placeholder argument
        time (datetime.datetime): simulation time
        has_converged (bool): flag showing if the simulation converged
    """

    streaming = self.export_settings.file_format not in self.BULK_WRITE_MODES
    if streaming:
        # Streaming formats hand each step straight to the data writer.
        self.dataWriter.write(time, bus_data, has_converged)
    else:
        # Bulk formats accumulate rows in per-variable dataframes.
        for variable_name in bus_data:
            key = f"{variable_name}"
            current = self.results[key]
            if isinstance(current, pd.DataFrame):
                new_row = pd.DataFrame.from_dict([bus_data[variable_name]])
                self.results[key] = pd.concat([current, new_row])
            else:
                self.results[key] = pd.DataFrame(bus_data[variable_name], index=[0])
    logger.debug("result container updated")

update_export_variables(params)

Updates the container with current system state. Method is called iteratively to store results as a simulation executes

Parameters:

Name Type Description Default
params Union[ExportAssetTypes, dict]

description

required

Returns:

Name Type Description
dict dict

mapping of export variables to values

Source code in pypsse\result_container.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def update_export_variables(self, params: Union[ExportAssetTypes, dict]) -> dict:
    """Rebuilds the export-variable registry from the given export spec.
    Method is called iteratively to reconfigure what gets stored as a
    simulation executes.

    Args:
        params (Union[ExportAssetTypes, dict]): export spec as a model or raw
            dict; falsy values fall back to the container's export settings

    Returns:
        dict: mapping of export variables to values
    """
    export_list = [m.value for m in ModelTypes]
    self.results = {}
    self.export_vars = {}
    if params:
        if isinstance(params, ExportAssetTypes):
            class_assets = params
        else:
            class_assets = ExportAssetTypes(**params)
    else:
        class_assets = self.export_settings

    inv_map = {v: k for k, v in MAPPED_CLASS_NAMES.items()}

    for class_name in export_list:
        # Read from the normalized model rather than the raw `params` argument:
        # the original membership test (`inv_map[class_name] in params`) fails
        # or misbehaves when `params` is a model instance or falsy. Missing
        # fields simply yield a falsy `variables` and are skipped.
        variables = getattr(class_assets, inv_map[class_name], None)
        if variables:
            for variable in variables:
                self.results[f"{class_name}_{variable.value}"] = None
                self.export_vars.setdefault(class_name, []).append(variable.value)

    return self.export_vars

Profile management

ProfileManager

Implementation of the profile manager for PyPSSE. Enables attaching profiles to all PSSE objects and associated properties

Source code in pypsse\profile_manager\profile_store.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
class ProfileManager:
    """Implementation of the profile manager for PyPSSE.
    Enables attaching profiles to all PSSE objects and associated properties."""

    def __init__(
        self, solver: Union[ProductionCostModel, Snap, Static, Dynamic], settings: SimulationSettings, mode: str = "r+"
    ):
        """Creates an instance of the profile manager

        Args:
            solver (Union[ProductionCostModel, Snap, Static, Dynamic]): instance of simulation controller
            settings (SimulationSettings): simulation settings
            mode (str, optional): file update mode for an existing store. Defaults to "r+".
        """

        self.solver = solver
        self.settings = settings

        file_path = settings.simulation.project_path / PROFILES_FOLDER / DEFAULT_PROFILE_STORE_FILENAME

        if file_path.exists():
            logger.info("Loading existing h5 store")
            self.store = h5py.File(file_path, mode)
        else:
            logger.info("Creating new h5 store")
            # New store: pre-create one h5 group per profile type.
            self.store = h5py.File(file_path, "w")
            for profile_group in [p.value for p in ProfileTypes]:
                self.store.create_group(profile_group)

    def load_data(self, file_path: Path) -> dict:
        """Load in external profile data

        Args:
            file_path (Path): path to profile mapping file (TOML)

        Returns:
            dict: profile mapping dictionary
        """

        toml_dict = toml.load(file_path)
        return toml_dict

    def setup_profiles(self):
        """sets up all profiles in the profile manager

        Raises:
            Exception: raised if the profile mapping file does not exist
        """

        mapping_path = self.settings.simulation.project_path / PROFILES_FOLDER / DEFAULT_PROFILE_MAPPING_FILENAME
        if mapping_path.exists():
            self.profile_mapping = self.load_data(mapping_path)
            self.profiles = {}
            # Build a Profile for every mapped dataset present in the store;
            # missing groups/datasets are logged and skipped, not fatal.
            for group, profile_map in self.profile_mapping.items():
                if group in self.store:
                    grp = self.store[group]
                    for profile_name, mapping_dict in profile_map.items():
                        if profile_name in grp:
                            self.profiles[f"{group}/{profile_name}"] = Profile(
                                grp[profile_name], self.solver, mapping_dict
                            )
                        else:
                            logger.warning(rf"Group {group} \ data set {profile_name} not found in the h5 store")
                else:
                    logger.warning(f"Group {group} not found in the h5 store")
        else:
            msg = f"Profile_mapping.toml file does not exist in path {mapping_path}"
            raise Exception(msg)

    def create_dataset(
        self,
        dname: str,
        p_type: ProfileTypes,
        data: pd.DataFrame,
        start_time: datetime.datetime,
        resolution: float,
        units:str,
        info: str,
    ):
        """Create new profile datasets

        Args:
            dname (str): dataset name
            p_type (ProfileTypes): profile type
            data (pd.DataFrame): data
            start_time (datetime.datetime): profile start time
            resolution (float): profile resolution in seconds
            units (str): profile units (currently unused — see note below)
            info (str): profile description

        Raises:
            Exception: raised if dataset already exists
        """

        grp = self.store[p_type]
        if dname not in grp:
            sa, sa_type = self.df_to_sarray(data)
            dset = grp.create_dataset(
                dname, data=sa, chunks=True, compression="gzip", compression_opts=4, shuffle=True, dtype=sa_type
            )
            # NOTE(review): the column names — not `units` — are passed into the
            # "units" metadata slot; Profile reads attrs["units"] as the column
            # list, so this cannot be "fixed" without updating Profile as well.
            self.create_metadata(dset, start_time, resolution, data, list(data.columns), info, p_type)
        else:
            logger.error(f'Data set "{dname}" already exists in group "{p_type}".')
            msg = f'Data set "{dname}" already exists in group "{p_type}".'
            raise Exception(msg)

    def df_to_sarray(self, df: pd.DataFrame) -> [str, str]:
        """Converts a DataFrame to a numpy structured array plus its dtype.

        Args:
            df (pd.DataFrame): frame to convert

        Raises:
            SystemError: raised if unable to convert

        Returns:
            [str, str]: the structured array and its numpy dtype
        """

        def make_col_type(col_type, col):
            try:
                if "numpy.object_" in str(col_type.type):
                    # Object (string) columns become fixed-width byte strings
                    # sized to the longest non-null entry.
                    maxlens = col.dropna().str.len()
                    if maxlens.any():
                        maxlen = maxlens.max().astype(int)
                        col_type = ("S%s" % maxlen, 1)
                    else:
                        col_type = "f2"
                return col.name, col_type
            except:  # NOTE(review): bare except also traps KeyboardInterrupt/SystemExit
                raise SystemError("Unable to convert dataframe to np array")

        v = df.values
        types = df.dtypes
        numpy_struct_types = [make_col_type(types[col], df.loc[:, col]) for col in df.columns]
        dtype = np.dtype(numpy_struct_types)
        z = np.zeros(v.shape[0], dtype)
        for i, k in enumerate(z.dtype.names):
            # This is in case you have problems with the encoding, remove the if branch if not
            try:
                if dtype[i].str.startswith("|S"):
                    z[k] = df[k].str.encode("latin").astype("S")
                else:
                    z[k] = v[:, i]
            except:  # NOTE(review): bare re-raise is a no-op guard; kept as-is
                raise

        return z, dtype

    def add_profiles_from_csv(
        self,
        csv_file: Path,
        name: str,
        p_type: ProfileTypes,
        start_time: datetime.date,
        resolution_sec: float = 900,
        units: str = "",
        info: str = "",
    ):
        """enables profiles from existing csv files

        Args:
            csv_file (Path): path to profiles in a csv file
            name (str): profile name
            p_type (ProfileTypes): profile type
            start_time (datetime.date): profile start time
            resolution_sec (float, optional): profile resolution in seconds. Defaults to 900.
            units (str, optional): profile units. Defaults to "".
            info (str, optional): profile info. Defaults to "".

        Raises:
            ValueError: raised if an invalid profile column is present
            ValueError: raised if an invalid profile type is passed
        """

        if p_type not in ProfileTypes:
            msg = f"Valid profile types are: {list(PROFILE_VALIDATION.keys())}"
            raise ValueError(msg)
        logger.debug("Reading profile")
        data = pd.read_csv(csv_file)

        # Every CSV column must be a valid property for this profile type.
        for c in data.columns:
            if c not in PROFILE_VALIDATION[p_type]:
                msg = f"{c} is not valid, Valid subtypes for '{p_type}' are: {PROFILE_VALIDATION[p_type]}"
                raise ValueError(msg)

        logger.debug("Attempting to add a profile")
        self.add_profiles(name, data, p_type, start_time, resolution_sec=resolution_sec, units=units, info=info)

    def add_profiles(
        self,
        name: str,
        data: object,
        p_type: ProfileTypes,
        start_time: datetime.date,
        resolution_sec: float = 900,
        units: str = "",
        info: str = "",
    ):
        """adds a profile to the profile manager

        Args:
            name (str): profile name
            data (object): profile data object (expected: pandas DataFrame)
            p_type (ProfileTypes): profile type
            start_time (datetime.date): profile start time (must be a datetime)
            resolution_sec (float, optional): profile resolution in seconds. Defaults to 900.
            units (str, optional): profile units. Defaults to "".
            info (str, optional): profile info. Defaults to "".

        Raises:
            InvalidParameterError: raised if start_time not a datetime object
            InvalidParameterError: raised if invalid profile type passed
        """

        # NOTE(review): exact type check also rejects datetime subclasses
        # (e.g. timezone-aware wrappers) — confirm this is intentional.
        if type(start_time) is not datetime.datetime:
            msg = "start_time should be a python datetime object"
            raise InvalidParameterError(msg)

        if p_type not in ProfileTypes:
            msg = f"Valid values for p_type are {ProfileTypes.names()}"
            raise InvalidParameterError(msg)
        self.create_dataset(name, p_type, data, start_time, resolution_sec, units=units, info=info)

    def create_metadata(
        self,
        d_set: str,
        start_time: datetime.date,
        resolution: float,
        data: object,
        units: str,
        info: str,
        p_type: ProfileTypes,
    ):
        """adds a metadata to a new profile

        Args:
            d_set (str): dataset to attach the attributes to
            start_time (datetime.date): profile start time
            resolution (float): profile resolution in seconds
            data (object): profile data object
            units (str): profile units (in practice the column names — see create_dataset)
            info (str): profile info
            p_type (ProfileTypes): profile type
        """

        metadata = {
            "sTime": str(start_time),
            "eTime": str(start_time + datetime.timedelta(seconds=resolution * len(data))),
            "resTime": resolution,
            "npts": len(data),
            "min": data.min(),
            "max": data.max(),
            "mean": np.mean(data),
            "units": units,
            "info": info,
            "type": p_type,
        }
        for key, value in metadata.items():
            # Strings are stored as numpy byte strings for h5py attributes.
            # NOTE(review): np.string_ was removed in NumPy 2.0 (np.bytes_ is
            # the drop-in replacement) — confirm the pinned NumPy version.
            if isinstance(value, str):
                value_mod = np.string_(value)
            else:
                value_mod = value
            d_set.attrs[key] = value_mod

    def update(self) -> dict:
        """returns data for the current timestep for all mapped profiles

        Returns:
            dict: values for profiles at the current time step
        """

        results = {}
        for profile_name, profile_obj in self.profiles.items():
            result = profile_obj.update()
            results[profile_name] = result

        return results

    def __del__(self):
        # Flush pending writes at garbage collection.
        # NOTE(review): the store is flushed but never closed — confirm h5py's
        # own finalizer is relied upon to close the file handle.
        self.store.flush()

__init__(solver, settings, mode='r+')

Creates an instance of the profile manager

Parameters:

Name Type Description Default
solver Union[ProductionCostModel, Snap, Static, Dynamic]

instance of simulation controller

required
settings SimulationSettings

simulation settings

required
mode str

file update mode. Defaults to "r+".

'r+'
Source code in pypsse\profile_manager\profile_store.py
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self, solver: Union[ProductionCostModel, Snap, Static, Dynamic], settings: SimulationSettings, mode: str = "r+"
):
    """Creates an instance of the profile manager.

    Args:
        solver (Union[ProductionCostModel, Snap, Static, Dynamic]): instance of the simulation controller
        settings (SimulationSettings): simulation settings
        mode (str, optional): file update mode for an existing store. Defaults to "r+".
    """

    self.solver = solver
    self.settings = settings

    store_path = settings.simulation.project_path / PROFILES_FOLDER / DEFAULT_PROFILE_STORE_FILENAME

    if not store_path.exists():
        # Fresh store: create it and pre-populate one group per profile type.
        logger.info("Creating new h5 store")
        self.store = h5py.File(store_path, "w")
        for group_name in (p.value for p in ProfileTypes):
            self.store.create_group(group_name)
    else:
        logger.info("Loading existing h5 store")
        self.store = h5py.File(store_path, mode)

add_profiles(name, data, p_type, start_time, resolution_sec=900, units='', info='')

adds a profile to the profile manager

Parameters:

Name Type Description Default
name str

profile name

required
data object

profile data object

required
p_type ProfileTypes

profile type

required
start_time date

profile start time

required
resolution_sec float

profile resolution in seconds. Defaults to 900.

900
units str

profile units. Defaults to "".

''
info str

profile into. Defaults to "".

''

Raises:

Type Description
InvalidParameterError

raised if start_time not a datetime object

InvalidParameterError

raised if invalid profile type passed

Source code in pypsse\profile_manager\profile_store.py
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
def add_profiles(
    self,
    name: str,
    data: object,
    p_type: ProfileTypes,
    start_time: datetime.date,
    resolution_sec: float = 900,
    units: str = "",
    info: str = "",
):
    """Adds a profile to the profile manager.

    Args:
        name (str): profile name
        data (object): profile data object (expected to be a pandas DataFrame)
        p_type (ProfileTypes): profile type
        start_time (datetime.date): profile start time (must be a datetime, not a bare date)
        resolution_sec (float, optional): profile resolution in seconds. Defaults to 900.
        units (str, optional): profile units. Defaults to "".
        info (str, optional): profile info. Defaults to "".

    Raises:
        InvalidParameterError: raised if start_time is not a datetime object
        InvalidParameterError: raised if an invalid profile type is passed
    """

    # isinstance (rather than an exact type check) also accepts datetime
    # subclasses such as timezone-aware datetimes; a bare date is still rejected.
    if not isinstance(start_time, datetime.datetime):
        msg = "start_time should be a python datetime object"
        raise InvalidParameterError(msg)

    if p_type not in ProfileTypes:
        msg = f"Valid values for p_type are {ProfileTypes.names()}"
        raise InvalidParameterError(msg)
    self.create_dataset(name, p_type, data, start_time, resolution_sec, units=units, info=info)

add_profiles_from_csv(csv_file, name, p_type, start_time, resolution_sec=900, units='', info='')

enables profiles from existing csv files

Parameters:

Name Type Description Default
csv_file Path

path to profiles in a csv file

required
name str

profile name

required
p_type ProfileTypes

profile type

required
start_time date

profile start time

required
resolution_sec float

profile resolution in seconds. Defaults to 900.

900
units str

profile units. Defaults to "".

''
info str

profile into. Defaults to "".

''

Raises:

Type Description
ValueError

rasied if invalid profile name passed

ValueError

rasied if invalid profile type passed

Source code in pypsse\profile_manager\profile_store.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def add_profiles_from_csv(
    self,
    csv_file: Path,
    name: str,
    p_type: ProfileTypes,
    start_time: datetime.date,
    resolution_sec: float = 900,
    units: str = "",
    info: str = "",
):
    """Loads a CSV file, validates its columns, and registers it as a profile.

    Args:
        csv_file (Path): path to profiles in a csv file
        name (str): profile name
        p_type (ProfileTypes): profile type
        start_time (datetime.date): profile start time
        resolution_sec (float, optional): profile resolution in seconds. Defaults to 900.
        units (str, optional): profile units. Defaults to "".
        info (str, optional): profile info. Defaults to "".

    Raises:
        ValueError: raised if an invalid profile type is passed
        ValueError: raised if a CSV column is not valid for the profile type
    """

    if p_type not in ProfileTypes:
        msg = f"Valid profile types are: {list(PROFILE_VALIDATION.keys())}"
        raise ValueError(msg)
    logger.debug("Reading profile")
    data = pd.read_csv(csv_file)

    # Every column must be a recognized property of this profile type.
    valid_columns = PROFILE_VALIDATION[p_type]
    for c in data.columns:
        if c not in valid_columns:
            msg = f"{c} is not valid, Valid subtypes for '{p_type}' are: {PROFILE_VALIDATION[p_type]}"
            raise ValueError(msg)

    logger.debug("Attempting to add a profile")
    self.add_profiles(name, data, p_type, start_time, resolution_sec=resolution_sec, units=units, info=info)

create_dataset(dname, p_type, data, start_time, resolution, units, info)

Create new profile datasets

Parameters:

Name Type Description Default
dname str

dataset name

required
p_type ProfileTypes

profile type

required
data DataFrame

data

required
start_time datetime

profile start time

required
resolution float

profile resolution

required
_ _type_

description

required
info

str): profile description

required

Raises:

Type Description
Exception

raised if dataset already exists

Source code in pypsse\profile_manager\profile_store.py
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
def create_dataset(
    self,
    dname: str,
    p_type: ProfileTypes,
    data: pd.DataFrame,
    start_time: datetime.datetime,
    resolution: float,
    units:str,
    info: str,
):
    """Create new profile datasets

    Args:
        dname (str): dataset name
        p_type (ProfileTypes): profile type
        data (pd.DataFrame): data
        start_time (datetime.datetime): profile start time
        resolution (float): profile resolution in seconds
        units (str): profile units (currently unused — see note below)
        info (str): profile description

    Raises:
        Exception: raised if dataset already exists
    """

    grp = self.store[p_type]
    if dname not in grp:
        sa, sa_type = self.df_to_sarray(data)
        dset = grp.create_dataset(
            dname, data=sa, chunks=True, compression="gzip", compression_opts=4, shuffle=True, dtype=sa_type
        )
        # NOTE(review): the column names — not the `units` argument — are passed
        # into the "units" metadata slot; the Profile class reads attrs["units"]
        # as the column list, so don't "fix" this without updating Profile too.
        self.create_metadata(dset, start_time, resolution, data, list(data.columns), info, p_type)
    else:
        logger.error(f'Data set "{dname}" already exists in group "{p_type}".')
        msg = f'Data set "{dname}" already exists in group "{p_type}".'
        raise Exception(msg)

create_metadata(d_set, start_time, resolution, data, units, info, p_type)

adds a metadata to a new profile

Parameters:

Name Type Description Default
d_set str

dataset name

required
start_time date

profile start time

required
resolution float

profile resolution

required
data object

profile data object

required
units str

profile units

required
info str

profile info

required
p_type ProfileTypes

profile type

required
Source code in pypsse\profile_manager\profile_store.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def create_metadata(
    self,
    d_set: str,
    start_time: datetime.date,
    resolution: float,
    data: object,
    units: str,
    info: str,
    p_type: ProfileTypes,
):
    """adds a metadata to a new profile

    Args:
        d_set (str): dataset name
        start_time (datetime.date): profile start time
        resolution (float): profile resolution
        data (object): profile data object
        units (str): profile units
        info (str): profile info
        p_type (ProfileTypes): profile type
    """

    metadata = {
        "sTime": str(start_time),
        "eTime": str(start_time + datetime.timedelta(seconds=resolution * len(data))),
        "resTime": resolution,
        "npts": len(data),
        "min": data.min(),
        "max": data.max(),
        "mean": np.mean(data),
        "units": units,
        "info": info,
        "type": p_type,
    }
    for key, value in metadata.items():
        if isinstance(value, str):
            value_mod = np.string_(value)
        else:
            value_mod = value
        d_set.attrs[key] = value_mod

df_to_sarray(df)

Enables data conversion

Parameters:

Name Type Description Default
df DataFrame

description

required

Raises:

Type Description
SystemError

raised if unable to convert

Returns:

Type Description
[str, str]

[str, str]: returns column name and datatype

Source code in pypsse\profile_manager\profile_store.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def df_to_sarray(self, df: pd.DataFrame) -> [str, str]:
    """Converts a DataFrame to a numpy structured array plus its dtype.

    Object (string) columns are converted to fixed-width byte strings sized
    to the longest non-null entry in the column.

    Args:
        df (pd.DataFrame): frame to convert

    Raises:
        SystemError: raised if a column cannot be mapped to a numpy dtype

    Returns:
        [str, str]: the structured array and its numpy dtype
    """

    def make_col_type(col_type, col):
        try:
            if "numpy.object_" in str(col_type.type):
                maxlens = col.dropna().str.len()
                if maxlens.any():
                    maxlen = maxlens.max().astype(int)
                    col_type = ("S%s" % maxlen, 1)
                else:
                    col_type = "f2"
            return col.name, col_type
        except Exception as err:
            # Chain the cause instead of swallowing it with a bare except
            # (which also trapped KeyboardInterrupt/SystemExit).
            raise SystemError("Unable to convert dataframe to np array") from err

    v = df.values
    types = df.dtypes
    numpy_struct_types = [make_col_type(types[col], df.loc[:, col]) for col in df.columns]
    dtype = np.dtype(numpy_struct_types)
    z = np.zeros(v.shape[0], dtype)
    for i, k in enumerate(z.dtype.names):
        # Byte-string fields need explicit latin-1 encoding; everything else
        # copies straight from the values matrix. (The original wrapped this
        # in a no-op `try: ... except: raise`, removed here.)
        if dtype[i].str.startswith("|S"):
            z[k] = df[k].str.encode("latin").astype("S")
        else:
            z[k] = v[:, i]

    return z, dtype

load_data(file_path)

Load in external profile data

Parameters:

Name Type Description Default
file_path Path

path to profile mapping file

required

Returns:

Name Type Description
dict dict

profile mapping dictionary

Source code in pypsse\profile_manager\profile_store.py
51
52
53
54
55
56
57
58
59
60
61
62
def load_data(self, file_path: Path) -> dict:
    """Read an external profile mapping file.

    Args:
        file_path (Path): path to the TOML profile mapping file

    Returns:
        dict: parsed profile mapping
    """

    return toml.load(file_path)

setup_profiles()

sets up all profiles in the profile manager

Raises:

Type Description
Exception

raised if mapped object not found in profile DB

Source code in pypsse\profile_manager\profile_store.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
def setup_profiles(self):
    """Builds a Profile object for every mapped dataset found in the h5 store.

    Missing groups or datasets are logged and skipped.

    Raises:
        Exception: raised if the profile mapping file does not exist
    """

    mapping_path = self.settings.simulation.project_path / PROFILES_FOLDER / DEFAULT_PROFILE_MAPPING_FILENAME
    if not mapping_path.exists():
        msg = f"Profile_mapping.toml file does not exist in path {mapping_path}"
        raise Exception(msg)

    self.profile_mapping = self.load_data(mapping_path)
    self.profiles = {}
    for group, profile_map in self.profile_mapping.items():
        if group not in self.store:
            logger.warning(f"Group {group} not found in the h5 store")
            continue
        grp = self.store[group]
        for profile_name, mapping_dict in profile_map.items():
            if profile_name not in grp:
                logger.warning(rf"Group {group} \ data set {profile_name} not found in the h5 store")
                continue
            self.profiles[f"{group}/{profile_name}"] = Profile(grp[profile_name], self.solver, mapping_dict)

update()

returns data for the current timestep for all mapped profiles

Returns:

Name Type Description
dict dict

values for profiles at the current time step

Source code in pypsse\profile_manager\profile_store.py
287
288
289
290
291
292
293
294
295
296
297
298
299
def update(self) -> dict:
    """Return the current-timestep value of every mapped profile.

    Returns:
        dict: profile name mapped to its value at the current time step
    """

    return {name: profile.update() for name, profile in self.profiles.items()}

Profile

Class definition for a single profile

Source code in pypsse\profile_manager\profile.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
class Profile:
    """Represents a single profile dataset in the h5 store and pushes its
    values onto the mapped PSSE objects each time step."""

    DEFAULT_SETTINGS = {"multiplier": 1, "normalize": False, "interpolate": False}

    def __init__(self, profile_obj, solver, mapping_dict, buffer_size=10, neglect_year=True):
        """Wraps one h5 dataset.

        Args:
            profile_obj: h5py dataset holding the profile samples and metadata attrs
            solver: simulation controller (provides time and object updates)
            mapping_dict: list of per-object settings, each with 'bus' and 'id' keys
            buffer_size (int, optional): scratch buffer length. Defaults to 10.
            neglect_year (bool, optional): kept for compatibility; not used here.
        """
        # Per-object settings keyed "<bus>__<id>", with defaults filled in.
        self.value_settings = {f"{x['bus']}__{x['id']}": {**self.DEFAULT_SETTINGS, **x} for x in mapping_dict}
        self.mapping_dict = mapping_dict
        self.buffer_size = buffer_size
        self.buffer = np.zeros(buffer_size)
        self.profile = profile_obj
        self.neglect_year = neglect_year
        self.solver = solver
        self.attrs = self.profile.attrs
        # Stored timestamps may lack fractional seconds; normalize before parsing.
        s = self.attrs["sTime"].decode()
        stime = s if "." in s else s + ".00"
        e = self.attrs["eTime"].decode()
        etime = e if "." in e else e + ".00"
        self.stime = datetime.datetime.strptime(stime, "%Y-%m-%d %H:%M:%S.%f").astimezone(None)
        self.etime = datetime.datetime.strptime(etime, "%Y-%m-%d %H:%M:%S.%f").astimezone(None)
        self.sim_res = self.solver.get_step_size_cec()
        self.time = copy.deepcopy(self.solver.get_time())
        # NOTE(review): the "units" attribute actually holds the column names
        # (see ProfileManager.create_dataset) — verify before renaming.
        self.columns = self.attrs["units"]
        self.dtype = self.attrs["type"].decode()

    def update(self, update_object_properties=True):
        """Returns the profile value at the current timestep and (optionally)
        pushes it onto every mapped object."""
        self.time = copy.deepcopy(self.solver.get_time()).astimezone(None)
        if self.time < self.stime or self.time > self.etime:
            # Outside the profile window: report zeros for every column.
            value = np.array([0] * len(self.profile[0]))
            value1 = np.array([0] * len(self.profile[0]))
        else:
            dt = (self.time - self.stime).total_seconds()
            n = int(dt / self.attrs["resTime"])
            value = np.array(list(self.profile[n]))
            try:
                valuen1 = np.array(list(self.profile[n + 1]))
            except Exception as _:
                valuen1 = value  # past the last sample: hold the final value

            # Linear interpolation between samples n and n+1.
            dt2 = (
                self.time - (self.stime + datetime.timedelta(seconds=int(n * self.attrs["resTime"])))
            ).total_seconds()
            value1 = value + (valuen1 - value) * dt2 / self.attrs["resTime"]

        if update_object_properties:
            for obj_name in self.value_settings:
                bus, object_id = obj_name.split("__")
                # Select per-object WITHOUT overwriting `value`: the original
                # assigned `value = value1` here, which leaked the interpolated
                # series into later non-interpolating objects and into the
                # returned value.
                base = value1 if self.value_settings[obj_name]["interpolate"] else value
                mult = self.value_settings[obj_name]["multiplier"]
                if isinstance(mult, list):
                    mult = np.array(mult)
                if self.value_settings[obj_name]["normalize"]:
                    value_f = base / self.attrs["max"] * mult
                else:
                    value_f = base * mult
                value_f = self.fill_missing_values(value_f)
                self.solver.update_object(self.dtype, bus, object_id, value_f)
                logger.debug(f"Object updated: {object_id}.{bus}.{self.dtype}={value_f}")
        return value

    def fill_missing_values(self, value):
        """Maps each profile column to its PSSE 'realarN' parameter slot."""
        idx = [f"realar{PROFILE_VALIDATION[self.dtype].index(c) + 1}" for c in self.columns]
        x = dict(zip(idx, list(value)))
        return x

fill_missing_values(value)

Fixes issues in profile data

Source code in pypsse\profile_manager\profile.py
72
73
74
75
76
def fill_missing_values(self, value):
    """Map profile columns onto PSSE 'realar<N>' attribute names.

    N is the 1-based position of each column within
    PROFILE_VALIDATION[self.dtype]; the resulting dict pairs those
    names with the entries of *value*.
    """
    valid_columns = PROFILE_VALIDATION[self.dtype]
    keys = [f"realar{valid_columns.index(col) + 1}" for col in self.columns]
    return dict(zip(keys, list(value)))

update(update_object_properties=True)

Returns value at the current timestep in the given profile

Source code in pypsse\profile_manager\profile.py
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def update(self, update_object_properties=True):
    """Return the profile row for the solver's current timestep.

    Reads the solver clock, picks the profile row for that time (the next
    row is used for linear interpolation), and, when
    *update_object_properties* is True, scales and pushes the values to each
    mapped PSSE object via ``self.solver.update_object``.

    Returns the raw (non-interpolated) row as a numpy array; a zero row when
    the current time falls outside [self.stime, self.etime].
    """
    self.time = copy.deepcopy(self.solver.get_time()).astimezone(None)
    if self.time < self.stime or self.time > self.etime:
        # Outside the profile window: emit zero rows sized to the profile width.
        value = np.array([0] * len(self.profile[0]))
        value1 = np.array([0] * len(self.profile[0]))
    else:
        dt = (self.time - self.stime).total_seconds()
        n = int(dt / self.attrs["resTime"])
        value = np.array(list(self.profile[n]))
        try:
            valuen1 = np.array(list(self.profile[n + 1]))
        except (IndexError, KeyError, ValueError):
            # Last sample: hold the final value instead of extrapolating.
            valuen1 = value

        # Time elapsed into the current resolution interval.
        dt2 = (
            self.time - (self.stime + datetime.timedelta(seconds=int(n * self.attrs["resTime"])))
        ).total_seconds()
        value1 = value + (valuen1 - value) * dt2 / self.attrs["resTime"]

    if update_object_properties:
        for obj_name in self.value_settings:
            bus, object_id = obj_name.split("__")
            obj_settings = self.value_settings[obj_name]
            # Bug fix: select the interpolated row per object instead of
            # rebinding `value`, which previously leaked the interpolated row
            # into later (non-interpolating) objects and the return value.
            obj_value = value1 if obj_settings["interpolate"] else value
            mult = obj_settings["multiplier"]
            if isinstance(mult, list):
                mult = np.array(mult)
            if obj_settings["normalize"]:
                value_f = obj_value / self.attrs["max"] * mult
            else:
                value_f = obj_value * mult
            value_f = self.fill_missing_values(value_f)
            self.solver.update_object(self.dtype, bus, object_id, value_f)
            logger.debug(f"Object updated: {object_id}.{bus}.{self.dtype}={value_f}")
    return value

Simulation modes

Dynamic

Bases: AbstractMode, DynamicUtils

Class definition for dynamic simulation mode (uses dyr and raw files)

Source code in pypsse\modes\dynamic.py
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
class Dynamic(AbstractMode, DynamicUtils):
    """Dynamic simulation mode: time-domain simulation built from dyr and raw files."""

    def __init__(
        self,
        psse,
        dyntools,
        settings: SimulationSettings,
        export_settings: ExportFileOptions,
        subsystem_buses,
        raw_data,
    ):
        """Set up the simulator clock and run full dynamic initialization."""
        super().__init__(psse, dyntools, settings, export_settings, subsystem_buses, raw_data)
        # Simulator clock; advanced by `incTime` on every step().
        self.time = settings.simulation.start_time
        self._StartTime = settings.simulation.start_time
        self.incTime = settings.simulation.simulation_step_resolution
        self.init(subsystem_buses)

    def init(self, bus_subsystems):
        """Initialize the dynamic simulation: solve power flow, convert loads,
        load the dyr model and user-defined models, configure solution
        parameters and channels, and initialize the run via strt_2.

        Returns True on success; raises on failure.
        """
        super().init(bus_subsystems)
        # Number of solver sub-steps per step used in iterative co-simulation.
        self.iter_const = 100.0

        if self.settings.simulation.rwm_file:
            # NOTE(review): the setting checked is `rwm_file` but the attribute
            # passed is `self.rwn_file` (m vs n) — confirm which is intended.
            self.psse.mcre([1, 0], self.rwn_file)

        self.psse.fnsl([0, 0, 0, 1, 0, 0, 0, self._i])

        self.load_setup_files()
        self.convert_load()

        self.psse.gnet(1, 0)
        self.psse.fdns([1, 1, 0, 1, 1, 0, 0, 0])
        self.psse.fnsl([1, 1, 0, 1, 1, 0, 0, 0])
        self.psse.cong(0)
        # Solve for dynamics
        self.psse.ordr(0)
        self.psse.fact()
        self.psse.tysl(0)
        self.psse.tysl(0)
        # self.psse.save(self.study_case_path.split('.')[0] + ".sav")
        dyr_path = self.settings.simulation.dyr_file
        # NOTE(review): `dyr_path.exists` is not called (missing parentheses),
        # so this assert is always truthy for a non-empty path.
        assert dyr_path and dyr_path.exists
        logger.debug(f"Loading dynamic model....{dyr_path}")
        self.psse.dynamicsmode(1)
        ierr = self.psse.dyre_new([1, 1, 1, 1], str(dyr_path), r"""conec""", r"""conet""", r"""compile""")

        if self.settings.helics and self.settings.helics.cosimulation_mode:
            if self.settings.helics.iterative_mode:
                # Sub-step the solver when iterating with HELICS.
                sim_step = self.settings.simulation.psse_solver_timestep.total_seconds() / self.iter_const
            else:
                sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()
        else:
            sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()

        # NOTE(review): this overwrites the unchecked return code of dyre_new
        # above; the check below therefore reports parameter failures as
        # dyr-file failures and silently ignores dyr-load failures.
        ierr = self.psse.dynamics_solution_param_2(
            [60, self._i, self._i, self._i, self._i, self._i, self._i, self._i],
            [0.4, self._f, sim_step, self._f, self._f, self._f, self._f, self._f],
        )

        if ierr:
            msg = f'Error loading dynamic model file "{dyr_path}". Error code - {ierr}'
            raise Exception(msg)
        else:
            logger.debug(f"Dynamic file {dyr_path} sucessfully loaded")

        self.disable_load_models_for_coupled_buses()

        if self.export_settings.export_results_using_channels:
            self.setup_channels()

        self.psse.delete_all_plot_channels()

        self.setup_all_channels()

        # Load user defined models
        self.load_user_defined_models()

        # Load flow settings
        self.psse.fdns([0, 0, 0, 1, 1, 0, 99, 0])
        # initialize: prepend the configured filename prefix to the outx file
        outx_file  = str(self.settings.export.outx_file).split("\\")
        outx_file[-1] = self.export_settings.filename_prefix + "_" + outx_file[-1]
        outx_file = "\\".join(outx_file)
        ierr = self.psse.strt_2(
            [
                1,
                self.settings.generators.missing_machine_model,
            ],
            outx_file,
        )
        if ierr:
            self.initialization_complete = False
            msg = f"Dynamic simulation failed to successfully initialize. Error code - {ierr}"
            raise Exception(msg)
        else:
            self.initialization_complete = True
            logger.debug("Dynamic simulation initialization sucess!")
        # get load info for the sub system
        # self.load_info = self.get_load_indices(bus_subsystems)

        logger.debug("pyPSSE initialization complete!")

        # Sub-step counter used by resolve_step().
        self.xTime = 0

        return self.initialization_complete

    def step(self, t):
        """Advance the simulator clock one increment and run PSSE to time *t*."""

        self.time = self.time + self.incTime
        self.xTime = 0
        return self.psse.run(0, t, 1, 1, 1)

    # @kapil do you need this?

    # def get_load_indices(self, bus_subsystems):
    #     "Returns load indices"

    #     all_bus_ids = {}
    #     for bus_subsystem_id in bus_subsystems.keys():
    #         load_info = {}
    #         ierr, load_data = self.psse.aloadchar(bus_subsystem_id, 1, ["ID", "NAME", "EXNAME"])

    #         load_data = np.array(load_data)
    #         ierr, bus_data = self.psse.aloadint(bus_subsystem_id, 1, ["NUMBER"])

    #         bus_data = bus_data[0]
    #         for i, bus_id in enumerate(bus_data):
    #             load_info[bus_id] = {
    #                 "Load ID": load_data[0, i],
    #                 "Bus name": load_data[1, i],
    #                 "Bus name (ext)": load_data[2, i],
    #             }
    #         all_bus_ids[bus_subsystem_id] = load_info
    #     return all_bus_ids

    def resolve_step(self, t):
        """Re-run the solver within the current step (iterative co-simulation);
        advances by xTime/iter_const fractions of the step increment."""

        err = self.psse.run(0, t + self.xTime * self.incTime / self.iter_const, 1, 1, 1)
        self.xTime += 1
        return err

    def get_time(self):
        """Return the simulator's current timestamp."""

        return self.time

    def get_total_seconds(self):
        """Return elapsed simulated seconds since the start time."""

        return (self.time - self._StartTime).total_seconds()

    def get_step_size_cec(self):
        """Return the simulation step resolution in seconds."""
        return self.settings.simulation.simulation_step_resolution.total_seconds()

    @converter
    def read_subsystems(self, quantities, subsystem_buses, ext_string2_info=None, mapping_dict=None):
        """Query current results for the requested quantities and buses.

        Base results come from the parent implementation; polled channel
        values are merged in, then dynamic-only load quantities are read from
        the PSSE 'CON' array.
        """

        # Mutable defaults created per call.
        if ext_string2_info is None:
            ext_string2_info = {}
        if mapping_dict is None:
            mapping_dict = {}
        results = super().read_subsystems(
            quantities, subsystem_buses, mapping_dict=mapping_dict, ext_string2_info=ext_string2_info
        )

        poll_results = self.poll_channels()
        results.update(poll_results)
        for class_name, var_list in quantities.items():
            if class_name in dyn_only_options:
                for v in var_list:
                    if v in DYNAMIC_ONLY_PPTY[class_name]:
                        for func_name in dyn_only_options[class_name]:
                            if v in dyn_only_options[class_name][func_name]:
                                # Offset of this variable within the model's CONs.
                                con_ind = dyn_only_options[class_name][func_name][v]
                                for bus in subsystem_buses:
                                    if class_name == "Loads":
                                        ierr = self.psse.inilod(int(bus))

                                        # NOTE(review): only the first load on
                                        # each bus is read (single nxtlod call);
                                        # confirm multi-load buses are intended
                                        # to be skipped.
                                        ierr, ld_id = self.psse.nxtlod(int(bus))

                                        if ld_id is not None:
                                            ierr, con_index = getattr(self.psse, func_name)(
                                                int(bus), ld_id, "CHARAC", "CON"
                                            )

                                            if con_index is not None:
                                                act_con_index = con_index + con_ind
                                                ierr, value = self.psse.dsrval("CON", act_con_index)

                                                res_base = f"{class_name}_{v}"
                                                if res_base not in results:
                                                    results[res_base] = {}
                                                obj_name = f"{bus}_{ld_id}"
                                                results[res_base][obj_name] = value
            else:
                logger.warning("Extend function 'read_subsystems' in the Dynamic class (Dynamic.py)")
        return results

get_step_size_cec()

Returns simulation timestep resolution

Source code in pypsse\modes\dynamic.py
163
164
165
def get_step_size_cec(self):
    """Return the simulation step resolution, expressed in seconds."""
    resolution = self.settings.simulation.simulation_step_resolution
    return resolution.total_seconds()

get_time()

Returns current simulator time

Source code in pypsse\modes\dynamic.py
153
154
155
156
def get_time(self):
    """Return the simulator's current timestamp."""
    current = self.time
    return current

get_total_seconds()

Returns total simulation time

Source code in pypsse\modes\dynamic.py
158
159
160
161
def get_total_seconds(self):
    """Return elapsed simulated seconds since the configured start time."""
    elapsed = self.time - self._StartTime
    return elapsed.total_seconds()

init(bus_subsystems)

Initializes the simulation

Source code in pypsse\modes\dynamic.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def init(self, bus_subsystems):
    """Initialize the dynamic simulation.

    Solves the power flow, converts loads, loads the dynamic (dyr) model and
    user-defined models, configures solution parameters and channels, and
    initializes the dynamic run (strt_2). Returns True on success; raises
    when dyr loading, parameter setup, or initialization fails.
    """
    super().init(bus_subsystems)
    # Number of solver sub-steps per step used in iterative co-simulation.
    self.iter_const = 100.0

    if self.settings.simulation.rwm_file:
        # NOTE(review): reads setting `rwm_file` but passes `self.rwn_file`
        # (m vs n) — confirm which attribute is intended.
        self.psse.mcre([1, 0], self.rwn_file)

    self.psse.fnsl([0, 0, 0, 1, 0, 0, 0, self._i])

    self.load_setup_files()
    self.convert_load()

    self.psse.gnet(1, 0)
    self.psse.fdns([1, 1, 0, 1, 1, 0, 0, 0])
    self.psse.fnsl([1, 1, 0, 1, 1, 0, 0, 0])
    self.psse.cong(0)
    # Solve for dynamics
    self.psse.ordr(0)
    self.psse.fact()
    self.psse.tysl(0)
    self.psse.tysl(0)
    # self.psse.save(self.study_case_path.split('.')[0] + ".sav")
    dyr_path = self.settings.simulation.dyr_file
    # Bug fix: `dyr_path.exists` (no call) is a bound method and always
    # truthy; call it so a missing file actually fails the assertion.
    assert dyr_path and dyr_path.exists(), f"Dynamic model file not found: {dyr_path}"
    logger.debug(f"Loading dynamic model....{dyr_path}")
    self.psse.dynamicsmode(1)
    ierr = self.psse.dyre_new([1, 1, 1, 1], str(dyr_path), r"""conec""", r"""conet""", r"""compile""")
    # Bug fix: check dyre_new's return code before it is overwritten by the
    # dynamics_solution_param_2 call below (previously a dyr-load failure was
    # silently ignored and a parameter failure was misreported as one).
    if ierr:
        msg = f'Error loading dynamic model file "{dyr_path}". Error code - {ierr}'
        raise Exception(msg)
    logger.debug(f"Dynamic file {dyr_path} sucessfully loaded")

    if self.settings.helics and self.settings.helics.cosimulation_mode:
        if self.settings.helics.iterative_mode:
            # Sub-step the solver when iterating with HELICS.
            sim_step = self.settings.simulation.psse_solver_timestep.total_seconds() / self.iter_const
        else:
            sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()
    else:
        sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()

    ierr = self.psse.dynamics_solution_param_2(
        [60, self._i, self._i, self._i, self._i, self._i, self._i, self._i],
        [0.4, self._f, sim_step, self._f, self._f, self._f, self._f, self._f],
    )
    if ierr:
        msg = f"Error setting dynamic solution parameters. Error code - {ierr}"
        raise Exception(msg)

    self.disable_load_models_for_coupled_buses()

    if self.export_settings.export_results_using_channels:
        self.setup_channels()

    self.psse.delete_all_plot_channels()

    self.setup_all_channels()

    # Load user defined models
    self.load_user_defined_models()

    # Load flow settings
    self.psse.fdns([0, 0, 0, 1, 1, 0, 99, 0])
    # initialize: prepend the configured filename prefix to the outx file name
    outx_file = str(self.settings.export.outx_file).split("\\")
    outx_file[-1] = self.export_settings.filename_prefix + "_" + outx_file[-1]
    outx_file = "\\".join(outx_file)
    ierr = self.psse.strt_2(
        [
            1,
            self.settings.generators.missing_machine_model,
        ],
        outx_file,
    )
    if ierr:
        self.initialization_complete = False
        msg = f"Dynamic simulation failed to successfully initialize. Error code - {ierr}"
        raise Exception(msg)
    else:
        self.initialization_complete = True
        logger.debug("Dynamic simulation initialization sucess!")
    # get load info for the sub system
    # self.load_info = self.get_load_indices(bus_subsystems)

    logger.debug("pyPSSE initialization complete!")

    # Sub-step counter used by resolve_step().
    self.xTime = 0

    return self.initialization_complete

read_subsystems(quantities, subsystem_buses, ext_string2_info=None, mapping_dict=None)

Queries the result container for current results

Source code in pypsse\modes\dynamic.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
@converter
def read_subsystems(self, quantities, subsystem_buses, ext_string2_info=None, mapping_dict=None):
    """Query current results for the requested quantities and buses.

    Base results come from the parent implementation; polled channel values
    are merged in, then dynamic-only load quantities are read from the PSSE
    'CON' array.
    """

    # Mutable defaults created per call.
    if ext_string2_info is None:
        ext_string2_info = {}
    if mapping_dict is None:
        mapping_dict = {}
    results = super().read_subsystems(
        quantities, subsystem_buses, mapping_dict=mapping_dict, ext_string2_info=ext_string2_info
    )

    poll_results = self.poll_channels()
    results.update(poll_results)
    for class_name, var_list in quantities.items():
        if class_name in dyn_only_options:
            for v in var_list:
                if v in DYNAMIC_ONLY_PPTY[class_name]:
                    for func_name in dyn_only_options[class_name]:
                        if v in dyn_only_options[class_name][func_name]:
                            # Offset of this variable within the model's CONs.
                            con_ind = dyn_only_options[class_name][func_name][v]
                            for bus in subsystem_buses:
                                if class_name == "Loads":
                                    ierr = self.psse.inilod(int(bus))

                                    # NOTE(review): only the first load on each
                                    # bus is read (single nxtlod call); confirm
                                    # multi-load buses are intended to be skipped.
                                    ierr, ld_id = self.psse.nxtlod(int(bus))

                                    if ld_id is not None:
                                        ierr, con_index = getattr(self.psse, func_name)(
                                            int(bus), ld_id, "CHARAC", "CON"
                                        )

                                        if con_index is not None:
                                            act_con_index = con_index + con_ind
                                            ierr, value = self.psse.dsrval("CON", act_con_index)

                                            res_base = f"{class_name}_{v}"
                                            if res_base not in results:
                                                results[res_base] = {}
                                            obj_name = f"{bus}_{ld_id}"
                                            results[res_base][obj_name] = value
        else:
            logger.warning("Extend function 'read_subsystems' in the Dynamic class (Dynamic.py)")
    return results

resolve_step(t)

Resolves the current time step

Source code in pypsse\modes\dynamic.py
146
147
148
149
150
151
def resolve_step(self, t):
    """Re-solve within the current step (iterative co-simulation sub-step)."""
    sub_time = t + self.xTime * self.incTime / self.iter_const
    result = self.psse.run(0, sub_time, 1, 1, 1)
    self.xTime += 1
    return result

step(t)

Increments the simulation

Source code in pypsse\modes\dynamic.py
116
117
118
119
120
121
def step(self, t):
    """Advance the simulator clock one increment and run PSSE to time *t*."""
    self.xTime = 0
    self.time += self.incTime
    return self.psse.run(0, t, 1, 1, 1)

Snap

Bases: AbstractMode, DynamicUtils

Class definition for snapshot simulation mode (uses snp and sav files)

Source code in pypsse\modes\snap.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
class Snap(AbstractMode, DynamicUtils):
    """Snapshot simulation mode: dynamic simulation restored from snp and sav files."""

    def __init__(
        self,
        psse,
        dyntools,
        settings: SimulationSettings,
        export_settings: ExportFileOptions,
        subsystem_buses,
        raw_data,
    ):
        """Set up the simulator clock and initialize from the snapshot files."""
        super().__init__(psse, dyntools, settings, export_settings, subsystem_buses, raw_data)
        # Simulator clock; advanced by `incTime` on every step().
        self.time = settings.simulation.start_time
        self._StartTime = settings.simulation.start_time
        self.incTime = settings.simulation.simulation_step_resolution
        self.init(subsystem_buses)

    def init(self, bus_subsystems):
        """Initialize from a saved case (sav) and snapshot (snp) file,
        configure solution parameters and channels, and start the run.

        Returns True once initialization completes.
        """
        super().init(bus_subsystems)

        # Number of solver sub-steps per step used in iterative co-simulation.
        self.iter_const = 100.0
        self.xTime = 0

        # NOTE(review): return codes from case() and rstr() are captured but
        # never checked; a failed restore is only caught later by strt_2.
        ierr = self.psse.case(str(self.settings.simulation.case_study))

        self.load_setup_files()
        self.convert_load()

        logger.info(f"Load snap file: {self.settings.simulation.snp_file}")
        ierr = self.psse.rstr(str(self.settings.simulation.snp_file))
        #

        # The following logic only runs when the helics interface is enabled
        self.disable_load_models_for_coupled_buses()
        self.disable_generation_for_coupled_buses()
        # self.save_model()
        ############# ------------------------------------- ###############
        # Prepend the configured filename prefix to the outx file name.
        outx_file  = str(self.settings.export.outx_file).split("\\")
        outx_file[-1] = self.export_settings.filename_prefix + "_" + outx_file[-1]
        outx_file = "\\".join(outx_file)

        ierr = self.psse.strt_2([0, 1],  outx_file)

        # On ierr == 1, retry once after converting generators (cong).
        if ierr == 1:
            self.psse.cong(0)
            ierr = self.psse.strt_2([0, 1],  outx_file)

        elif ierr > 1:
            msg = "Error starting simulation"
            raise Exception(msg)

        self.load_user_defined_models()

        if self.settings.helics and self.settings.helics.cosimulation_mode:
            if self.settings.helics.iterative_mode:
                # Sub-step the solver when iterating with HELICS.
                sim_step = self.settings.simulation.psse_solver_timestep.total_seconds() / self.iter_const
            else:
                sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()
        else:
            sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()

        self.psse.dynamics_solution_param_2(
            [60, self._i, self._i, self._i, self._i, self._i, self._i, self._i],
            [0.4, self._f, sim_step, self._f, self._f, self._f, self._f, self._f],
        )

        self.psse.delete_all_plot_channels()

        self.setup_all_channels()

        logger.debug("pyPSSE initialization complete!")
        self.initialization_complete = True
        return self.initialization_complete

    def step(self, t):
        """Advance the simulator clock one increment and run PSSE to time *t*."""
        self.time = self.time + self.incTime
        self.xTime = 0
        return self.psse.run(0, t, 1, 1, 1)

    def resolve_step(self, t):
        """Re-solve within the current step (iterative co-simulation sub-step)."""
        self.xTime += 1
        return self.psse.run(0, t + self.xTime * self.incTime / self.iter_const, 1, 1, 1)

    def get_load_indices(self, bus_subsystems):
        """Return per-subsystem load metadata (ID and names) keyed by bus number."""
        all_bus_ids = {}
        for bus_subsystem_id in bus_subsystems.keys():
            load_info = {}
            ierr, load_data = self.psse.aloadchar(bus_subsystem_id, 1, ["ID", "NAME", "EXNAME"])

            load_data = np.array(load_data)
            ierr, bus_data = self.psse.aloadint(bus_subsystem_id, 1, ["NUMBER"])

            bus_data = bus_data[0]
            for i, bus_id in enumerate(bus_data):
                load_info[bus_id] = {
                    "Load ID": load_data[0, i],
                    "Bus name": load_data[1, i],
                    "Bus name (ext)": load_data[2, i],
                }
            all_bus_ids[bus_subsystem_id] = load_info
        return all_bus_ids

    def get_time(self):
        """Return the simulator's current timestamp."""
        return self.time

    def get_total_seconds(self):
        """Return elapsed simulated seconds since the start time."""
        return (self.time - self._StartTime).total_seconds()

    def get_step_size_cec(self):
        """Return the simulation step resolution in seconds."""
        return self.settings.simulation.simulation_step_resolution.total_seconds()

    @converter
    def read_subsystems(self, quantities, subsystem_buses, ext_string2_info=None, mapping_dict=None):
        """Query current results for the requested quantities and buses.

        Base results come from the parent implementation; polled channel
        values are merged in, then dynamic-only load quantities are read from
        the PSSE 'CON' array.
        """
        # Mutable defaults created per call.
        if ext_string2_info is None:
            ext_string2_info = {}
        if mapping_dict is None:
            mapping_dict = {}
        results = super().read_subsystems(
            quantities, subsystem_buses, mapping_dict=mapping_dict, ext_string2_info=ext_string2_info
        )

        poll_results = self.poll_channels()
        results.update(poll_results)
        """ Add """
        for class_name, var_list in quantities.items():
            if class_name in dyn_only_options:
                for v in var_list:
                    if v in DYNAMIC_ONLY_PPTY[class_name]:
                        for func_name in dyn_only_options[class_name]:
                            if v in dyn_only_options[class_name][func_name]:
                                # Offset of this variable within the model's CONs.
                                con_ind = dyn_only_options[class_name][func_name][v]
                                for bus in subsystem_buses:
                                    if class_name == "Loads":
                                        ierr = self.psse.inilod(int(bus))

                                        # NOTE(review): only the first load on
                                        # each bus is read (single nxtlod call);
                                        # confirm multi-load buses are intended
                                        # to be skipped.
                                        ierr, ld_id = self.psse.nxtlod(int(bus))

                                        if ld_id is not None:
                                            ierr, con_index = getattr(self.psse, func_name)(
                                                int(bus), ld_id, "CHARAC", "CON"
                                            )

                                            if con_index is not None:
                                                act_con_index = con_index + con_ind
                                                ierr, value = self.psse.dsrval("CON", act_con_index)

                                                res_base = f"{class_name}_{v}"
                                                if res_base not in results:
                                                    results[res_base] = {}
                                                obj_name = f"{bus}_{ld_id}"
                                                results[res_base][obj_name] = value
            else:
                logger.warning("Extend function 'read_subsystems' in the Snap class (Snap.py)")

        return results

get_load_indices(bus_subsystems)

Returns load indices

Source code in pypsse\modes\snap.py
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def get_load_indices(self, bus_subsystems):
    """Return per-subsystem load metadata (ID and names) keyed by bus number."""
    result = {}
    for subsystem_id in bus_subsystems:
        _, char_data = self.psse.aloadchar(subsystem_id, 1, ["ID", "NAME", "EXNAME"])
        char_data = np.array(char_data)

        _, number_data = self.psse.aloadint(subsystem_id, 1, ["NUMBER"])

        result[subsystem_id] = {
            bus_number: {
                "Load ID": char_data[0, pos],
                "Bus name": char_data[1, pos],
                "Bus name (ext)": char_data[2, pos],
            }
            for pos, bus_number in enumerate(number_data[0])
        }
    return result

get_step_size_cec()

Returns simulation timestep resolution

Source code in pypsse\modes\snap.py
125
126
127
def get_step_size_cec(self):
    """Simulation step resolution converted to seconds."""
    step = self.settings.simulation.simulation_step_resolution
    return step.total_seconds()

get_time()

Returns current simulator time

Source code in pypsse\modes\snap.py
117
118
119
def get_time(self):
    """Return the simulator's current timestamp."""
    now = self.time
    return now

get_total_seconds()

Returns total simulation time

Source code in pypsse\modes\snap.py
121
122
123
def get_total_seconds(self):
    """Seconds of simulated time elapsed since the start time."""
    delta = self.time - self._StartTime
    return delta.total_seconds()

init(bus_subsystems)

Initializes the simulation

Source code in pypsse\modes\snap.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
def init(self, bus_subsystems):
    """Initialize from a saved case (sav) and snapshot (snp) file, configure
    solution parameters and channels, and start the run.

    Returns True once initialization completes.
    """
    super().init(bus_subsystems)

    # Number of solver sub-steps per step used in iterative co-simulation.
    self.iter_const = 100.0
    self.xTime = 0

    # NOTE(review): return codes from case() and rstr() are captured but
    # never checked; a failed restore is only caught later by strt_2.
    ierr = self.psse.case(str(self.settings.simulation.case_study))

    self.load_setup_files()
    self.convert_load()

    logger.info(f"Load snap file: {self.settings.simulation.snp_file}")
    ierr = self.psse.rstr(str(self.settings.simulation.snp_file))
    #

    # The following logic only runs when the helics interface is enabled
    self.disable_load_models_for_coupled_buses()
    self.disable_generation_for_coupled_buses()
    # self.save_model()
    ############# ------------------------------------- ###############
    # Prepend the configured filename prefix to the outx file name.
    outx_file  = str(self.settings.export.outx_file).split("\\")
    outx_file[-1] = self.export_settings.filename_prefix + "_" + outx_file[-1]
    outx_file = "\\".join(outx_file)

    ierr = self.psse.strt_2([0, 1],  outx_file)

    # On ierr == 1, retry once after converting generators (cong).
    if ierr == 1:
        self.psse.cong(0)
        ierr = self.psse.strt_2([0, 1],  outx_file)

    elif ierr > 1:
        msg = "Error starting simulation"
        raise Exception(msg)

    self.load_user_defined_models()

    if self.settings.helics and self.settings.helics.cosimulation_mode:
        if self.settings.helics.iterative_mode:
            # Sub-step the solver when iterating with HELICS.
            sim_step = self.settings.simulation.psse_solver_timestep.total_seconds() / self.iter_const
        else:
            sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()
    else:
        sim_step = self.settings.simulation.psse_solver_timestep.total_seconds()

    self.psse.dynamics_solution_param_2(
        [60, self._i, self._i, self._i, self._i, self._i, self._i, self._i],
        [0.4, self._f, sim_step, self._f, self._f, self._f, self._f, self._f],
    )

    self.psse.delete_all_plot_channels()

    self.setup_all_channels()

    logger.debug("pyPSSE initialization complete!")
    self.initialization_complete = True
    return self.initialization_complete

read_subsystems(quantities, subsystem_buses, ext_string2_info=None, mapping_dict=None)

Queries the result container for current results

Source code in pypsse\modes\snap.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
@converter
def read_subsystems(self, quantities, subsystem_buses, ext_string2_info=None, mapping_dict=None):
    """Query current results for the requested quantities and buses.

    Base results come from the parent implementation; polled channel values
    are merged in, then dynamic-only load quantities are read from the PSSE
    'CON' array.
    """
    # Mutable defaults created per call.
    if ext_string2_info is None:
        ext_string2_info = {}
    if mapping_dict is None:
        mapping_dict = {}
    results = super().read_subsystems(
        quantities, subsystem_buses, mapping_dict=mapping_dict, ext_string2_info=ext_string2_info
    )

    poll_results = self.poll_channels()
    results.update(poll_results)
    """ Add """
    for class_name, var_list in quantities.items():
        if class_name in dyn_only_options:
            for v in var_list:
                if v in DYNAMIC_ONLY_PPTY[class_name]:
                    for func_name in dyn_only_options[class_name]:
                        if v in dyn_only_options[class_name][func_name]:
                            # Offset of this variable within the model's CONs.
                            con_ind = dyn_only_options[class_name][func_name][v]
                            for bus in subsystem_buses:
                                if class_name == "Loads":
                                    ierr = self.psse.inilod(int(bus))

                                    # NOTE(review): only the first load on each
                                    # bus is read (single nxtlod call); confirm
                                    # multi-load buses are intended to be skipped.
                                    ierr, ld_id = self.psse.nxtlod(int(bus))

                                    if ld_id is not None:
                                        ierr, con_index = getattr(self.psse, func_name)(
                                            int(bus), ld_id, "CHARAC", "CON"
                                        )

                                        if con_index is not None:
                                            act_con_index = con_index + con_ind
                                            ierr, value = self.psse.dsrval("CON", act_con_index)

                                            res_base = f"{class_name}_{v}"
                                            if res_base not in results:
                                                results[res_base] = {}
                                            obj_name = f"{bus}_{ld_id}"
                                            results[res_base][obj_name] = value
        else:
            logger.warning("Extend function 'read_subsystems' in the Snap class (Snap.py)")

    return results

resolve_step(t)

Resolves the current time step

Source code in pypsse\modes\snap.py
92
93
94
95
def resolve_step(self, t):
    """Re-solves the current time step at a finer sub-step offset."""
    self.xTime = self.xTime + 1
    # each re-solve nudges the solver target forward by one sub-step
    sub_step_time = t + (self.xTime * self.incTime) / self.iter_const
    return self.psse.run(0, sub_step_time, 1, 1, 1)

step(t)

Increments the simulation

Source code in pypsse\modes\snap.py
86
87
88
89
90
def step(self, t):
    """Advances the simulation clock one step and runs the solver up to *t*."""
    self.time += self.incTime
    # reset the sub-step counter used by resolve_step
    self.xTime = 0
    return self.psse.run(0, t, 1, 1, 1)

Static

Bases: AbstractMode

Source code in pypsse\modes\static.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
class Static(AbstractMode):
    """Steady-state (power-flow) simulation mode controller."""

    def __init__(self, psse, dyntools, settings, export_settings, subsystem_buses, raw_data):
        """Class definition for steady-state simulation mode.

        Args:
            psse: PSSE simulator instance
            dyntools: PSSE dyntools module instance
            settings: simulation settings (SimulationSettings)
            export_settings: export settings (ExportFileOptions)
            subsystem_buses: mapping of bus subsystems to buses
            raw_data: model reader instance
        """
        super().__init__(psse, dyntools, settings, export_settings, subsystem_buses, raw_data)
        self.time = settings.simulation.start_time
        self._StartTime = settings.simulation.start_time
        self.incTime = settings.simulation.simulation_step_resolution

    def init(self, bussubsystems):
        """Initializes the steady-state simulation mode."""
        super().init(bussubsystems)
        self.initialization_complete = True

    def step(self, _):
        """Runs one power-flow solution and advances the simulation clock.

        Raises:
            Exception: if PSSE returns a non-zero error code from fnsl().
        """
        ierr = self.psse.fnsl()
        # check if powerflow completed successfully; advance time only then
        if ierr == 0:
            self.time = self.time + self.incTime
        else:
            msg = (
                f"Error code {ierr} returned from PSSE while running powerflow, "
                "please follow PSSE documentation to know more about the error"
            )
            raise Exception(msg)

    def resolve_step(self):
        """Re-solves the power flow for the current time step.

        Raises:
            Exception: if PSSE returns a positive error code from fnsl().
        """
        ierr = self.psse.fnsl()
        if ierr > 0:
            msg = (
                f"Error code {ierr} returned from PSSE while running powerflow, "
                "please follow PSSE documentation to know more about the error"
            )
            raise Exception(msg)

    def get_time(self):
        """Returns current simulator time."""
        return self.time

    def get_total_seconds(self):
        """Returns total elapsed simulation time in seconds."""
        return (self.time - self._StartTime).total_seconds()

    def get_step_size_cec(self):
        """Returns simulation timestep resolution in seconds."""
        return self.settings.simulation.simulation_step_resolution.total_seconds()

    def export(self):
        """Exports simulation results to an Excel file via dyntools CHNF."""
        logger.debug("Starting export process. Can take a few minutes for large files")
        # settings is a SimulationSettings object, not a dict; attribute access
        # is consistent with the log statement below and the rest of this class
        excelpath = os.path.join(self.export_path, self.settings.export.excel_file)
        achnf = self.dyntools.CHNF(self.outx_path)
        achnf.xlsout(channels="", show=False, xlsfile=excelpath, outfile="", sheet="Sheet1", overwritesheet=True)
        logger.debug(f"{self.settings.export.excel_file} exported")

__init__(psse, dyntools, settings, export_settings, subsystem_buses, raw_data)

Class definition for steady-state simulation mode

Source code in pypsse\modes\static.py
11
12
13
14
15
16
17
def __init__(self, psse, dyntools, settings, export_settings, subsystem_buses, raw_data):
    """Sets up the steady-state simulation mode controller."""

    super().__init__(psse, dyntools, settings, export_settings, subsystem_buses, raw_data)
    start_at = settings.simulation.start_time
    # current clock and the immutable start reference both begin at start_time
    self.time = start_at
    self._StartTime = start_at
    self.incTime = settings.simulation.simulation_step_resolution

export()

Exports simulation results

Source code in pypsse\modes\static.py
54
55
56
57
58
59
60
def export(self):
    """Exports simulation results to an Excel file via dyntools CHNF."""
    logger.debug("Starting export process. Can take a few minutes for large files")
    # settings is a SimulationSettings object, not a dict; attribute access is
    # consistent with the log statement below
    excelpath = os.path.join(self.export_path, self.settings.export.excel_file)
    achnf = self.dyntools.CHNF(self.outx_path)
    achnf.xlsout(channels="", show=False, xlsfile=excelpath, outfile="", sheet="Sheet1", overwritesheet=True)
    logger.debug(f"{self.settings.export.excel_file} exported")

get_step_size_cec()

Returns simulation timestep resolution

Source code in pypsse\modes\static.py
50
51
52
def get_step_size_cec(self):
    """Returns the simulation timestep resolution, in seconds."""
    resolution = self.settings.simulation.simulation_step_resolution
    return resolution.total_seconds()

get_time()

Returns current simulator time

Source code in pypsse\modes\static.py
42
43
44
def get_time(self):
    """Returns the simulator's current time."""
    current = self.time
    return current

get_total_seconds()

Returns total simulation time

Source code in pypsse\modes\static.py
46
47
48
def get_total_seconds(self):
    """Returns elapsed simulated time since the start, in seconds."""
    elapsed = self.time - self._StartTime
    return elapsed.total_seconds()

resolve_step()

Resolves the current time step

Source code in pypsse\modes\static.py
34
35
36
37
38
39
40
def resolve_step(self):
    """Re-solves the power flow for the current time step.

    Raises:
        Exception: if PSSE returns a positive error code from fnsl().
    """
    ierr = self.psse.fnsl()
    if ierr > 0:
        # message previously embedded a typo ("doumentation") and a long run of
        # spaces produced by an indented f-string line continuation
        msg = (
            f"Error code {ierr} returned from PSSE while running powerflow, "
            "please follow PSSE documentation to know more about the error"
        )
        raise Exception(msg)

step(_)

Increments the simulation

Source code in pypsse\modes\static.py
23
24
25
26
27
28
29
30
31
32
def step(self, _):
    """Runs one power-flow solution and advances the simulation clock.

    Raises:
        Exception: if PSSE returns a non-zero error code from fnsl().
    """
    ierr = self.psse.fnsl()
    # check if powerflow completed successfully; advance time only then
    if ierr == 0:
        self.time = self.time + self.incTime
    else:
        # message previously embedded a typo ("doumentation") and a long run of
        # spaces produced by an indented f-string line continuation
        msg = (
            f"Error code {ierr} returned from PSSE while running powerflow, "
            "please follow PSSE documentation to know more about the error"
        )
        raise Exception(msg)

sim_controller(psse, dyntools, settings, export_settings, subsystem_buses, raw_data)

sets up an appropriate simulation controller based on user input

Parameters:

Name Type Description Default
psse object

simulator instance

required
dyntools object

psse dyntools instance

required
settings SimulationSettings

simulation settings

required
export_settings ExportFileOptions

export settings

required
subsystem_buses dict

mapping of bus subsystems to buses

required
raw_data Reader

instance of model reader

required

Returns:

Type Description
Union[Dynamic, ProductionCostModel, Snap, Static]

Union[Dynamic, ProductionCostModel, Snap, Static]: simulator controller instance

Source code in pypsse\simulation_controller.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def sim_controller(
    psse: object,
    dyntools: object,
    settings: SimulationSettings,
    export_settings: ExportFileOptions,
    subsystem_buses: dict,
    raw_data: Reader,
) -> Union[Dynamic, ProductionCostModel, Snap, Static]:
    """Builds the simulation controller matching the configured simulation mode.

    Args:
        psse (object): simulator instance
        dyntools (object): psse dyntools instance
        settings (SimulationSettings): simulation settings
        export_settings (ExportFileOptions): export settings
        subsystem_buses (dict): mapping of bus subsystems to buses
        raw_data (Reader): instance of model reader

    Returns:
        Union[Dynamic, ProductionCostModel, Snap, Static]: simulator controller instance
    """

    # dispatch table: simulation-mode name -> controller class
    mode_map = {
        "Dynamic": Dynamic,
        "Steady-state": Static,
        "Snap": Snap,
        "ProductionCostModel": ProductionCostModel,
    }
    mode_name = settings.simulation.simulation_mode.value
    controller = mode_map[mode_name](psse, dyntools, settings, export_settings, subsystem_buses, raw_data)
    logger.debug(f"Simulator contoller of type {mode_name} created")
    return controller

Model parsers

GICParser

parser for the psse GIC file

Source code in pypsse\parsers\gic_parser.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
class GICParser:
    """parser for the psse GIC file"""

    # supported values of the GICFILEVRSN file-header field
    # (attribute name is misspelled but kept as-is: it is part of the class API)
    valid_verions = ["3"]

    def __init__(self, settings: SimulationSettings):
        """create GIC parser object

        Args:
            settings (SimulationSettings): simulation settings
        """
        logger.debug("Starting RAW parser")

        self.settings = settings
        self.filepath = str(settings.simulation.gic_file)

        # NOTE(review): handle stays open for the parse_* methods below and is
        # never closed; no encoding is given — confirm platform default is intended
        self.filehandle = open(self.filepath)
        verion = self.filehandle.readline()
        # the first line is expected to hold the version header, e.g. "GICFILEVRSN=3"
        if "GICFILEVRSN=" in verion:
            verion = verion.replace("GICFILEVRSN=", "").replace("\r", "").replace("\n", "")
            if verion in self.valid_verions:
                logger.debug(f"Reading GIC file verion {verion}")
            else:
                vers = ",".join(self.valid_verions)
                logger.debug(f"Version {verion} is not supported.\nFollowing version are currently supported: {vers}")
        else:
            logger.debug("GIC file structue does not seem to be valid")

        # sections are consumed sequentially from the shared file handle:
        # bus coordinates first, then the graph sections (see create_graph)
        self.get_bus_coordinates()
        self.psse_graph = nx.Graph()
        self.create_graph()
        # node positions; only used by the commented-out drawing code below
        pos = {}
        for node in self.psse_graph.nodes:
            pos[node] = [
                float(self.psse_graph.nodes[node]["latitude"]),
                float(self.psse_graph.nodes[node]["longitude"]),
            ]
        # NOTE(review): settings is annotated as SimulationSettings but indexed
        # like a dict here, unlike the attribute access above — confirm which
        # access style the settings object actually supports
        export_path = os.path.join(
            self.settings["Simulation"]["Project Path"],
            "Exports",
            self.settings["Export_settings"]["NetworkX graph file"],
        )
        # NOTE(review): nx.write_gpickle was removed in networkx 3.0 — verify
        # the pinned networkx version still provides it
        nx.write_gpickle(self.psse_graph, export_path)
        # nx.draw(self.psse_graph ,pos)
        # plt.show()

    def create_graph(self):
        """creates graph representation"""

        # order matters: each parse_* call reads its own section of the file
        self.parse_substation_data()
        self.parse_transformer_data()
        self.parse_branch_data()
        # attach the attributes collected by get_bus_coordinates to graph nodes
        nx.set_node_attributes(self.psse_graph, self.bus_data)

    def parse_substation_data(self):
        """parses substation data"""

        logger.debug("Parsing substation data...")
        linedata = ""
        while True:
            linedata = self.filehandle.readline()
            # section terminator line
            if "End of Bus Substation Data" in linedata:
                break
            if self.settings["GIC_export_settings"]["include substation connections"]:
                buses = linedata.replace("\r", "").replace("\n", "")
                buses = buses.split(" ")
                # add an edge only when both endpoint buses are already known
                if buses[0] in self.bus_data and buses[1] in self.bus_data:
                    self.psse_graph.add_edge(buses[0], buses[1])
                else:
                    logger.debug(
                        f"Error parsing substation data egde: {buses}.\nOne of the bus id does not exist in bus data"
                    )

    def parse_transformer_data(self):
        """parses transformer data"""

        logger.debug("Parsing transformer data...")
        linedata = ""
        while True:
            linedata = self.filehandle.readline()
            if "End of Transformer Data" in linedata:
                break

            if self.settings["GIC_export_settings"]["include transfomer connections"]:
                buses = linedata.replace("\r", "").replace("\n", "")
                # first three tokens: winding 1/2/3 bus numbers (3rd may be empty)
                buses = buses.split(" ")[:3]
                if buses[2] == "":
                    # two-winding record: single edge between windings 1 and 2
                    if buses[0] in self.bus_data and buses[1] in self.bus_data:
                        self.psse_graph.add_edge(buses[0], buses[1])
                    else:
                        logger.debug(
                            f"Error parsing transformer data egde: {buses}."
                            f"\nOne of the bus id does not exist in bus data"
                        )
                else:
                    # three-winding record: connect each valid winding pair
                    if buses[0] in self.bus_data and buses[1] in self.bus_data:
                        self.psse_graph.add_edge(buses[0], buses[1])
                    if buses[2] in self.bus_data and buses[1] in self.bus_data:
                        self.psse_graph.add_edge(buses[1], buses[2])
                    if buses[2] in self.bus_data and buses[0] in self.bus_data:
                        self.psse_graph.add_edge(buses[2], buses[0])
                    pass

    def parse_branch_data(self):
        """parses branch data"""

        logger.debug("Parsing branch data...")
        linedata = ""
        while True:
            linedata = self.filehandle.readline()
            if "End of Branch Data" in linedata:
                break
            if self.settings["GIC_export_settings"]["include branch connections"]:
                buses = linedata.replace("\r", "").replace("\n", "")
                # first two tokens are the from/to bus numbers
                buses = buses.split(" ")[:2]
                if buses[0] in self.bus_data and buses[1] in self.bus_data:
                    self.psse_graph.add_edge(buses[0], buses[1])
                else:
                    logger.debug(
                        f"Error parsing branch data egde: {buses}.\nOne of the bus id does not exist in bus data"
                    )

    def get_bus_coordinates(self):
        """parses bus coordinates"""

        logger.debug("Parsing bus coordinates...")
        # positional field labels after the bus id ('?' marks uncertain meaning)
        bus_data_headers = ["subsystem/bustype?", "latitude", "longitude", "angle?"]
        self.bus_data = {}
        linedata = ""
        start = "'"
        end = "'"
        while True:
            linedata = self.filehandle.readline()
            if "End of Substation data" in linedata:
                break

            # bus name is the single-quoted substring of the record
            bus_name = linedata[linedata.find(start) + len(start) : linedata.rfind(end)]
            data = linedata.replace(f" {start}{bus_name}{end}", "")
            # collapse runs of double spaces (two passes only; NOTE(review):
            # runs of 5+ spaces may not fully collapse — confirm input formatting)
            data = data.replace("  ", " ")
            data = data.replace("  ", " ")
            data = data.split(" ")
            bus_id = data[0]

            if bus_id not in self.bus_data:
                self.bus_data[bus_id] = {}

            self.bus_data[bus_id]["bus_name"] = bus_name
            for val, label in zip(data[1:], bus_data_headers):
                self.bus_data[bus_id][label] = val

        # persist the coordinate table as CSV alongside the other exports
        bus_data = pd.DataFrame(self.bus_data).T
        export_path = os.path.join(
            self.settings["Simulation"]["Project Path"], "Exports", self.settings["Export_settings"]["Coordinate file"]
        )
        bus_data.to_csv(export_path)
        logger.debug(f"Bus coordinate file exported to: {export_path}")

__init__(settings)

create GIC parser object

Parameters:

Name Type Description Default
settings SimulationSettings

simulation settings

required
Source code in pypsse\parsers\gic_parser.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def __init__(self, settings: SimulationSettings):
    """create GIC parser object

    Args:
        settings (SimulationSettings): simulation settings
    """
    logger.debug("Starting RAW parser")

    self.settings = settings
    self.filepath = str(settings.simulation.gic_file)

    # NOTE(review): handle stays open for the parse_* methods and is never
    # closed; no encoding is given — confirm platform default is intended
    self.filehandle = open(self.filepath)
    verion = self.filehandle.readline()
    # the first line is expected to hold the version header, e.g. "GICFILEVRSN=3"
    if "GICFILEVRSN=" in verion:
        verion = verion.replace("GICFILEVRSN=", "").replace("\r", "").replace("\n", "")
        if verion in self.valid_verions:
            logger.debug(f"Reading GIC file verion {verion}")
        else:
            vers = ",".join(self.valid_verions)
            logger.debug(f"Version {verion} is not supported.\nFollowing version are currently supported: {vers}")
    else:
        logger.debug("GIC file structue does not seem to be valid")

    # sections are consumed sequentially from the shared file handle:
    # bus coordinates first, then the graph sections (see create_graph)
    self.get_bus_coordinates()
    self.psse_graph = nx.Graph()
    self.create_graph()
    # node positions; computed but not otherwise used in this method
    pos = {}
    for node in self.psse_graph.nodes:
        pos[node] = [
            float(self.psse_graph.nodes[node]["latitude"]),
            float(self.psse_graph.nodes[node]["longitude"]),
        ]
    # NOTE(review): settings is annotated as SimulationSettings but indexed
    # like a dict here, unlike the attribute access above — confirm which
    # access style the settings object actually supports
    export_path = os.path.join(
        self.settings["Simulation"]["Project Path"],
        "Exports",
        self.settings["Export_settings"]["NetworkX graph file"],
    )
    # NOTE(review): nx.write_gpickle was removed in networkx 3.0 — verify
    # the pinned networkx version still provides it
    nx.write_gpickle(self.psse_graph, export_path)

create_graph()

creates graph representation

Source code in pypsse\parsers\gic_parser.py
56
57
58
59
60
61
62
def create_graph(self):
    """creates graph representation"""

    # order matters: each parse_* call reads its own section of the GIC file
    # from the shared file handle, one after another
    self.parse_substation_data()
    self.parse_transformer_data()
    self.parse_branch_data()
    # attach attributes collected by get_bus_coordinates to the graph nodes
    nx.set_node_attributes(self.psse_graph, self.bus_data)

get_bus_coordinates()

parses bus coordinates

Source code in pypsse\parsers\gic_parser.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def get_bus_coordinates(self):
    """parses bus coordinates"""

    logger.debug("Parsing bus coordinates...")
    # positional field labels after the bus id ('?' marks uncertain meaning)
    bus_data_headers = ["subsystem/bustype?", "latitude", "longitude", "angle?"]
    self.bus_data = {}
    linedata = ""
    start = "'"
    end = "'"
    while True:
        linedata = self.filehandle.readline()
        # section terminator line
        if "End of Substation data" in linedata:
            break

        # bus name is the single-quoted substring of the record
        bus_name = linedata[linedata.find(start) + len(start) : linedata.rfind(end)]
        data = linedata.replace(f" {start}{bus_name}{end}", "")
        # collapse runs of double spaces (two passes only; NOTE(review): runs
        # of 5+ spaces may not fully collapse — confirm input formatting)
        data = data.replace("  ", " ")
        data = data.replace("  ", " ")
        data = data.split(" ")
        bus_id = data[0]

        if bus_id not in self.bus_data:
            self.bus_data[bus_id] = {}

        self.bus_data[bus_id]["bus_name"] = bus_name
        for val, label in zip(data[1:], bus_data_headers):
            self.bus_data[bus_id][label] = val

    # persist the coordinate table as CSV alongside the other exports
    bus_data = pd.DataFrame(self.bus_data).T
    export_path = os.path.join(
        self.settings["Simulation"]["Project Path"], "Exports", self.settings["Export_settings"]["Coordinate file"]
    )
    bus_data.to_csv(export_path)
    logger.debug(f"Bus coordinate file exported to: {export_path}")

parse_branch_data()

parses branch data

Source code in pypsse\parsers\gic_parser.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def parse_branch_data(self):
    """Reads branch records from the GIC file and adds them as graph edges."""

    logger.debug("Parsing branch data...")
    include = self.settings["GIC_export_settings"]["include branch connections"]
    while True:
        record = self.filehandle.readline()
        if "End of Branch Data" in record:
            break
        if not include:
            continue
        # first two whitespace-separated tokens are the from/to bus numbers
        buses = record.replace("\r", "").replace("\n", "").split(" ")[:2]
        if buses[0] in self.bus_data and buses[1] in self.bus_data:
            self.psse_graph.add_edge(buses[0], buses[1])
        else:
            logger.debug(
                f"Error parsing branch data egde: {buses}.\nOne of the bus id does not exist in bus data"
            )

parse_substation_data()

parses substation data

Source code in pypsse\parsers\gic_parser.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
def parse_substation_data(self):
    """Reads substation records from the GIC file and adds bus-to-bus edges."""

    logger.debug("Parsing substation data...")
    include = self.settings["GIC_export_settings"]["include substation connections"]
    while True:
        record = self.filehandle.readline()
        if "End of Bus Substation Data" in record:
            break
        if not include:
            continue
        buses = record.replace("\r", "").replace("\n", "").split(" ")
        # connect the pair only when both endpoint buses are already known
        if buses[0] in self.bus_data and buses[1] in self.bus_data:
            self.psse_graph.add_edge(buses[0], buses[1])
        else:
            logger.debug(
                f"Error parsing substation data egde: {buses}.\nOne of the bus id does not exist in bus data"
            )

parse_transformer_data()

parses transformer data

Source code in pypsse\parsers\gic_parser.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def parse_transformer_data(self):
    """Reads transformer records and connects their winding buses in the graph."""

    logger.debug("Parsing transformer data...")
    include = self.settings["GIC_export_settings"]["include transfomer connections"]
    while True:
        record = self.filehandle.readline()
        if "End of Transformer Data" in record:
            break

        if not include:
            continue
        # first three tokens: winding 1/2/3 bus numbers (3rd may be empty)
        buses = record.replace("\r", "").replace("\n", "").split(" ")[:3]
        known = self.bus_data
        if buses[2] == "":
            # two-winding record: single edge between windings 1 and 2
            if buses[0] in known and buses[1] in known:
                self.psse_graph.add_edge(buses[0], buses[1])
            else:
                logger.debug(
                    f"Error parsing transformer data egde: {buses}."
                    f"\nOne of the bus id does not exist in bus data"
                )
        else:
            # three-winding record: connect each valid winding pair
            if buses[0] in known and buses[1] in known:
                self.psse_graph.add_edge(buses[0], buses[1])
            if buses[2] in known and buses[1] in known:
                self.psse_graph.add_edge(buses[1], buses[2])
            if buses[2] in known and buses[0] in known:
                self.psse_graph.add_edge(buses[2], buses[0])

Reader

Parser for indexing all PSSE model assets

Source code in pypsse\parsers\reader.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
class Reader:
    "Parser for indexing all PSSE model assets"

    def __init__(self, psse_instance: object):
        """creates pypsse model reader

        Indexes all asset types (buses, loads, machines, branches, transformers,
        DC lines, shunts, areas, zones, owners) via the PSSE subsystem data APIs.

        Args:
            psse_instance (object): simulator instance
        """
        self.psse = psse_instance
        self.buses = self.get_data("abus", tails=["int"], strings=["NUMBER"], flags=[2])
        # note: the original issued this aload query twice; once is sufficient
        self.loads = self.get_data("aload", tails=["int", "char"], strings=["NUMBER", "ID"], flags=[4, 4])
        self.fixed_stunts = self.get_data("afxshunt", tails=["int", "char"], strings=["NUMBER", "ID"], flags=[4, 4])
        self.generators = self.get_data("amach", tails=["int", "char"], strings=["NUMBER", "ID"], flags=[4, 4])
        self.branches = self.get_data(
            "abrn", tails=["int", "int", "char"], strings=["FROMNUMBER", "TONUMBER", "ID"], flags=[2, 2, 2]
        )
        self.transformers = self.get_data(
            "atr3", tails=["int", "int", "int"], strings=["WIND1NUMBER", "WIND2NUMBER", "WIND3NUMBER"], flags=[2, 2, 2]
        )
        self.area = self.get_data(
            "aarea", tails=["int", "char"], strings=["NUMBER", "AREANAME"], flags=[2, 2]
        )  # Talk to Aadil
        self.dc_branch = self.get_data(
            "a2trmdc", tails=["int", "int"], strings=["FROMNUMBER", "TONUMBER"], flags=[2, 2]
        )  # three terminal dc lines not implemented
        self.multi_term_dc = self.get_data(
            "amultitrmdc", tails=["int", "int"], strings=["VCNPOSNUMBER", "VCNNEGNUMBER"], flags=[2, 2]
        )
        self.switched_shunt = self.get_data(
            "aswsh", tails=["int", "char"], strings=["NUMBER", "DEVICENAME"], flags=[4, 4]
        )
        self.zones = self.get_data("azone", tails=["int", "char"], strings=["NUMBER", "ZONENAME"], flags=[2, 2])
        self.owners = self.get_data("aowner", tails=["int", "char"], strings=["NUMBER", "OWNERNAME"], flags=[2, 2])

    def get_data(self, func_name: str, tails: list = [], strings: list = [], flags: List[int] = []) -> list:
        """returns list of assets matching signature

        Args:
            func_name (str): base name of a PSSE subsystem retrieval API (e.g. "abus")
            tails (list, optional): API name suffixes selecting the return dtype. Defaults to [].
            strings (list, optional): quantity names requested from the API. Defaults to [].
            flags (List[int], optional): list of flags for filtering. Defaults to [].

        Returns:
            list: list of asset names (tuples when more than one quantity is requested)
        """
        # the mutable defaults are never modified here, so the shared-default
        # pitfall does not apply
        array_list = []
        count = 0
        for tail, string, flag in zip(tails, strings, flags):
            func = getattr(self.psse, func_name.lower() + tail)
            ierr, array_1 = func(sid=-1, flag=flag, string=string)
            assert ierr == 0, f"Error code {ierr}, while running function '{func_name.lower() + tail}'"
            # each API call returns a list of arrays; flatten to a single list
            array_list.append([x for array in array_1 for x in array])
            count = len(array_1)

        # previously logged len(array_1) directly, which raised NameError when
        # called with empty tails/strings/flags
        logger.info(f"{func_name} count - {count}")
        if len(array_list) == 1:
            return array_list[0]

        return list(zip(*array_list))

    def __str__(self) -> str:
        """overrides default 'print' behavior

        Returns:
            str: summary of model assets
        """
        str_name = "Model asset summary:\n"
        for model in MAPPED_CLASS_NAMES:
            str_name += f"   {model}-{len(getattr(self, model))}"
        return str_name

__init__(psse_instance)

creates pypsse model reader

Parameters:

Name Type Description Default
psse_instance object

simulator instance

required
Source code in pypsse\parsers\reader.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(self, psse_instance: object):
    """creates pypsse model reader

    Indexes all asset types (buses, loads, machines, branches, transformers,
    DC lines, shunts, areas, zones, owners) via the PSSE subsystem data APIs.

    Args:
        psse_instance (object): simulator instance
    """
    self.psse = psse_instance
    self.buses = self.get_data("abus", tails=["int"], strings=["NUMBER"], flags=[2])
    # note: the original issued this aload query twice; once is sufficient
    self.loads = self.get_data("aload", tails=["int", "char"], strings=["NUMBER", "ID"], flags=[4, 4])
    self.fixed_stunts = self.get_data("afxshunt", tails=["int", "char"], strings=["NUMBER", "ID"], flags=[4, 4])
    self.generators = self.get_data("amach", tails=["int", "char"], strings=["NUMBER", "ID"], flags=[4, 4])
    self.branches = self.get_data(
        "abrn", tails=["int", "int", "char"], strings=["FROMNUMBER", "TONUMBER", "ID"], flags=[2, 2, 2]
    )
    self.transformers = self.get_data(
        "atr3", tails=["int", "int", "int"], strings=["WIND1NUMBER", "WIND2NUMBER", "WIND3NUMBER"], flags=[2, 2, 2]
    )
    self.area = self.get_data(
        "aarea", tails=["int", "char"], strings=["NUMBER", "AREANAME"], flags=[2, 2]
    )  # Talk to Aadil
    self.dc_branch = self.get_data(
        "a2trmdc", tails=["int", "int"], strings=["FROMNUMBER", "TONUMBER"], flags=[2, 2]
    )  # three terminal dc lines not implemented
    self.multi_term_dc = self.get_data(
        "amultitrmdc", tails=["int", "int"], strings=["VCNPOSNUMBER", "VCNNEGNUMBER"], flags=[2, 2]
    )
    self.switched_shunt = self.get_data(
        "aswsh", tails=["int", "char"], strings=["NUMBER", "DEVICENAME"], flags=[4, 4]
    )
    self.zones = self.get_data("azone", tails=["int", "char"], strings=["NUMBER", "ZONENAME"], flags=[2, 2])
    self.owners = self.get_data("aowner", tails=["int", "char"], strings=["NUMBER", "OWNERNAME"], flags=[2, 2])

__str__()

overrides default 'print' behavior

Returns:

Name Type Description
str str

summary of model assets

Source code in pypsse\parsers\reader.py
70
71
72
73
74
75
76
77
78
79
def __str__(self) -> str:
    """Builds a human-readable summary of parsed model asset counts."""
    summary = "Model asset summary:\n"
    for model in MAPPED_CLASS_NAMES:
        count = len(getattr(self, model))
        summary = f"{summary}   {model}-{count}"
    return summary

get_data(func_name, tails=[], strings=[], flags=[])

returns list of assets matching signature

Parameters:

Name Type Description Default
func_name str

description

required
tails list

method tail. Defaults to [].

[]
strings list

data types. Defaults to [].

[]
flags List[int]

list of flags for filtering. Defaults to [].

[]

Returns:

Name Type Description
list list

list of asset names

Source code in pypsse\parsers\reader.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
def get_data(self, func_name: str, tails: list = [], strings: list = [], flags: List[int] = []) -> list:
    """returns list of assets matching signature

    Args:
        func_name (str): base name of a PSSE subsystem retrieval API (e.g. "abus")
        tails (list, optional): API name suffixes selecting the return dtype. Defaults to [].
        strings (list, optional): quantity names requested from the API. Defaults to [].
        flags (List[int], optional): list of flags for filtering. Defaults to [].

    Returns:
        list: list of asset names (tuples when more than one quantity is requested)
    """
    # the mutable defaults are never modified here, so the shared-default
    # pitfall does not apply
    array_list = []
    count = 0
    for tail, string, flag in zip(tails, strings, flags):
        func = getattr(self.psse, func_name.lower() + tail)
        ierr, array_1 = func(sid=-1, flag=flag, string=string)
        assert ierr == 0, f"Error code {ierr}, while running function '{func_name.lower() + tail}'"
        # each API call returns a list of arrays; flatten to a single list
        array_list.append([x for array in array_1 for x in array])
        count = len(array_1)

    # previously logged len(array_1) directly, which raised NameError when
    # called with empty tails/strings/flags
    logger.info(f"{func_name} count - {count}")
    if len(array_list) == 1:
        return array_list[0]

    return list(zip(*array_list))

Command line interface

CLI to run a PyPSSE project

create_profiles(project_path, csv_file_path, profile_folder, profile_name, profile_type, start_time, profile_res, profile_info)

Creates profiles for PyPSSE project.

Source code in pypsse\cli\create_profiles.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
@click.argument(
    "project-path",
)
@click.option(
    "-f",
    "--csv-file-path",
    help="Path to a valid csv file",
    required=False,
    default="",
)
@click.option(
    "-p",
    "--profile-folder",
    help="""Path to folder containing csv profiles.
    CSV file names should follow the following format: {profile-type}__{profile-name}""",
    required=False,
    default="",
)
@click.option(
    "-n",
    "--profile-name",
    required=False,
    default=DEFAULT_PROFILE_NAME,
    show_default=True,
    help="Profile name",
)
@click.option(
    "-t",
    "--profile-type",
    required=False,
    default=DEFAULT_PROFILE_TYPE,
    show_default=True,
    help=f"Profile type; Possible values: {list(PROFILE_VALIDATION.keys())}",
)
@click.option(
    "-T",
    "--start-time",
    required=False,
    default=DEFAULT_START_TIME,
    show_default=True,
    help="Time index for the first time step, format = Y-m-d H:M:S.f",
)
@click.option(
    "-r",
    "--profile-res",
    required=False,
    default=DEFAULT_PROFILE_RESOLUTION,
    show_default=True,
    help="Profile time resolution in seconds",
)
@click.option(
    "-i",
    "--profile-info",
    required=False,
    default="",
    show_default=True,
    help="Additional profile information",
)
@click.command()
def create_profiles(
    project_path, csv_file_path, profile_folder, profile_name, profile_type, start_time, profile_res, profile_info
):
    """Creates profiles for PyPSSE project.

    Either -f (a single csv file) or -p (a folder of csv files named
    ``{profile-type}__{profile-name}.csv``) must be supplied. Raises
    Exception when the project path is invalid or neither input is given.
    """
    settings_file = os.path.join(project_path, "Settings", SIMULATION_SETTINGS_FILENAME)
    if not os.path.exists(settings_file):
        msg = f"{project_path} is not a valid pypsse project"
        raise Exception(msg)
    # Timestamp parsing is identical for both input modes; do it once.
    parsed_start = dt.datetime.strptime(start_time, "%Y-%m-%d %H:%M:%S.%f").astimezone(None)
    if csv_file_path and os.path.exists(csv_file_path):
        settings = toml.load(settings_file)
        manager = ProfileManager(None, settings)
        manager.add_profiles_from_csv(
            csv_file=csv_file_path,
            name=profile_name,
            pType=profile_type,
            startTime=parsed_start,
            resolution_sec=profile_res,
            info=profile_info,
        )
        logger.info(f"Profile '{profile_name}' added to group '{profile_type}'")
    elif os.path.exists(profile_folder):
        settings = toml.load(settings_file)
        manager = ProfileManager(None, settings)
        for root, _, files in os.walk(profile_folder):
            for file in files:
                if not file.endswith(".csv"):
                    continue
                filename = file.replace(".csv", "")
                if "__" not in filename:
                    continue
                # File naming convention: {profile-type}__{profile-name}.csv
                dtype, p_name = filename.split("__")
                manager.add_profiles_from_csv(
                    # Join against the walked directory so csv files found in
                    # sub-folders resolve correctly (the original always joined
                    # against the top-level folder).
                    csv_file=os.path.join(root, file),
                    name=p_name,
                    pType=dtype,
                    startTime=parsed_start,
                    resolution_sec=profile_res,
                    info=profile_info,
                )
                # original message had a stray doubled quote after the name
                msg = f"Profile '{p_name}' added to group '{dtype}'"
                logger.info(msg)
    else:
        msg = "Value for either -f or -p flag has to be passed"
        raise Exception(msg)

CLI to create a new PyPSSE project

create_project(path=None, project=None, psse_project_folder=None, simulation_file=None, export_settings_file=None, profile_store=None, profile_mapping=None, overwrite=None, autofill=None)

Create a new PyPSSE project.

Source code in pypsse\cli\create_project.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
@click.argument(
    "path",
)
@click.option(
    "-p",
    "--project",
    required=True,
    help="project name",
)
@click.option(
    "-F",
    "--psse-project-folder",
    default=None,
    required=False,
    type=click.Path(exists=True),
    help="PSS/E project folder path",
)
@click.option(
    "-f",
    "--simulation-file",
    required=False,
    show_default=True,
    default="",
    help="Simulation settings toml file path",
)
@click.option(
    "-e",
    "--export-settings-file",
    default="",
    help="Export settings toml file path",
)
@click.option(
    "-s",
    "--profile-store",
    default="",
    help="Path to a valid Profiles.hdf5 file (Contains profiles for time series simulations)",
)
@click.option(
    "-m",
    "--profile-mapping",
    default="",
    help="Path to a valid Profile_mapping.toml file (used to map profile to PSSE elements)",
)
# NOTE(review): with is_flag=True and default=True, passing -a/-o cannot turn
# these off from the command line; a --autofill/--no-autofill pair would be
# needed to make them togglable. Left unchanged to preserve the CLI contract.
@click.option(
    "-a",
    "--autofill",
    help="Attempt to auto fill settings. (Verify manually settings file is correct)",
    is_flag=True,
    default=True,
    show_default=True,
)
@click.option(
    "-o",
    "--overwrite",
    help="Overwrite project if it already exists",
    is_flag=True,
    default=True,
    show_default=True,
)
@click.command()
def create_project(
    path=None,
    project=None,
    psse_project_folder=None,
    simulation_file=None,
    export_settings_file=None,
    profile_store=None,
    profile_mapping=None,
    overwrite=None,
    autofill=None,
):
    """Create a new PyPSSE project."""
    # NOTE(review): silently does nothing when `path` does not exist — an
    # explicit error would be friendlier; confirm before changing behavior.
    if os.path.exists(path):
        # Empty string -> use package defaults instead of loading a toml file.
        s_settings = toml.load(simulation_file) if simulation_file else {}
        e_settings = toml.load(export_settings_file) if export_settings_file else {}
        # TODO: Validate settings
        a = Project()
        a.create(
            path,
            project,
            psse_project_folder,
            s_settings,
            e_settings,
            profile_store,
            profile_mapping,
            overwrite,
            autofill,
        )

CLI to run a PyPSSE project

run(project_path, simulations_file=None)

Runs a valid PyPSSE simulation.

Source code in pypsse\cli\run.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@click.argument(
    "project-path",
)
@click.option(
    "-s",
    "--simulations-file",
    required=False,
    default=SIMULATION_SETTINGS_FILENAME,
    show_default=True,
    help="scenario toml file to run (overrides default)",
)
@click.command()
def run(project_path, simulations_file=None):
    """Runs a valid PyPSSE simulation."""
    file_path = Path(project_path) / simulations_file
    # The original built this message as two separate statements, so the
    # second string literal was a dead expression and the user only ever saw
    # the first half. Parenthesized concatenation restores the full message.
    msg = (
        "Simulation file not found. Use -s to choose a valid settings file "
        "if its name differs from the default file name."
    )
    assert file_path.exists(), msg
    x = Simulator.from_setting_files(file_path)
    x.run()

CLI to run the PyPSSE server

serve(host_ip='127.0.0.1', port=9090)

Run a PyPSSE RESTful API server.

Source code in pypsse\cli\serve.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@click.option(
    "-p",
    "--port",
    default=9090,
    show_default=True,
    help="Socket port for the server",
)
@click.option(
    "-h",
    "--host-ip",
    default="127.0.0.1",
    show_default=True,
    help="IP for the server",
)
@click.command()
# Thin CLI wrapper: all server setup happens inside run_server.
# The docstring below doubles as the click help text.
def serve(host_ip="127.0.0.1", port=9090):
    """Run a PyPSSE RESTful API server."""
    run_server(host_ip, port)

Data interfaces

DataWriter

Data writer class definition

Source code in pypsse\data_writers\data_writer.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class DataWriter:
    "Data writer class definition"
    # Maps the user-supplied format name to a concrete writer backend.
    modes = {
        "h5": HDF5Writer,
        "csv": CSVWriter,
        "json": JSONWriter,
        "none": DummyWriter,
    }

    def __init__(self, log_dir, formatnm: str, column_length: int, filename_prefix: str):
        "Sets up a data writer as per user input"
        # `formatnm` must be one of the `modes` keys; any other value raises KeyError.
        self.writer = self.modes[formatnm](log_dir, column_length, filename_prefix)

    def write(self, currenttime, powerflow_output, convergence):
        "Enables incremental write to the data writer object"
        # NOTE(review): the CSVWriter.write shown in this package accepts only
        # (currenttime, powerflow_output); the csv backend would fail on this
        # three-argument call — confirm the backends' signatures agree.
        self.writer.write(currenttime, powerflow_output, convergence)

    def close_store(self):
        # Intentional no-op: backends flush on every write. NOTE(review): the
        # json backend keeps file handles open and they are never closed here
        # — confirm this is intended.
        pass

__init__(log_dir, formatnm, column_length, filename_prefix)

Sets up a data writer as per user input

Source code in pypsse\data_writers\data_writer.py
23
24
25
def __init__(self, log_dir, formatnm: str, column_length: int, filename_prefix: str):
    "Sets up a data writer as per user input"
    # `formatnm` selects the backend from `modes` (h5/csv/json/none);
    # an unknown format name raises KeyError.
    self.writer = self.modes[formatnm](log_dir, column_length, filename_prefix)

write(currenttime, powerflow_output, convergence)

Enables incremental write to the data writer object

Source code in pypsse\data_writers\data_writer.py
27
28
29
def write(self, currenttime, powerflow_output, convergence):
    "Enables incremental write to the data writer object"
    # Delegates straight to the selected backend writer.
    self.writer.write(currenttime, powerflow_output, convergence)

CSVWriter

Class that handles writing simulation results to csv files.

Source code in pypsse\data_writers\csv.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class CSVWriter:
    """Class that handles writing simulation results to csv
    files, one file per object type, appending one chunk per flush.
    """

    def __init__(self, log_dir: Path, column_length: int, filename_prefix: str = ""):
        """Constructor for csv writer

        Args:
            log_dir (Path): output path (directory)
            column_length (int): number of data columns
            filename_prefix (str): prefix prepended to every output file name
        """
        self.filename_prefix = filename_prefix
        self.column_length = column_length
        self.log_dir = log_dir
        self.timestamps = []  # timestamps of every step written so far
        self.chunkRows = 1    # rows buffered before each flush to disk
        self.dfs = {}         # per-object-type row buffers
        self.step = 0         # number of completed write() calls

    def write(self, currenttime: datetime, powerflow_output: dict, _: bool = None):
        """Writes the status of assets at a particular timestep to a csv file.

        Args:
            currenttime (datetime): simulator time step
            powerflow_output (dict): simulation results
            _ (bool): unused convergence flag, accepted so the common
                DataWriter.write(currenttime, powerflow_output, convergence)
                call works for the csv backend too
        """
        # Iterate through each object type
        self.timestamps.append(currenttime)
        for obj_type in powerflow_output:
            data = powerflow_output[obj_type]
            # Buffer is reset to None after each flush; (re)start it then.
            if self.dfs.get(obj_type) is None:
                self.dfs[obj_type] = [data]
            else:
                self.dfs[obj_type].append(data)

            if self.step % self.chunkRows == self.chunkRows - 1:
                fpath = os.path.join(self.log_dir, f"{self.filename_prefix}_{obj_type}.csv")
                rows = self.dfs[obj_type]
                # Index with only the timestamps belonging to the buffered
                # rows; the original passed the full timestamp history, which
                # no longer matches the buffer length after the first flush.
                pd.DataFrame(rows, index=self.timestamps[-len(rows):]).to_csv(fpath, mode="a")
                self.dfs[obj_type] = None
            # (removed: assignment to undefined self.Timestamp, which raised
            # AttributeError on the very first write)
        self.step += 1

    def close_store(self):
        """No-op: every chunk is flushed to disk inside write()."""

__init__(log_dir, column_length, filename_prefix='')

Constructor for csv writer

Parameters:

Name Type Description Default
log_dir Path

output path (directory)

required
column_length int

number of data columns

required
Source code in pypsse\data_writers\csv.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def __init__(self, log_dir: Path, column_length: int, filename_prefix:str=""):
    """Constructor for csv writer

    Args:
        log_dir (Path): output path (directory)
        column_length (int): number of data columns
        filename_prefix (str): prefix prepended to every output csv file name
    """
    self.filename_prefix=filename_prefix
    self.column_length = column_length
    self.log_dir = log_dir
    self.timestamps = []  # timestamps of every step written so far
    self.chunkRows = 1    # rows buffered before each flush to disk
    self.dfs = {}         # per-object-type row buffers
    self.step = 0         # number of completed write() calls

write(currenttime, powerflow_output)

Writes the status of assets at a particular timestep to a csv file.

Parameters:

Name Type Description Default
currenttime datetime

simulator time step

required
powerflow_output dict

simulation results

required
Source code in pypsse\data_writers\csv.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
def write(self, currenttime: datetime, powerflow_output: dict, _: bool = None):
    """Writes the status of assets at a particular timestep to a csv file.

    Args:
        currenttime (datetime): simulator time step
        powerflow_output (dict): simulation results
        _ (bool): unused convergence flag, accepted so the common
            DataWriter.write(currenttime, powerflow_output, convergence)
            call works for the csv backend too
    """
    # Iterate through each object type
    self.timestamps.append(currenttime)
    for obj_type in powerflow_output:
        data = powerflow_output[obj_type]
        # Buffer is reset to None after each flush; (re)start it then.
        if self.dfs.get(obj_type) is None:
            self.dfs[obj_type] = [data]
        else:
            self.dfs[obj_type].append(data)

        if self.step % self.chunkRows == self.chunkRows - 1:
            fpath = os.path.join(self.log_dir, f"{self.filename_prefix}_{obj_type}.csv")
            rows = self.dfs[obj_type]
            # Index with only the timestamps belonging to the buffered rows;
            # the original passed the full timestamp history, which no longer
            # matches the buffer length after the first flush.
            pd.DataFrame(rows, index=self.timestamps[-len(rows):]).to_csv(fpath, mode="a")
            self.dfs[obj_type] = None
        # (removed: assignment to undefined self.Timestamp, which raised
        # AttributeError on the very first write)
    self.step += 1

JSONWriter

Class that handles writing simulation results to json files.

Source code in pypsse\data_writers\json.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class JSONWriter:
    """Class that handles writing simulation results to json
    files, one file per object type, appending one chunk per flush.
    """

    def __init__(self, log_dir: Path, column_length: int, filename_prefix: str = ""):
        """Constructor for json writer

        Args:
            log_dir (Path): output path (directory)
            column_length (int): number of data columns
            filename_prefix (str): prefix prepended to every output file name
        """
        self.filename_prefix = filename_prefix
        self.column_length = column_length
        self.log_dir = log_dir
        self.chunk_rows = 1  # rows buffered before each flush to disk
        self.handles = {}    # per-object-type open file handles
        self.dfs = {}        # per-object-type buffered {timestamp: data}
        self.step = 0        # number of completed write() calls

    def write(self, currenttime: datetime, powerflow_output: dict, _: bool = None):
        """Writes the status of assets at a particular timestep to a json file.

        Args:
            currenttime (datetime): simulator time step
            powerflow_output (dict): simulation results
            _ (bool): unused convergence flag (writer-interface compatibility)
        """
        # Iterate through each object type
        for obj_type in powerflow_output:
            fpath = os.path.join(self.log_dir, f"{self.filename_prefix}_{obj_type}.json")
            if self.step == 0:
                # Open once in "w": truncates any stale file and keeps the
                # handle for the rest of the run (the original opened "w",
                # closed, then reopened "a" — same net effect).
                self.handles[obj_type] = open(fpath, "w")
            data = powerflow_output[obj_type]
            # Buffer is reset to None after each flush; (re)start it then.
            if self.dfs.get(obj_type) is None:
                self.dfs[obj_type] = {str(currenttime): data}
            else:
                # Keys are consistently strings; the original stored the raw
                # datetime here, which json.dump cannot serialize, so chunks
                # larger than one row always failed with a logged warning.
                self.dfs[obj_type][str(currenttime)] = data
            if self.step % self.chunk_rows == self.chunk_rows - 1:
                try:
                    json.dump(self.dfs[obj_type], self.handles[obj_type], indent=4)
                    self.handles[obj_type].flush()
                    self.dfs[obj_type] = None
                except Exception as E:
                    logger.warning(f"Unable to write property {obj_type} to file: {E!s}")

        self.step += 1

__init__(log_dir, column_length, filename_prefix='')

Constructor for json writer

Parameters:

Name Type Description Default
log_dir Path

output path (directory)

required
column_length int

number of data columns

required
Source code in pypsse\data_writers\json.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def __init__(self, log_dir: Path, column_length: int, filename_prefix:str=""):
    """Constructor for json writer

    Args:
        log_dir (Path): output path (directory)
        column_length (int): number of data columns
        filename_prefix (str): prefix prepended to every output json file name
    """
    self.filename_prefix=filename_prefix
    self.column_length = column_length
    self.log_dir = log_dir
    self.chunk_rows = 1  # rows buffered before each flush to disk
    self.handles = {}    # per-object-type open file handles
    self.dfs = {}        # per-object-type buffered {timestamp: data}
    self.step = 0        # number of completed write() calls

write(currenttime, powerflow_output, _=None)

Writes the status of assets at a particular timestep to a json file.

Parameters:

Name Type Description Default
currenttime datetime

simulator time step

required
powerflow_output dict

simulation results

required
Source code in pypsse\data_writers\json.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def write(self, currenttime: datetime, powerflow_output: dict, _:bool=None):
    """Writes the status of assets at a particular timestep to a json file.

    Args:
        currenttime (datetime): simulator time step
        powerflow_output (dict): simulation results
        _ (bool): unused convergence flag (writer-interface compatibility)
    """
    # Iterate through each object type

    for obj_type in powerflow_output:

        fpath = os.path.join(self.log_dir, f"{self.filename_prefix}_{obj_type}.json")
        if self.step == 0:
            # Truncate any stale file, then keep an append handle open for the
            # rest of the run (handles are never closed explicitly).
            f = open(fpath, "w")
            f.close()
            self.handles[obj_type] = open(fpath, "a")
        data = powerflow_output[obj_type]
        if obj_type not in self.dfs:
            self.dfs[obj_type] = {str(currenttime): data}
        else:
            if self.dfs[obj_type] is None:
                self.dfs[obj_type] = {str(currenttime): data}
            else:
                # NOTE(review): raw datetime key here (the other branches use
                # str(currenttime)); json.dump cannot serialize datetime keys,
                # so with chunk_rows > 1 the dump below fails and only logs a
                # warning — confirm and make the key a string.
                self.dfs[obj_type][currenttime] = data
        if self.step % self.chunk_rows == self.chunk_rows - 1:
            try:
                json.dump(self.dfs[obj_type], self.handles[obj_type], indent=4)
                self.handles[obj_type].flush()
                self.dfs[obj_type] = None
            except Exception as E:
                logger.warning(f"Unable to write property {obj_type} to file: {E!s}")

    self.step += 1

Utility functions

DynamicUtils

Utility functions for dynamic simulations

Source code in pypsse\utils\dynamic_utils.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
class DynamicUtils:
    "Utility functions for dynamic simulations"

    # Composite-load dynamic components recognised by the load-split logic.
    dynamic_params: ClassVar[List[str]] = ["FmA", "FmB", "FmC", "FmD", "Fel"]

    def disable_generation_for_coupled_buses(self):
        """Disables generation of coupled buses (co-simulation mode only)"""
        if (
            self.settings.helics
            and self.settings.helics.cosimulation_mode
            and self.settings.helics.disable_generation_on_coupled_buses
        ):
            sub_data = pd.read_csv(self.settings.simulation.subscriptions_file)
            sub_data = sub_data[sub_data["element_type"] == "Load"]
            generators = {}
            generator_list = {}

            # Group machine ids by their bus number.
            for gen_bus, gen_id in self.raw_data.generators:
                if gen_bus not in generator_list:
                    generator_list[gen_bus] = []
                generator_list[gen_bus].append(gen_id)

            # Keep only machines at buses present in the subscriptions file.
            # NOTE(review): raises KeyError if a subscribed bus has no
            # generators in the raw data — confirm that is intended.
            for _, row in sub_data.iterrows():
                bus = row["bus"]
                generators[bus] = generator_list[bus]

            for bus_id, machines in generators.items():
                for machine in machines:
                    # First intgar entry 0 takes the machine out of service;
                    # self._i / self._f presumably are PSSE "no change"
                    # sentinels — TODO confirm.
                    intgar = [0, self._i, self._i, self._i, self._i, self._i]
                    realar = [
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                        self._f,
                    ]
                    self.psse.machine_chng_2(bus_id, machine, intgar, realar)
                    logger.info(f"Machine disabled: {bus_id}_{machine}")

    def disable_load_models_for_coupled_buses(self):
        """Disables loads of coupled buses (co-simulation mode only)"""
        if self.settings.helics and self.settings.helics.cosimulation_mode:
            sub_data = pd.read_csv(self.settings.simulation.subscriptions_file)
            sub_data = sub_data[sub_data["element_type"] == "Load"]

            self.psse_dict = {}
            for _, row in sub_data.iterrows():
                bus = row["bus"]
                load = row["element_id"]
                # ierr 0 = disabled OK; 5 = no dynamic model present.
                ierr = self.psse.ldmod_status(0, int(bus), str(load), 1, 0)
                if ierr == 0:
                    logger.info(f"Dynamic model for load {load} connected to bus {bus} has been disabled")
                elif ierr == 5:
                    logger.error(f"No dynamic model found for load {load} connected to bus {bus}")
                else:
                    raise Exception(f"error={ierr}")

    # NOTE(review): mutable default argument — the [] default is shared
    # across calls; prefer `components_to_replace: List[str] = None`.
    def break_loads(self, loads: list = None, components_to_replace: List[str] = []):
        """Implements the load split logic

        Args:
            loads (list, optional): list of coupled loads. Defaults to None.
            components_to_replace (List[str], optional): components to be simulated on distribution side. Defaults to [].
        """

        components_to_stay = [x for x in self.dynamic_params if x not in components_to_replace]
        if loads is None:
            loads = self._get_coupled_loads()
        loads = self._get_load_static_data(loads)
        loads = self._get_load_dynamic_data(loads)
        loads = self._replicate_coupled_load(loads, components_to_replace)
        self._update_dynamic_parameters(loads, components_to_stay, components_to_replace)

    def _update_dynamic_parameters(self, loads: dict, components_to_stay: list, components_to_replace: list):
        """Updates dynamic parameters of composite old / replicated load models

        Args:
            loads (dict): load dictionary
            components_to_stay (list): components to be simulated on transmission side
            components_to_replace (list): components to be simulated on distribution side
        """

        new_percentages = {}
        for load in loads:
            # Renormalize remaining component fractions so they sum to 1;
            # replaced components are zeroed out.
            count = 0
            for comp in components_to_stay:
                count += load[comp]
            for comp in components_to_stay:
                new_percentages[comp] = load[comp] / count
            for comp in components_to_replace:
                new_percentages[comp] = 0.0

            settings = self._get_load_dynamic_properties(load)
            #
            for k, v in new_percentages.items():
                idx = dyn_only_options["Loads"]["lmodind"][k]
                settings[idx] = v
                # self.psse.change_ldmod_con(load['bus'], 'XX' ,r"""CMLDBLU2""" ,idx ,v)
            values = list(settings.values())
            # 133 = number of CONs of the CMLDBLU2 composite load model;
            # "XX" is the id of the replica load created in
            # _replicate_coupled_load.
            self.psse.add_load_model(load["bus"], "XX", 0, 1, r"""CMLDBLU2""", 2, [0, 0], ["", ""], 133, values)
            logger.info(f"Dynamic model parameters for load {load['id']} at bus 'XX' changed.")

    def _get_load_dynamic_properties(self, load):
        "Returns dynamic parameters (CON values) of a composite load model"
        settings = {}
        # NOTE(review): `ierr` from both calls is not checked here, unlike
        # the sibling methods — confirm whether failures should raise.
        for i in range(133):
            ierr, con_index = self.psse.lmodind(load["bus"], str(load["id"]), "CHARAC", "CON")
            if con_index is not None:
                act_con_index = con_index + i
                ierr, value = self.psse.dsrval("CON", act_con_index)
                assert ierr == 0, f"error={ierr}"
                settings[i] = value
        return settings

    def _replicate_coupled_load(self, loads: dict, components_to_replace: list):
        """create a replica of composite load model

        Args:
            loads (dict): load dictionary
            components_to_replace (list): composite load models to replace on distribution side

        Returns:
            dict: updated load dictionary
        """

        for load in loads:
            # Split total MVA into a static (distribution-side) share and the
            # remaining dynamic (transmission-side) share.
            dynamic_percentage = load["FmA"] + load["FmB"] + load["FmC"] + load["FmD"] + load["Fel"]
            static_percentage = 1.0 - dynamic_percentage
            for comp in components_to_replace:
                static_percentage += load[comp]
            remaining_load = 1 - static_percentage
            total_load = load["MVA"]
            total_distribution_load = total_load * static_percentage
            total_transmission_load = total_load * remaining_load
            # create new load
            self.psse.load_data_5(
                load["bus"],
                "XX",
                realar=[total_transmission_load.real, total_transmission_load.imag, 0.0, 0.0, 0.0, 0.0],
                lodtyp="replica",
            )
            # ierr, cmpval = self.psse.loddt2(load["bus"], "XX" ,"MVA" , "ACT")
            # modify old load
            self.psse.load_data_5(
                load["bus"],
                str(load["id"]),
                realar=[total_distribution_load.real, total_distribution_load.imag, 0.0, 0.0, 0.0, 0.0],
                lodtyp="original",
            )
            # ierr, cmpval = self.psse.loddt2(load["bus"], load["id"] ,"MVA" , "ACT")
            logger.info(f"Original load {load['id']} @ bus {load['bus']}: {total_load}")
            logger.info(f"New load 'XX' @ bus {load['bus']} created successfully: {total_transmission_load}")
            logger.info(f"Load {load['id']} @ bus {load['bus']} updated : {total_distribution_load}")
            load["distribution"] = total_distribution_load
            load["transmission"] = total_transmission_load
        return loads

    def _get_coupled_loads(self) -> list:
        """Returns a list of all coupled loads in a given simulation

        Returns:
            list: list of coupled loads
        """

        # NOTE(review): dict-style settings access here, while the other
        # methods use attribute access (self.settings.simulation....) —
        # confirm which settings object this class is really bound to.
        sub_data = pd.read_csv(
            os.path.join(
                self.settings["Simulation"]["Project Path"], "Settings", self.settings["HELICS"]["Subscriptions file"]
            )
        )
        load = []
        for _, row in sub_data.iterrows():
            if row["element_type"] == "Load":
                load.append(
                    {
                        "type": row["element_type"],
                        "id": row["element_id"],
                        "bus": row["bus"],
                    }
                )
        return load

    def _get_load_static_data(self, loads: list) -> dict:
        """Returns static data for load models

        Args:
            loads (list): list of load names

        Returns:
            dict: mapping load to static values
        """

        values = ["MVA", "IL", "YL", "TOTAL"]
        for load in loads:
            for v in values:
                ierr, cmpval = self.psse.loddt2(load["bus"], str(load["id"]), v, "ACT")
                load[v] = cmpval
        return loads

    def _get_load_dynamic_data(self, loads: list) -> dict:
        """Returns dynamic data for load models

        Args:
            loads (list): list of load names

        Returns:
            dict: mapping load to dynamic values
        """

        values = dyn_only_options["Loads"]["lmodind"]
        for load in loads:
            for v, con_ind in values.items():
                # Walk the loads at this bus and read the CON value at the
                # component's offset from the model's first CON index.
                ierr = self.psse.inilod(load["bus"])
                assert ierr == 0, f"error={ierr}"
                ierr, ld_id = self.psse.nxtlod(load["bus"])
                assert ierr == 0, f"error={ierr}"
                if ld_id is not None:
                    ierr, con_index = self.psse.lmodind(load["bus"], ld_id, "CHARAC", "CON")
                    assert ierr == 0, f"error={ierr}"
                    if con_index is not None:
                        act_con_index = con_index + con_ind
                        ierr, value = self.psse.dsrval("CON", act_con_index)
                        assert ierr == 0, f"error={ierr}"
                        load[v] = value
        return loads

    def setup_machine_channels(self, machines: dict, properties: list):
        """sets up machine channels

        Args:
            machines (dict): mapping machine to connected bus
            properties (list): list of machine properties
        """

        for _, qty in enumerate(properties):
            if qty not in self.channel_map:
                nqty = f"MACHINE_{qty}"
                self.channel_map[nqty] = {}
            # NOTE(review): if `qty` is already in channel_map (repeated call),
            # `nqty` is referenced before assignment below — confirm.
            for mch, b in machines:
                if qty in MACHINE_CHANNELS:
                    self.channel_map[nqty][f"{b}_{mch}"] = [self.chnl_idx]
                    chnl_id = MACHINE_CHANNELS[qty]
                    logger.info(f"{qty} for machine {b}_{mch} added to channel {self.chnl_idx}")
                    self.psse.machine_array_channel([self.chnl_idx, chnl_id, int(b)], mch, "")
                    self.chnl_idx += 1

    def setup_load_channels(self, loads: list):
        """Sets up load channels

        Args:
            loads (list): list of loads
        """

        if "LOAD_P" not in self.channel_map:
            self.channel_map["LOAD_P"] = {}
            self.channel_map["LOAD_Q"] = {}

        # Each load consumes two consecutive channels: P then Q.
        for ld, b in loads:
            self.channel_map["LOAD_P"][f"{b}_{ld}"] = [self.chnl_idx]
            self.channel_map["LOAD_Q"][f"{b}_{ld}"] = [self.chnl_idx + 1]
            self.psse.load_array_channel([self.chnl_idx, 1, int(b)], ld, "")
            self.psse.load_array_channel([self.chnl_idx + 1, 2, int(b)], ld, "")
            logger.info(f"P and Q for load {b}_{ld} added to channel {self.chnl_idx} and {self.chnl_idx + 1}")
            self.chnl_idx += 2

    def setup_bus_channels(self, buses: list, properties: list):
        """Sets up bus channels

        Args:
            buses (list): list of buses
            properties (dict): list of bus properties
        """

        for _, qty in enumerate(properties):
            if qty not in self.channel_map:
                self.channel_map[qty] = {}
            for _, b in enumerate(buses):
                if qty == "frequency":
                    self.channel_map[qty][b] = [self.chnl_idx]
                    self.psse.bus_frequency_channel([self.chnl_idx, int(b)], "")
                    logger.info(f"Frequency for bus {b} added to channel { self.chnl_idx}")
                    self.chnl_idx += 1
                elif qty == "voltage_and_angle":
                    # Two consecutive channels: voltage, then angle.
                    self.channel_map[qty][b] = [self.chnl_idx, self.chnl_idx + 1]
                    self.psse.voltage_and_angle_channel([self.chnl_idx, -1, -1, int(b)], "")
                    logger.info(f"Voltage and angle for bus {b} added to channel {self.chnl_idx} and {self.chnl_idx+1}")
                    self.chnl_idx += 2

    def poll_channels(self) -> dict:
        """Polls all channels added during the setup process

        Returns:
            dict: mapping of polled channels to values
        """

        results = {}
        for ppty, b_dict in self.channel_map.items():
            # "voltage_and_angle" maps two channel indices to two result keys.
            ppty_new = ppty.split("_and_")
            for b, indices in b_dict.items():
                for n, idx in zip(ppty_new, indices):
                    # Bare bus quantities get a BUS_ prefix; machine/load keys
                    # already carry one.
                    if "_" not in n:
                        n_name = f"BUS_{n}"
                    else:
                        n_name = n
                    if n_name not in results:
                        results[n_name] = {}
                    ierr, value = self.psse.chnval(idx)
                    assert ierr == 0, f"error={ierr}"
                    # Sentinel for channels that have no value yet.
                    if value is None:
                        value = -1
                    results[n_name][b] = value
        return results

    def setup_all_channels(self):
        """Sets up all user-defined channels for a project"""

        self.channel_map = {}
        self.chnl_idx = 1
        if not self.export_settings.channel_setup:
            return

        for channel in self.export_settings.channel_setup:
            method_type = channel.asset_type
            if method_type == "buses":
                self.setup_bus_channels(channel.asset_list, channel.asset_properties)
            elif method_type == "loads":
                load_list = [[x, int(y)] for x, y in channel.asset_list]
                self.setup_load_channels(load_list)
            elif method_type == "machines":
                machine_list = [[x, int(y)] for x, y in channel.asset_list]
                self.setup_machine_channels(machine_list, channel.asset_properties)

break_loads(loads=None, components_to_replace=[])

Implements the load split logic

Parameters:

Name Type Description Default
loads list

list of coupled loads. Defaults to None.

None
components_to_replace List[str]

components to be simulated on distribution side. Defaults to [].

[]
Source code in pypsse\utils\dynamic_utils.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def break_loads(self, loads: list = None, components_to_replace: List[str] = None):
    """Implements the load split logic.

    Args:
        loads (list, optional): list of coupled loads. Defaults to None
            (coupled loads are then discovered via `_get_coupled_loads`).
        components_to_replace (List[str], optional): components to be simulated
            on the distribution side. Defaults to None (treated as empty).
    """

    # A literal [] default is a single shared list object reused across
    # every call (mutable-default pitfall); use a None sentinel instead.
    if components_to_replace is None:
        components_to_replace = []
    components_to_stay = [x for x in self.dynamic_params if x not in components_to_replace]
    if loads is None:
        loads = self._get_coupled_loads()
    loads = self._get_load_static_data(loads)
    loads = self._get_load_dynamic_data(loads)
    loads = self._replicate_coupled_load(loads, components_to_replace)
    self._update_dynamic_parameters(loads, components_to_stay, components_to_replace)

disable_generation_for_coupled_buses()

Disables generation of coupled buses (co-simulation mode only)

Source code in pypsse\utils\dynamic_utils.py
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def disable_generation_for_coupled_buses(self):
    """Disables generation of coupled buses (co-simulation mode only)"""
    helics = self.settings.helics
    if not (helics and helics.cosimulation_mode and helics.disable_generation_on_coupled_buses):
        return

    sub_data = pd.read_csv(self.settings.simulation.subscriptions_file)
    sub_data = sub_data[sub_data["element_type"] == "Load"]

    # bus -> machine ids present in the raw case data
    machines_by_bus = {}
    for gen_bus, gen_id in self.raw_data.generators:
        machines_by_bus.setdefault(gen_bus, []).append(gen_id)

    # restrict to buses that appear in the subscriptions file
    coupled_machines = {row["bus"]: machines_by_bus[row["bus"]] for _, row in sub_data.iterrows()}

    for bus_id, machine_ids in coupled_machines.items():
        for machine in machine_ids:
            # leading 0 in intgar takes the machine out of service;
            # remaining entries keep existing values (default sentinels)
            intgar = [0] + [self._i] * 5
            realar = [self._f] * 17
            self.psse.machine_chng_2(bus_id, machine, intgar, realar)
            logger.info(f"Machine disabled: {bus_id}_{machine}")

disable_load_models_for_coupled_buses()

Disables loads of coupled buses (co-simulation mode only)

Source code in pypsse\utils\dynamic_utils.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def disable_load_models_for_coupled_buses(self):
    """Disables loads of coupled buses (co-simulation mode only)"""
    if not (self.settings.helics and self.settings.helics.cosimulation_mode):
        return

    sub_data = pd.read_csv(self.settings.simulation.subscriptions_file)
    sub_data = sub_data[sub_data["element_type"] == "Load"]

    self.psse_dict = {}
    for _, row in sub_data.iterrows():
        bus, load = row["bus"], row["element_id"]
        ierr = self.psse.ldmod_status(0, int(bus), str(load), 1, 0)
        if ierr == 0:
            logger.info(f"Dynamic model for load {load} connected to bus {bus} has been disabled")
        elif ierr == 5:
            logger.error(f"No dynamic model found for load {load} connected to bus {bus}")
        else:
            raise Exception(f"error={ierr}")

poll_channels()

Polls all channels added during the setup process

Returns:

Name Type Description
dict dict

mapping of polled channels to values

Source code in pypsse\utils\dynamic_utils.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
def poll_channels(self) -> dict:
    """Polls all channels added during the setup process

    Returns:
        dict: mapping of polled channels to values
    """

    results = {}
    for ppty, bus_map in self.channel_map.items():
        # compound keys like "voltage_and_angle" fan out to one quantity
        # per registered channel index
        quantities = ppty.split("_and_")
        for bus, channel_indices in bus_map.items():
            for quantity, channel_idx in zip(quantities, channel_indices):
                key = quantity if "_" in quantity else f"BUS_{quantity}"
                bucket = results.setdefault(key, {})
                ierr, value = self.psse.chnval(channel_idx)
                assert ierr == 0, f"error={ierr}"
                # substitute -1 when the simulator returns no value
                bucket[bus] = -1 if value is None else value
    return results

setup_all_channels()

Sets up all user-defined channels for a project

Source code in pypsse\utils\dynamic_utils.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def setup_all_channels(self):
    """Registers every user-defined channel listed in the export settings."""

    self.channel_map = {}
    self.chnl_idx = 1
    channel_setup = self.export_settings.channel_setup
    if not channel_setup:
        return

    for channel in channel_setup:
        asset_type = channel.asset_type
        if asset_type == "buses":
            self.setup_bus_channels(channel.asset_list, channel.asset_properties)
        elif asset_type in ("loads", "machines"):
            # asset_list entries are (identifier, bus) pairs; bus must be an int
            pairs = [[ident, int(bus)] for ident, bus in channel.asset_list]
            if asset_type == "loads":
                self.setup_load_channels(pairs)
            else:
                self.setup_machine_channels(pairs, channel.asset_properties)

setup_bus_channels(buses, properties)

Sets up bus channels

Parameters:

Name Type Description Default
buses list

list of buses

required
properties dict

list of bus properties

required
Source code in pypsse\utils\dynamic_utils.py
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
def setup_bus_channels(self, buses: list, properties: list):
    """Sets up bus channels

    Args:
        buses (list): list of buses
        properties (list): list of bus properties to record per bus
    """

    for qty in properties:
        bus_map = self.channel_map.setdefault(qty, {})
        for b in buses:
            if qty == "frequency":
                # one channel per bus for frequency
                bus_map[b] = [self.chnl_idx]
                self.psse.bus_frequency_channel([self.chnl_idx, int(b)], "")
                logger.info(f"Frequency for bus {b} added to channel { self.chnl_idx}")
                self.chnl_idx += 1
            elif qty == "voltage_and_angle":
                # voltage and angle occupy two consecutive channels
                bus_map[b] = [self.chnl_idx, self.chnl_idx + 1]
                self.psse.voltage_and_angle_channel([self.chnl_idx, -1, -1, int(b)], "")
                logger.info(f"Voltage and angle for bus {b} added to channel {self.chnl_idx} and {self.chnl_idx+1}")
                self.chnl_idx += 2

setup_load_channels(loads)

Sets up load channels

Parameters:

Name Type Description Default
loads list

list of loads

required
Source code in pypsse\utils\dynamic_utils.py
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def setup_load_channels(self, loads: list):
    """Sets up load channels

    Args:
        loads (list): list of (load_id, bus) pairs
    """

    # Initialize the two result buckets independently. The original keyed
    # both creations off "LOAD_P" alone, which raises a KeyError below if
    # the map ever contains LOAD_P without LOAD_Q.
    self.channel_map.setdefault("LOAD_P", {})
    self.channel_map.setdefault("LOAD_Q", {})

    for ld, b in loads:
        self.channel_map["LOAD_P"][f"{b}_{ld}"] = [self.chnl_idx]
        self.channel_map["LOAD_Q"][f"{b}_{ld}"] = [self.chnl_idx + 1]
        # second array element selects the quantity: 1 -> P, 2 -> Q
        self.psse.load_array_channel([self.chnl_idx, 1, int(b)], ld, "")
        self.psse.load_array_channel([self.chnl_idx + 1, 2, int(b)], ld, "")
        logger.info(f"P and Q for load {b}_{ld} added to channel {self.chnl_idx} and {self.chnl_idx + 1}")
        self.chnl_idx += 2

setup_machine_channels(machines, properties)

sets up machine channels

Parameters:

Name Type Description Default
machines dict

mapping machine to connected bus

required
properties list

list of machine properties

required
Source code in pypsse\utils\dynamic_utils.py
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
def setup_machine_channels(self, machines: dict, properties: list):
    """Sets up machine channels

    Args:
        machines (dict): mapping machine to connected bus
        properties (list): list of machine properties
    """

    for qty in properties:
        # Key under the prefixed name. The original tested the *unprefixed*
        # `qty` against the map, which both clobbered any existing
        # "MACHINE_<qty>" entry on repeated calls and left `nqty` unbound
        # in the rare case the bare name was already a key.
        nqty = f"MACHINE_{qty}"
        if nqty not in self.channel_map:
            self.channel_map[nqty] = {}
        for mch, b in machines:
            if qty in MACHINE_CHANNELS:
                self.channel_map[nqty][f"{b}_{mch}"] = [self.chnl_idx]
                chnl_id = MACHINE_CHANNELS[qty]
                logger.info(f"{qty} for machine {b}_{mch} added to channel {self.chnl_idx}")
                self.psse.machine_array_channel([self.chnl_idx, chnl_id, int(b)], mch, "")
                self.chnl_idx += 1

Contingency interface

This module manages contingency modeling in PyPSSE

BaseFault

Base class definition for all fault types

Source code in pypsse\contingencies.py
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class BaseFault(metaclass=ABCMeta):
    """Base class definition for all fault types.

    Subclasses set `fault_method` (name of the simulator API call) and
    populate `fault_settings` (keyword arguments for that call).
    """
    # NOTE: the original assigned `__metaclass__ = ABCMeta`, which is
    # Python 2 syntax and a silent no-op on Python 3.

    requirements = []
    fault_settings = {}
    fault_method = ""
    element = None

    def __init__(self, psse, settings, contingency_type):
        """Stores the simulator handle and fault configuration.

        Args:
            psse: simulator instance
            settings: fault/trip model holding `time` (and optionally `duration`)
            contingency_type (str): type of contingency
        """
        self.contingency_type = contingency_type
        self.settings = settings
        self.psse = psse
        self.enabled = False
        self.tripped = False
        # Copy the class-level containers onto the instance: they are shared
        # mutable objects, so without a per-instance copy every fault of the
        # same class mutates one dict and the last-constructed fault silently
        # overwrites the settings of all earlier ones.
        self.fault_settings = dict(self.fault_settings)
        self.requirements = list(self.requirements)

    def update(self, t: float):
        """Updates the fault state for simulation time `t`.

        Faults with a `duration` are enabled inside their time window and
        cleared once it ends; faults without one trip once and stay tripped.

        Args:
            t (float): simulation time in seconds
        """
        self.t = t
        if hasattr(self.settings, "duration"):
            start = self.settings.time
            end = start + self.settings.duration
            if start <= t < end and not self.enabled:
                self.enabled = True
                self.enable_fault()
            if t >= end and self.enabled:
                self.enabled = False
                self.disable_fault()
        elif t >= self.settings.time and not self.tripped:
            self.enable_fault()
            self.tripped = True

    def enable_fault(self):
        """Enables a fault event via the configured simulator method."""
        err = getattr(self.psse, self.fault_method)(**self.fault_settings)
        if err:
            logger.warning(
                f"Unable to enable {self.fault_method} at element {self.element}"
            )
        else:
            logger.debug(
                f"{self.fault_method} applied to {self.element} at time {self.t} seconds"
            )

    def disable_fault(self):
        """Disables a fault event by clearing the active fault."""
        err = self.psse.dist_clear_fault()
        if err:
            logger.warning(
                f"Unable to clear {self.fault_method} at element {self.element}"
            )
        else:
            logger.debug(
                f"{self.fault_method} cleared at element {self.element} at time {self.t} seconds"
            )

    def is_enabled(self):
        """Returns enabled status

        Returns:
            bool: True if the fault object is enabled, else False
        """
        return self.enabled

    def is_tripped(self) -> bool:
        """Returns trip status

        Returns:
            bool: True if the fault object is tripped, else False
        """
        return self.tripped

disable_fault()

disables a fault event

Source code in pypsse\contingencies.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def disable_fault(self):
    """Clears the active fault on the simulator and logs the outcome."""
    err = self.psse.dist_clear_fault()
    if not err:
        logger.debug(
            f"{self.fault_method} cleared at element {self.element} at time {self.t} seconds"
        )
    else:
        logger.warning(
            f"Unable to clear {self.fault_method} at element {self.element}"
        )

enable_fault()

enables a fault event

Source code in pypsse\contingencies.py
80
81
82
83
84
85
86
87
88
89
90
def enable_fault(self):
    """Applies the configured fault via the simulator API and logs the outcome."""
    fault_call = getattr(self.psse, self.fault_method)
    err = fault_call(**self.fault_settings)
    if not err:
        logger.debug(
            f"{self.fault_method} applied to {self.element} at time {self.t} seconds"
        )
    else:
        logger.warning(
            f"Unable to enable {self.fault_method} at element {self.element}"
        )

is_enabled()

Returns enabled status

Returns:

Name Type Description
_type_

True if the fault object is enabled, else False

Source code in pypsse\contingencies.py
104
105
106
107
108
109
110
def is_enabled(self):
    """Returns enabled status

    Returns:
        bool: True if the fault object is currently enabled, else False
    """
    return self.enabled

is_tripped()

Returns trip status

Returns:

Name Type Description
bool bool

true if the fault object is tripped else false

Source code in pypsse\contingencies.py
112
113
114
115
116
117
118
def is_tripped(self) -> bool:
    """Returns trip status

    Returns:
        bool: True if the fault object has tripped, else False
    """
    return self.tripped

update(t)

updates a fault event

Parameters:

Name Type Description Default
t float

simulation time in seconds

required
Source code in pypsse\contingencies.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def update(self, t: float):
    """Updates the fault state for simulation time `t`.

    Args:
        t (float): simulation time in seconds
    """
    self.t = t
    start = self.settings.time
    if hasattr(self.settings, "duration"):
        end = start + self.settings.duration
        # windowed fault: enable inside [start, end), clear at/after end
        if start <= t < end and not self.enabled:
            self.enabled = True
            self.enable_fault()
        if t >= end and self.enabled:
            self.enabled = False
            self.disable_fault()
    elif t >= start and not self.tripped:
        # permanent trip: fires once and stays tripped
        self.enable_fault()
        self.tripped = True

BusFaultObject

Bases: BaseFault

Class definition for a bus fault

Source code in pypsse\contingencies.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
class BusFaultObject(BaseFault):
    "Class definition for a bus fault"
    fault_method = "dist_bus_fault"
    fault_settings = {}

    def __init__(self, psse: object, settings: BusFault, contingency_type: str):
        """bus fault object

        Args:
            psse (object): simulator instance
            settings (BusFault): bus fault model
            contingency_type (str): contingency type
        """
        super().__init__(psse, settings, contingency_type)
        # Build a fresh per-instance dict instead of mutating the shared
        # class-level `fault_settings`: with the shared dict, constructing a
        # second BusFaultObject overwrote the settings of the first.
        self.fault_settings = {
            "ibus": settings.bus_id,
            "units": 3,  # same constant as original; unit code for the impedance values
            "values": settings.fault_impedance,
            "basekv": 0.0,
        }
        self.element = settings.bus_id

__init__(psse, settings, contingency_type)

bus fault object

Parameters:

Name Type Description Default
psse object

simulator type

required
settings BusFault

bus fault object

required
contingency_type str

contingency type

required
Source code in pypsse\contingencies.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
def __init__(self, psse: object, settings: BusFault, contingency_type: str):
    """Initializes a bus fault contingency.

    Args:
        psse (object): simulator instance
        settings (BusFault): bus fault model
        contingency_type (str): contingency type
    """
    super().__init__(psse, settings, contingency_type)
    self.fault_settings.update(
        {
            "ibus": settings.bus_id,
            "units": 3,
            "values": settings.fault_impedance,
            "basekv": 0.0,
        }
    )
    self.element = settings.bus_id

BusTripObject

Bases: BaseFault

Class definition for a bus trip

Source code in pypsse\contingencies.py
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
class BusTripObject(BaseFault):
    "Class definition for a bus trip"
    fault_method = "dist_bus_trip"
    fault_settings = {}

    def __init__(self, psse: object, settings: BusTrip, contingency_type: str):
        """Bus trip contingency

        Args:
            psse (object): simulator instance
            settings (BusTrip): bus trip model
            contingency_type (str): type of contingency
        """
        super().__init__(psse, settings, contingency_type)
        # Fresh per-instance dict — mutating the shared class-level
        # `fault_settings` lets one instance overwrite another's settings.
        self.fault_settings = {"ibus": settings.bus_id}
        self.element = settings.bus_id

__init__(psse, settings, contingency_type)

Bus trip contingency

Parameters:

Name Type Description Default
psse object

simulator instance

required
settings BusTrip

bus trip model

required
contingency_type str

type of contingency

required
Source code in pypsse\contingencies.py
191
192
193
194
195
196
197
198
199
200
201
def __init__(self, psse: object, settings: BusTrip, contingency_type: str):
    """Initializes a bus trip contingency.

    Args:
        psse (object): simulator instance
        settings (BusTrip): bus trip model
        contingency_type (str): type of contingency
    """
    super().__init__(psse, settings, contingency_type)
    self.fault_settings.update({"ibus": settings.bus_id})
    self.element = settings.bus_id

LineFaultObject

Bases: BaseFault

Class definition for a line fault

Source code in pypsse\contingencies.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
class LineFaultObject(BaseFault):
    "Class definition for a line fault"
    fault_method = "dist_branch_fault"
    fault_settings = {}

    def __init__(
        self, psse: object, settings: LineFault, contingency_type: str
    ):
        """line fault model

        Args:
            psse (object): simulator instance
            settings (LineFault): line fault model
            contingency_type (str): contingency type
        """
        super().__init__(psse, settings, contingency_type)
        # Fresh per-instance dict — mutating the shared class-level
        # `fault_settings` lets one instance overwrite another's settings.
        # bus_ids holds [from_bus, to_bus, circuit_id].
        self.fault_settings = {
            "ibus": settings.bus_ids[0],
            "jbus": settings.bus_ids[1],
            "id": settings.bus_ids[2],
            "units": 3,  # same constant as original; unit code for the impedance values
            "values": settings.fault_impedance,
            "basekv": 0.0,
        }
        self.element = settings.bus_ids

__init__(psse, settings, contingency_type)

line fault model

Parameters:

Name Type Description Default
psse object

simulator instance

required
settings LineFault

line fault model

required
contingency_type str

contingency type

required
Source code in pypsse\contingencies.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def __init__(
    self, psse: object, settings: LineFault, contingency_type: str
):
    """Initializes a line fault contingency.

    Args:
        psse (object): simulator instance
        settings (LineFault): line fault model
        contingency_type (str): contingency type
    """
    super().__init__(psse, settings, contingency_type)
    # bus_ids holds [from_bus, to_bus, circuit_id]
    self.fault_settings.update(
        {
            "ibus": settings.bus_ids[0],
            "jbus": settings.bus_ids[1],
            "id": settings.bus_ids[2],
            "units": 3,
            "values": settings.fault_impedance,
            "basekv": 0.0,
        }
    )
    self.element = settings.bus_ids

LineTripObject

Bases: BaseFault

Class definition for a line trip

Source code in pypsse\contingencies.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
class LineTripObject(BaseFault):
    "Class definition for a line trip"
    fault_method = "dist_branch_trip"
    fault_settings = {}

    def __init__(self, psse: object, settings: LineTrip, contingency_type: str):
        """line trip model

        Args:
            psse (object): simulator instance
            settings (LineTrip): line trip model
            contingency_type (str): contingency type
        """
        super().__init__(psse, settings, contingency_type)
        # Fresh per-instance dict — mutating the shared class-level
        # `fault_settings` lets one instance overwrite another's settings.
        self.fault_settings = {
            "ibus": settings.bus_ids[0],
            "jbus": settings.bus_ids[1],
        }
        self.element = settings.bus_ids

__init__(psse, settings, contingency_type)

line trip model

Parameters:

Name Type Description Default
psse object

simulator instance

required
settings LineTrip

line trip model

required
contingency_type str

contingency type

required
Source code in pypsse\contingencies.py
172
173
174
175
176
177
178
179
180
181
182
183
def __init__(self, psse: object, settings: LineTrip, contingency_type: str):
    """Initializes a line trip contingency.

    Args:
        psse (object): simulator instance
        settings (LineTrip): line trip model
        contingency_type (str): contingency type
    """
    super().__init__(psse, settings, contingency_type)
    self.fault_settings.update(
        {"ibus": settings.bus_ids[0], "jbus": settings.bus_ids[1]}
    )
    self.element = settings.bus_ids

MachineTripObject

Bases: BaseFault

Class definition for a machine trip

Source code in pypsse\contingencies.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
class MachineTripObject(BaseFault):
    "Class definition for a machine trip"
    fault_method = "dist_machine_trip"
    fault_settings = {}

    def __init__(
        self, psse: object, settings: MachineTrip, contingency_type: str
    ):
        """Machine trip contingency

        Args:
            psse (object): simulator instance
            settings (MachineTrip): machine trip model
            contingency_type (str): type of contingency
        """
        super().__init__(psse, settings, contingency_type)
        # Fresh per-instance dict — mutating the shared class-level
        # `fault_settings` lets one instance overwrite another's settings.
        self.fault_settings = {
            "ibus": settings.bus_id,
            "id": settings.machine_id,
        }
        self.element = settings.bus_id

__init__(psse, settings, contingency_type)

Machine trip contingency

Parameters:

Name Type Description Default
psse object

simulator instance

required
settings MachineTrip

machine trip model

required
contingency_type str

type of contingency

required
Source code in pypsse\contingencies.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
def __init__(
    self, psse: object, settings: MachineTrip, contingency_type: str
):
    """Initializes a machine trip contingency.

    Args:
        psse (object): simulator instance
        settings (MachineTrip): machine trip model
        contingency_type (str): type of contingency
    """
    super().__init__(psse, settings, contingency_type)
    self.fault_settings.update(
        {"ibus": settings.bus_id, "id": settings.machine_id}
    )
    self.element = settings.bus_id

add_contingency(contingency, cont_dict, dt, system_contingencies)

Adds a new contingency

Source code in pypsse\contingencies.py
21
22
23
24
25
26
27
28
29
30
31
def add_contingency(contingency, cont_dict, dt, system_contingencies):
    """Adds a new contingency to the running list.

    Args:
        contingency: contingency model instance; its class name selects the
            fault object type
        cont_dict (dict): keyword arguments forwarded to the fault constructor
        dt: simulation time step (returned unchanged)
        system_contingencies (list): list the new contingency is appended to

    Returns:
        tuple: (system_contingencies, dt)

    Raises:
        Exception: if the contingency type is not recognized
    """
    # Local renamed from the original misspelled `ontingency_type`, matching
    # the naming used in build_contingencies.
    contingency_type = contingency.__class__.__name__
    if contingency_type not in contingencies:
        msg = "Invalid contingency type. Valid values are: {}".format(
            ",".join(contingencies.keys())
        )
        raise Exception(msg)
    system_contingencies.append(contingencies[contingency_type](**cont_dict))
    return system_contingencies, dt

build_contingencies(psse, contingencies_)

Builds all contingencies defined in the settings file

Parameters:

Name Type Description Default
psse object

simulator instance

required
settings SimulationSettings

simulation settings

required

Returns:

Type Description
List[Union[BusFaultObject, BusTripObject, LineFaultObject, LineTripObject, MachineTripObject]]

List[Union[BusFaultObject, BusTripObject, LineFaultObject, LineTripObject, MachineTripObject]]: list of contingencies

Source code in pypsse\contingencies.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
def build_contingencies(
    psse: object, contingencies_: Union[Contingencies, SimulationSettings]
) -> List[
    Union[
        BusFaultObject,
        BusTripObject,
        LineFaultObject,
        LineTripObject,
        MachineTripObject,
    ]
]:
    """Builds all contingencies defined in the settings file

    Args:
        psse (object): simulator instance
        contingencies_ (Union[Contingencies, SimulationSettings]): contingency definitions

    Returns:
        List[Union[BusFaultObject, BusTripObject, LineFaultObject, LineTripObject, MachineTripObject]]: list of contingencies
    """

    system_contingencies = []
    defined = contingencies_.contingencies
    if not defined:
        logger.debug("No contingencies to build")
        return system_contingencies

    for contingency in defined:
        # the model's class name selects the matching fault object type
        contingency_type = contingency.__class__.__name__
        builder = contingencies.get(contingency_type)
        if builder is None:
            logger.warning(
                "Invalid contingency type. Valid values are: {}".format(
                    ",".join(contingencies)
                )
            )
            continue
        system_contingencies.append(builder(psse, contingency, contingency_type))
        logger.debug(f'Contingency of type "{contingency_type}" added')
    return system_contingencies