'''
Backend functionality for reading and writing GDX files.
The GdxFile and GdxSymbol classes are full-featured interfaces
for going between the GDX format and pandas DataFrames,
including translation between GDX and numpy special values.
'''
from __future__ import absolute_import, print_function
from builtins import super
import atexit
from collections import defaultdict, OrderedDict
try:
from collections.abc import MutableSequence
except ImportError:
from collections import MutableSequence
import copy
from ctypes import c_bool
from enum import Enum
import logging
from numbers import Number
# try to import gdx loading utility
HAVE_GDX2PY = False
try:
# special Windows dll for optimized loading of GAMS parameters
import gdx2py
HAVE_GDX2PY = True
except ImportError: pass
# gdxpds needs to be imported before pandas to try to avoid library conflict on
# Linux that causes a segmentation fault.
from gdxpds import Error
from gdxpds.tools import NeedsGamsDir
import gdxcc
import numpy as np
import pandas as pd
import gdxpds.special as special
# Import some things for backward compatibility
from gdxpds.special import (
convert_gdx_to_np_svs,
convert_np_to_gdx_svs,
gdx_isnan,
gdx_val_equal,
NUMPY_SPECIAL_VALUES,
is_np_eps,
is_np_sv,
)
logger = logging.getLogger(__name__)
[docs]def replace_df_column(df,colname,new_col):
"""
Utility function that replaces df[colname] with new_col. Special
care is taken for the case when df has multiple columns named '*',
since this causes pandas to crash.
Parameters
----------
df : pandas.DataFrame
edited in place by this function
colname : str
name of column in df whose data is to be replaced
new_col : vector, list, pandas.Series
new column data for df[colname]
"""
cols = df.columns
tmpcols = [col if col != '*' else 'aaa' for col in cols ]
df.columns = tmpcols
df[colname] = new_col
df.columns = cols
return
[docs]class GdxError(Error):
def __init__(self, H, msg):
"""
Pulls information from gdxcc about the last encountered error and appends
it to msg.
Parameters
----------
H : pointer or None
SWIG binding pointer to a GDX object
msg : str
gdxpds error message
Attributes
----------
msg : str
msg that is passed in with a gdxErrorStr appended
"""
if H:
msg += ". " + gdxcc.gdxErrorStr(H, gdxcc.gdxGetLastError(H))[1] + "."
super().__init__(msg)
[docs]class GdxFile(MutableSequence, NeedsGamsDir):
def __init__(self,gams_dir=None,lazy_load=True):
"""
Initializes a GdxFile object by connecting to GAMS and creating a pointer.
Throws a GdxError if either of those operations fail.
Parameters
----------
gams_dir : None or str
lazy_load : bool
If True, :py:class:`GdxSymbol` data are not automatically loaded when the
symbols are initially :py:meth:`read`. Individual data tables can only be
accessed later after the corresponding calls to :py:meth:`GdxSymbol.load`.
If False, all data are automatically loaded and the full GDX file is
available in memory after the call to :py:meth:`read`.
"""
self.lazy_load = lazy_load
self._version = None
self._producer = None
self._filename = None
self._symbols = OrderedDict()
NeedsGamsDir.__init__(self,gams_dir=gams_dir)
self._H = self._create_gdx_object()
self.universal_set = GdxSymbol('*',GamsDataType.Set,dims=1,file=None,index=0)
self.universal_set._file = self
atexit.register(gdxcc.gdxFree, self.H)
return
[docs] def cleanup(self):
gdxcc.gdxFree(self.H)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.cleanup()
def __del__(self):
self.cleanup()
[docs] def clone(self):
"""
Returns a new GdxFile containing clones of the GdxSymbols in this
GdxFile. The clone will not be associated with a filename. The clone's
GdxSymbols will not have indexes. The clone will be ready to write to
a new file.
Returns
-------
:py:class:`GdxFile`
"""
result = GdxFile(gams_dir=self.gams_dir,lazy_load=False)
for symbol in self:
result.append(symbol.clone())
result[-1]._file = result
return result
@property
def empty(self):
"""
Returns True if this GdxFile object does not contain any symbols.
Returns
-------
bool
"""
return len(self) == 0
@property
def H(self):
"""
GDX object handle
"""
return self._H
@property
def filename(self):
"""
Filename this :py:class:`GdxFile` is associated with, if any
Returns
-------
None or str
"""
return self._filename
@property
def version(self):
"""
GDX file version
"""
return self._version
@property
def producer(self):
"""
What program wrote the GDX file
"""
return self._producer
@property
def num_elements(self):
"""
Total number of records present in this file, summed over all symbols.
Returns
-------
int
"""
return sum([symbol.num_records for symbol in self])
[docs] def read(self,filename):
"""
Opens gdx file at filename and reads meta-data. If not self.lazy_load,
also loads all symbols.
Throws an Error if not self.empty.
Throws a GdxError if any calls to gdxcc fail.
Parameters
----------
filename : pathlib.Path or str
"""
if not self.empty:
raise Error("GdxFile.read can only be used if the GdxFile is .empty")
# open the file
rc = gdxcc.gdxOpenRead(self.H,str(filename))
if not rc[0]:
raise GdxError(self.H,f"Could not open {filename!r}")
self._filename = filename
# read in meta-data ...
# ... for the file
ret, self._version, self._producer = gdxcc.gdxFileVersion(self.H)
if ret != 1:
raise GdxError(self.H,"Could not get file version")
ret, symbol_count, element_count = gdxcc.gdxSystemInfo(self.H)
logger.debug(f"Opening '{filename}' with {symbol_count} symbols and "
f"{element_count} elements with lazy_load = {self.lazy_load}.")
# ... for the symbols
ret, name, dims, data_type = gdxcc.gdxSymbolInfo(self.H,0)
if ret != 1:
raise GdxError(self.H,"Could not get symbol info for the universal set")
self.universal_set = GdxSymbol(name,data_type,dims=dims,file=self,index=0)
for i in range(symbol_count):
index = i + 1
ret, name, dims, data_type = gdxcc.gdxSymbolInfo(self.H,index)
if ret != 1:
raise GdxError(self.H,f"Could not get symbol info for symbol {index}")
try:
sym = GdxSymbol(name,data_type,dims=dims,file=self,index=index)
self.append(sym)
except Exception as e:
logger.error(f"Unable to initialize GdxSymbol {name!r}, because {e}. SKIPPING.")
# read all symbols if not lazy_load
if not self.lazy_load:
for symbol in self:
symbol.load()
return
[docs] def write(self,filename):
"""
Writes this :py:class:`GdxFile` to filename
Parameters
----------
filename : pathlib.Path or str
"""
# only write if all symbols loaded
for symbol in self:
if not symbol.loaded:
raise Error("All symbols must be loaded before this file can be written.")
ret = gdxcc.gdxOpenWrite(self.H,str(filename),"gdxpds")
if not ret:
raise GdxError(self.H, f"Could not open {filename!r} for writing. "
"Consider cloning this file (.clone()) before trying to write.")
self._filename = filename
# write the universal set
self.universal_set.write()
for i, symbol in enumerate(self,start=1):
try:
symbol.write(index=i)
except:
logger.error("Unable to write {} to {}".format(symbol,filename))
raise
gdxcc.gdxClose(self.H)
def __repr__(self):
return "GdxFile(self,gams_dir={},lazy_load={})".format(
repr(self.gams_dir),
repr(self.lazy_load))
def __str__(self):
s = "GdxFile containing {} symbols and {} elements.".format(len(self),self.num_elements)
sep = " Symbols:\n "
for symbol in self:
s += sep + str(symbol)
sep = "\n "
return s
[docs] def __getitem__(self,key):
"""
Supports list-like indexing and symbol-based indexing
Parameters
----------
key : int or str
If int, the index into the list of symbols. If str, the name of the symbol to
be accessed.
Returns
-------
:py:class:`GdxSymbol`
"""
return self._symbols[self._name_key(key)]
[docs] def __setitem__(self,key,value):
"""
Supports overwriting or adding a :py:class:`GdxSymbol` via a list-like interface
Parameters
----------
key : int
Must be an index into the list of symbols, within range(len(self)+1)
value : :py:class:`GdxSymbol`
"""
self._check_insert_setitem(key, value)
value._file = self
if key < len(self):
self._symbols[self._name_key(key)] = value
self._fixup_name_keys()
return
assert key == len(self)
self._symbols[value.name] = value
return
def __delitem__(self,key):
"""
Deletes a symbol from this :py:class:`GdxFile`'s collection
Parameters
----------
key : int or str
If int, the index into the list of symbols. If str, the name of the symbol to
be accessed.
"""
del self._symbols[self._name_key(key)]
return
def __len__(self):
"""
Number of :py:class:`GdxSymbol`s in this :py:class:`GdxFile`
"""
return len(self._symbols)
[docs] def insert(self,key,value):
"""
Inserts value at position key
Parameters
----------
key : int
Must be an index into the list of symbols, within range(len(self)+1)
value : :py:class:`GdxSymbol`
"""
self._check_insert_setitem(key, value)
value._file = self
if key == len(self) and value.name not in self._symbols:
# We can safely append the symbol. This is fast (O(log(n)) complexity)
self._symbols[value.name] = value
else:
# Need to insert inside the sequence. This is slow (O(n) complexity)
data = [(symbol.name, symbol) for symbol in self]
data.insert(key,(value.name,value))
self._symbols = OrderedDict(data)
return
def __contains__(self,key):
"""
Returns True if __getitem__ works with key.
"""
try:
self.__getitem__(key)
return True
except:
return False
[docs] def keys(self):
"""
List of symbol names obtained by iterating through this :py:class:`GdxFile`
Returns
-------
list of str
"""
return [symbol.name for symbol in self]
def _name_key(self,key):
name_key = key
if isinstance(key,int):
name_key = list(self._symbols.keys())[key]
return name_key
def _check_insert_setitem(self,key,value):
if not isinstance(value,GdxSymbol):
raise Error("GdxFiles only contain GdxSymbols. GdxFile was given a {}.".format(type(value)))
if not isinstance(key,int):
raise Error("When adding or replacing GdxSymbols in GdxFiles, only integer, not name indices, may be used.")
if key > len(self):
raise Error("Invalid key, {}".format(key))
return
def _fixup_name_keys(self):
self._symbols = OrderedDict([(symbol.name, symbol) for cur_key, symbol in self._symbols])
return
def _create_gdx_object(self):
H = gdxcc.new_gdxHandle_tp()
rc = gdxcc.gdxCreateD(H,self.gams_dir,gdxcc.GMS_SSSIZE)
if not rc:
raise GdxError(H,rc[1])
return H
[docs]class GamsDataType(Enum):
Set = gdxcc.GMS_DT_SET
Parameter = gdxcc.GMS_DT_PAR
Variable = gdxcc.GMS_DT_VAR
Equation = gdxcc.GMS_DT_EQU
Alias = gdxcc.GMS_DT_ALIAS
[docs]class GamsVariableType(Enum):
Unknown = gdxcc.GMS_VARTYPE_UNKNOWN
Binary = gdxcc.GMS_VARTYPE_BINARY
Integer = gdxcc.GMS_VARTYPE_INTEGER
Positive = gdxcc.GMS_VARTYPE_POSITIVE
Negative = gdxcc.GMS_VARTYPE_NEGATIVE
Free = gdxcc.GMS_VARTYPE_FREE
SOS1 = gdxcc.GMS_VARTYPE_SOS1
SOS2 = gdxcc.GMS_VARTYPE_SOS2
Semicont = gdxcc.GMS_VARTYPE_SEMICONT
Semiint = gdxcc.GMS_VARTYPE_SEMIINT
[docs]class GamsEquationType(Enum):
Equality = 53 + gdxcc.GMS_EQUTYPE_E
GreaterThan = 53 + gdxcc.GMS_EQUTYPE_G
LessThan = 53 + gdxcc.GMS_EQUTYPE_L
NothingEnforced = 53 + gdxcc.GMS_EQUTYPE_N
External = 53 + gdxcc.GMS_EQUTYPE_X
Conic = 53 + gdxcc.GMS_EQUTYPE_C
[docs]class GamsValueType(Enum):
Level = gdxcc.GMS_VAL_LEVEL # .l
Marginal = gdxcc.GMS_VAL_MARGINAL # .m
Lower = gdxcc.GMS_VAL_LOWER # .lo
Upper = gdxcc.GMS_VAL_UPPER # .ub
Scale = gdxcc.GMS_VAL_SCALE # .scale
@classmethod
def _missing_(cls, value):
if isinstance(value,str):
for value_type in cls:
if value_type.name == value:
return value_type
if value == 'Value':
return GamsValueType(GamsValueType.Level)
super()._missing_(value)
GAMS_VALUE_COLS_MAP = defaultdict(lambda : [('Value',GamsValueType.Level.value)])
"""
List of value columns provided for each :py:attr:`GamsValueType`
"""
GAMS_VALUE_COLS_MAP[GamsDataType.Variable] = [(value_type.name, value_type.value) for value_type in GamsValueType]
GAMS_VALUE_COLS_MAP[GamsDataType.Equation] = GAMS_VALUE_COLS_MAP[GamsDataType.Variable]
GAMS_VALUE_DEFAULTS = {
GamsValueType.Level: 0.0,
GamsValueType.Marginal: 0.0,
GamsValueType.Lower: -np.inf,
GamsValueType.Upper: np.inf,
GamsValueType.Scale: 1.0
}
"""
Default values for each :py:class:`GamsValueType`
"""
GAMS_VARIABLE_DEFAULT_LOWER_UPPER_BOUNDS = {
GamsVariableType.Unknown: (-np.inf,np.inf),
GamsVariableType.Binary: (0.0,1.0),
GamsVariableType.Integer: (0.0,np.inf),
GamsVariableType.Positive: (0.0,np.inf),
GamsVariableType.Negative: (-np.inf,0.0),
GamsVariableType.Free : (-np.inf,np.inf),
GamsVariableType.SOS1: (0.0,np.inf),
GamsVariableType.SOS2: (0.0,np.inf),
GamsVariableType.Semicont: (1.0,np.inf),
GamsVariableType.Semiint: (1.0,np.inf)
}
"""
Default lower and upper bounds for each :py:class:`GamsVariableType`
"""
[docs]class GdxSymbol(object):
def __init__(self,name,data_type,dims=0,file=None,index=None,
description='',variable_type=None,equation_type=None):
"""
In-memory representation of a GAMS GDX Symbol
Parameters
----------
name : str
data_type : :py:class:`GamsDataType`
dims : int or list of str
If dims is set to an int, then that number of dimensions will be created, each
indicated with the wildcard name '*'. Otherwise, a list of strings is expected,
each string being a dimension name.
file : None or :py:class:`GdxFile`
Users should not set file. File is set by, e.g., :py:meth:`GdxFile.read` and
:py:meth:`GdxFile.append`.
index : None or int
Users should not set file. File is set by, e.g., :py:meth:`GdxFile.read` and
:py:meth:`GdxFile.append`.
description : str
Human readable description for this :py:class:`GdxSymbol`
variable_type : None or :py:class:`GamsVariableType`
Only expected if data_type == :py:attr:`GamsDataType.Variable`
equation_type : None or :py:class:`GamsEquationType`
Only expected if data_type == :py:attr:`GamsDataType.Equation`
"""
self._name = name
self.description = description
self._loaded = False
self._data_type = GamsDataType(data_type)
self._variable_type = None; self.variable_type = variable_type
self._equation_type = None; self.equation_type = equation_type
self._dataframe = None; self._dims = None
self.dims = dims
assert self._dataframe is not None
self._file = file
self._index = index
# adding this flag to implement ability to load set text instead of boolean values
self._fixup_set_vals = True
if self.file is not None:
# reading from file
# get additional meta-data
ret, records, userinfo, description = gdxcc.gdxSymbolInfoX(self.file.H,self.index)
if ret != 1:
raise GdxError(self.file.H,"Unable to get extended symbol information for {}".format(self.name))
self._num_records = records
if self.data_type == GamsDataType.Variable:
self.variable_type = GamsVariableType(userinfo)
elif self.data_type == GamsDataType.Equation:
self.equation_type = GamsEquationType(userinfo)
self.description = description
if self.index > 0:
ret, gdx_domain = gdxcc.gdxSymbolGetDomainX(self.file.H,self.index)
if ret == 0:
raise GdxError(self.file.H,"Unable to get domain information for {}".format(self.name))
assert len(gdx_domain) == len(self.dims), "Dimensional information read in from GDX should be consistent."
self.dims = gdx_domain
else:
# universal set
assert self.index == 0
self._loaded = True
return
# writing new symbol
self._loaded = True
return
[docs] def clone(self):
"""
Create a copy of this :py:class:`GdxSymbol`
Returns
-------
:py:class:`GdxSymbol`
"""
if not self.loaded:
raise Error("Symbol {} cannot be cloned because it is not yet loaded.".format(repr(self.name)))
assert self.loaded
result = GdxSymbol(self.name,self.data_type,
dims=self.dims,
description=self.description,
variable_type=self.variable_type,
equation_type=self.equation_type)
result.dataframe = copy.deepcopy(self.dataframe)
assert result.loaded
return result
@property
def name(self):
"""
Name of this :py:class:`GdxSymbol`
Returns
-------
str
"""
return self._name
@name.setter
def name(self,value):
self._name = value
if self.file is not None:
self.file._fixup_name_keys()
return
@property
def data_type(self):
"""
GAMS data type of this :py:class:`GdxSymbol`
Returns
-------
:py:class:`GamsDataType`
"""
return self._data_type
@data_type.setter
def data_type(self, value):
if not self.loaded or self.num_records > 0:
raise Error("Cannot change the data_type of a GdxSymbol that is yet to be read for file or contains records.")
self._data_type = GamsDataType(value)
self.variable_type = None
self.equation_type = None
self._init_dataframe()
return
@property
def variable_type(self):
"""
Only not none if :py:attr:`data_type` == :py:attr:`GamsDataType.Variable`
Returns
-------
None or :py:attr:`GamsDataType.Variable`
"""
return self._variable_type
@variable_type.setter
def variable_type(self,value):
if self.data_type == GamsDataType.Variable:
if value is None:
# default to Free
self._variable_type = GamsVariableType.Free
else:
try:
self._variable_type = GamsVariableType(value)
except:
if isinstance(self._variable_type,GamsVariableType):
logger.warning("Ignoring invalid GamsVariableType request '{}'.".format(value))
return
logger.debug("Setting variable_type to {}.".format(GamsVariableType.Free))
self._variable_type = GamsVariableType.Free
return
assert self.data_type != GamsDataType.Variable
if value is not None:
logger.warning("GdxSymbol is not a Variable, so setting variable_type to None")
self._variable_type = None
@property
def equation_type(self):
"""
Only not none if :py:attr:`data_type` == :py:attr:`GamsDataType.Equation`
Returns
-------
None or :py:attr:`GamsDataType.Equation`
"""
return self._equation_type
@equation_type.setter
def equation_type(self,value):
if self.data_type == GamsDataType.Equation:
if value is None:
# default to Equality
self._equation_type = GamsEquationType.Equality
else:
try:
self._equation_type = GamsEquationType(value)
except:
if isinstance(self._equation_type,GamsEquationType):
logger.warning("Ignoring invalid GamsEquationType request '{}'.".format(value))
return
logger.debug("Setting equation_type to {}.".format(GamsEquationType.Equality))
self._equation_type = GamsEquationType.Equality
return
assert self.data_type != GamsDataType.Equation
if value is not None:
logger.warning("GdxSymbol is not an Equation, so setting equation_type to None")
self._equation_type = None
@property
def value_cols(self):
"""
List of (name, GamsValueType.value) tuples that describe the
value columns in the dataframe, that is, the columns that follow the
self.dims columns.
Returns
-------
list of (str, int)
"""
return GAMS_VALUE_COLS_MAP[self.data_type]
@property
def value_col_names(self):
"""
List of value column names, that is, the columns that follow the self.dims columns.
Returns
-------
list of str
"""
return [col_name for col_name, col_ind in self.value_cols]
[docs] def get_value_col_default(self,value_col_name):
if not value_col_name in self.value_col_names:
raise Error(f"{value_col_name} is not one of the value columns for "
f"this GdxSymbol, which is a {self.data_type}")
value_col = GamsValueType(value_col_name)
if self.data_type == GamsDataType.Set:
assert value_col == GamsValueType.Level
return c_bool(True)
if (self.data_type == GamsDataType.Variable) and (
(value_col == GamsValueType.Lower) or
(value_col == GamsValueType.Upper)):
lb_default, ub_default = GAMS_VARIABLE_DEFAULT_LOWER_UPPER_BOUNDS[self.variable_type]
if value_col == GamsValueType.Lower:
return lb_default
else:
assert value_col == GamsValueType.Upper
return ub_default
return GAMS_VALUE_DEFAULTS[value_col]
@property
def file(self):
"""
:py:class:`GdxFile` file that contains this :py:class:`GdxSymbol`, if any
Returns
-------
None or :py:class:`GdxFile`
"""
return self._file
@property
def index(self):
"""
Index of this :py:class:`GdxSymbol` in its :py:class:`GdxFile`, if any
Returns
-------
None or int
"""
return self._index
@property
def loaded(self):
"""
Whether the data for this symbol has been loaded
Returns
-------
bool
"""
return self._loaded
@property
def full_typename(self):
if self.data_type == GamsDataType.Parameter and self.dims == 0:
return 'Scalar'
elif self.data_type == GamsDataType.Variable:
return self.variable_type.name + " " + self.data_type.name
return self.data_type.name
@property
def dims(self):
"""
List of dimension names over which this symbol is defined. If the :py:class:`GdxSymbol` was
constructed with dims set to an integer, all dimension names will be the wildcard '*'.
Returns
-------
list of str
length of list is equal to :py:attr:`num_dims`
"""
return self._dims
@dims.setter
def dims(self, value):
if (self._dims is not None) and (self.loaded and ((self.num_dims > 0) or (self.num_records > 0))):
if not isinstance(value,list) or len(value) != self.num_dims:
raise Error(f"Cannot set dims to {value}, because the number of "
"dimensions has already been set to {self.num_dims}.")
if isinstance(value, int):
self._dims = ['*'] * value
self._init_dataframe()
return
if not isinstance(value, list):
raise Error('dims must be an int or a list. Was passed {} of type {}.'.format(value, type(value)))
for dim in value:
if not isinstance(dim, str):
raise Error('Individual dimensions must be denoted by strings. Was passed {} as element of {}.'.format(dim, value))
assert (self._dims is None) or (self.loaded and (self.num_dims == 0) and (self.num_records == 0)) or (len(value) == self.num_dims)
self._dims = value
if self.loaded and self.num_records > 0:
self._dataframe.columns = self.dims + self.value_col_names
return
self._init_dataframe()
@property
def num_dims(self):
"""
Number of dimensions over which this symbol is defined
Returns
-------
int
"""
return len(self.dims)
@property
def dataframe(self):
"""
Data table for this symbol. Dim columns are followed by value columns, left to right.
Returns
-------
pd.DataFrame
"""
return self._dataframe
@dataframe.setter
def dataframe(self, data):
try:
# get data in common format and start dealing with dimensions
if isinstance(data, pd.DataFrame):
df = data.copy()
has_col_names = True
else:
df = pd.DataFrame(data)
has_col_names = False
if df.empty:
# clarify dimensionality, as needed for loading empty GdxSymbols
df = pd.DataFrame(data,columns=self.dims + self.value_cols)
# finish handling dimensions
n = len(df.columns)
if (self.num_dims > 0) or (self.num_records > 0):
if not ((n == self.num_dims) or (n == self.num_dims + len(self.value_cols))):
raise Error("Cannot set dataframe to {} because the number ".format(df.head()) + \
"of dimensions would change. This symbol has {} ".format(self.num_dims) + \
"dimensions, currently represented by {}.".format(self.dims))
num_dims = self.num_dims
else:
# num_dims not explicitly established yet. in this case we must
# assume value columns have been provided or dimensionality is 0
num_dims = max(n - len(self.value_cols),0)
if (num_dims == 0) and (n < len(self.value_cols)):
raise Error("Cannot set dataframe to {} because the number ".format(df.head()) + \
"of dimensions cannot be established consistent with {}.".format(self))
if self.loaded and (num_dims > 0):
logger.warning('Inferring {} to have {} dimensions. '.format(self.name,num_dims) +
'Recommended practice is to explicitly set gdxpds.gdx.GdxSymbol dims in the constructor.')
replace_dims = True
if has_col_names:
dim_cols = list(df.columns)[:num_dims]
elif self.num_dims == num_dims:
dim_cols = self.dims
replace_dims = False
else:
dim_cols = ['*'] * num_dims
for col in dim_cols:
if not isinstance(col, str):
replace_dims = False
logger.info("Not using dataframe column names to set dimensions because {} is not a string.".format(col))
if num_dims != self.num_dims:
self.dims = num_dims
break
if replace_dims:
self.dims = dim_cols
# all done establishing dimensions
assert self.num_dims == num_dims
# finalize the dataframe
if n == self.num_dims:
self._append_default_values(df)
df.columns = self.dims + self.value_col_names
self._dataframe = df
except Exception:
logger.error("Unable to set dataframe for {} to\n{}\n\nIn process dataframe: {}".format(self,data,self._dataframe))
raise
if self.data_type == GamsDataType.Set:
self._fixup_set_value()
return
def _init_dataframe(self):
self._dataframe = pd.DataFrame([],columns=self.dims + self.value_col_names)
if self.data_type == GamsDataType.Set:
colname = self._dataframe.columns[-1]
replace_df_column(self._dataframe,colname,self._dataframe[colname].astype(c_bool))
return
def _append_default_values(self,df):
assert len(df.columns) == self.num_dims
logger.debug("Applying default values to create valid dataframe for '{self.name}'.")
for value_col_name in self.value_col_names:
df[value_col_name] = self.get_value_col_default(value_col_name)
def _fixup_set_value(self):
"""
Tricky to get boolean set values to come through right.
isinstance(True,Number) == True and float(True) = 1, but
isinstance(c_bool(True),Number) == False, and this keeps the default
value of 0.0.
Could just test for isinstance(,bool), but this fix has the added
advantage of speaking the GDX bindings data type language, and also
fills in any missing values, so users no longer need to actually specify
self.dataframe['Value'] = True.
"""
assert self.data_type == GamsDataType.Set
colname = self._dataframe.columns[-1]
assert colname == self.value_col_names[0], f"Unexpected final column {colname!r} in Set dataframe"
if self._dataframe[colname].isnull().values.any():
logger.warning(f"Filling null values in {self} with True. To be "
"filled:\n{self._dataframe[self._dataframe[colname].isnull()]}")
replace_df_column(self._dataframe, colname, self._dataframe[colname].fillna(value=True))
if self._fixup_set_vals:
replace_df_column(self._dataframe,colname,self._dataframe[colname].apply(lambda x: c_bool(x)))
self._fixup_set_vals = True
return
@property
def num_records(self):
"""
Number of rows in the data table, per the DataFrame if :py:attr:`loaded`, or per GAMS.
Returns
-------
int
"""
if self.loaded:
return len(self.dataframe.index)
return self._num_records
def __repr__(self):
return "GdxSymbol({},{},{},file={},index={},description={},variable_type={},equation_type={})".format(
repr(self.name),
repr(self.data_type),
repr(self.dims),
repr(self.file),
repr(self.index),
repr(self.description),
repr(self.variable_type),
repr(self.equation_type))
def __str__(self):
s = self.name
s += ", " + self.description
s += ", " + self.full_typename
s += ", {} records".format(self.num_records)
s += ", {} dims {}".format(self.num_dims, self.dims)
s += ", loaded" if self.loaded else ", not loaded"
return s
[docs] def load(self, load_set_text=False):
"""
Loads this :py:class:`GdxSymbol` from its :py:attr:`file`, thereby popluating
:py:attr:`dataframe`.
Parameters
----------
load_set_text : bool
If True (default is False) and this symbol is a :class:`GamsDataType.Set <GamsDataType>`,
loads the GDX Text field into the :py:attr:`dataframe` rather than a `c_bool`.
"""
if self.loaded:
logger.info("Nothing to do. Symbol already loaded.")
return
if not self.file:
raise Error("Cannot load {} because there is no file pointer".format(repr(self)))
if not self.index:
raise Error("Cannot load {} because there is no symbol index".format(repr(self)))
if self.data_type == GamsDataType.Parameter and HAVE_GDX2PY:
self.dataframe = gdx2py.par2list(self.file.filename,self.name)
self._loaded = True
return
_ret, records = gdxcc.gdxDataReadStrStart(self.file.H,self.index)
def reader():
handle = self.file.H
for i in range(records):
yield gdxcc.gdxDataReadStr(handle)
vc = self.value_cols # do this for speed in the next line
if load_set_text and (self.data_type == GamsDataType.Set):
data = [elements + [gdxcc.gdxGetElemText(self.file.H,int(values[col_ind]))[1]
for _col_name, col_ind in vc]
for _ret, elements, values, _afdim in reader()]
self._fixup_set_vals = False
else:
data = [elements + [values[col_ind] for col_name, col_ind in vc] for ret, elements, values, afdim in reader()]
self.dataframe = data
if not self.data_type in (GamsDataType.Set, GamsDataType.Alias):
self.dataframe = special.convert_gdx_to_np_svs(self.dataframe, self.num_dims)
self._loaded = True
return
[docs] def unload(self):
"""
Drops this :py:class:`GdxSymbol`'s :py:attr:`dataframe`
"""
self.dataframe = None
self._loaded = False
[docs] def write(self,index=None):
"""
Writes this :py:class:`GdxSymbol` to its :py:attr:`file`
"""
if not self.loaded:
raise Error(f"Cannot write unloaded symbol {self.name!r}.")
if self.file is None:
raise Error(f"Cannot write {self!r} because there is no file pointer")
if self.data_type == GamsDataType.Set:
self._fixup_set_value()
if index is not None:
self._index = index
if self.index == 0:
# universal set
gdxcc.gdxUELRegisterRawStart(self.file.H)
gdxcc.gdxUELRegisterRaw(self.file.H,self.name)
gdxcc.gdxUELRegisterDone(self.file.H)
return
# write the data
userinfo = 0
if self.variable_type is not None:
userinfo = self.variable_type.value
elif self.equation_type is not None:
userinfo = self.equation_type.value
if not gdxcc.gdxDataWriteStrStart(self.file.H,
self.name,
self.description,
self.num_dims,
self.data_type.value,
userinfo):
raise GdxError(self.file.H,"Could not start writing data for symbol {}".format(repr(self.name)))
# set domain information
if self.num_dims > 0:
if self.index:
if not gdxcc.gdxSymbolSetDomainX(self.file.H,self.index,self.dims):
raise GdxError(self.file.H,"Could not set domain information for {}. Domains are {}".format(repr(self.name),repr(self.dims)))
else:
logger.info("Not writing domain information because symbol index is unknown.")
values = gdxcc.doubleArray(gdxcc.GMS_VAL_MAX)
# make sure index is clean -- needed for merging in convert_np_to_gdx_svs
self.dataframe = self.dataframe.reset_index(drop=True)
# convert special numeric values if appropriate
to_write = self.dataframe.copy() if (self.data_type in (GamsDataType.Set, GamsDataType.Alias)) else special.convert_np_to_gdx_svs(self.dataframe, self.num_dims)
# write each row
for row in to_write.itertuples(index=False, name=None):
dims = [str(x) for x in row[:self.num_dims]]
vals = row[self.num_dims:]
for _col_name, col_ind in self.value_cols:
values[col_ind] = float(0.0)
try:
if isinstance(vals[col_ind],Number):
values[col_ind] = float(vals[col_ind])
except:
raise Error("Unable to set element {} from {}.".format(col_ind,vals))
gdxcc.gdxDataWriteStr(self.file.H,dims,values)
gdxcc.gdxDataWriteDone(self.file.H)
return
# ------------------------------------------------------------------------------
# Helper functions
# ------------------------------------------------------------------------------
[docs]def append_set(gdx_file, set_name, df, cols=None, dim_names=None,
description=None):
"""
Convenience function that appends set_name to gdx_file as a
:class:`GamsDataType.Set <GamsDataType>` :class:`GdxSymbol` using data in
df.
Parameters
----------
gdx_file : :class:`GdxFile`
file to which new :class:`GdxSymbol` is to be added
set_name : str
name of the :class:`GdxSymbol` to be added
df : pandas.DataFrame
dataframe or data that can be used to construct a dataframe containing
the set data. assumes that all columns define dimensions (there is no
'Value' column)
cols : None or list of str
if not None, these are the columns in df to be used for the set
definition
dim_names : None or list of str
if provided, the columns of a copy of df (or of df[cols]) will be renamed
to these names, because the dimension names are taken from the final
dataframe, these will also be the dimension names
description : None or str
passed directly to :class:`GdxSymbol`
"""
# ensure df is DataFrame and not Series
logger.debug(f"Defining set {set_name!r} based on:\n{df!r}")
tmp = pd.DataFrame(df)
# select down to data we actually want
if cols is not None:
tmp = tmp[cols]
if dim_names is not None:
if tmp.empty:
tmp = pd.DataFrame([], columns = dim_names)
else:
tmp.columns = dim_names
# define the symbol
gdx_file.append(GdxSymbol(set_name, GamsDataType.Set,
dims = list(tmp.columns), description = description))
# define the data for the symbol
gdx_file[-1].dataframe = tmp
# debug description of what happened
logger.debug(f"Added set {set_name!r} to {gdx_file!r} using processed data:\n{tmp!r}")
return
[docs]def append_parameter(gdx_file, param_name, df, cols=None, dim_names=None,
description=None):
"""
Convenience function that appends param_name to gdx_file as a
:class:`GamsDataType.Parameter <GamsDataType>` :class:`GdxSymbol` using
data in df.
Parameters
----------
gdx_file : :class:`GdxFile`
file to which new :class:`GdxSymbol` is to be added
param_name : str
name of the :class:`GdxSymbol` to be added
df : pandas.DataFrame
dataframe or data that can be used to construct a dataframe containing
the parameter data. assumes that the last selected column is the 'Value'
column
cols : None or list of str
if not None, these are the columns in df to be used for the parameter
definition (dimension columns followed by value column)
dim_names : None or list of str
if provided, the columns of a copy of df (or of df[cols]) will be renamed
to these names + ['Value']. because the dimension names are taken from
the final dataframe, these will also be the dimension names
description : None or str
passed directly to :class:`GdxSymbol`
"""
# pre-process the data
logger.debug(f"Defining parameter {param_name!r} based on:\n{df!r}")
tmp = pd.DataFrame(df)
if cols is not None:
tmp = tmp[cols]
if dim_names is not None:
if tmp.empty:
tmp = pd.DataFrame([], columns = dim_names + ['Value'])
else:
tmp.columns = dim_names + ['Value']
# define the symbol
gdx_file.append(GdxSymbol(param_name, GamsDataType.Parameter,
dims = list(tmp.columns)[:-1], description = description))
# define the data for the symbol
gdx_file[-1].dataframe = tmp
# debug descripton of what happened
logger.debug(f"Added parameter {param_name!r} to {gdx_file!r} using processed data:\n{tmp!r}")
return