import logging
import os
import subprocess as subp
import gdxpds.gdx
import gdxpds.special
from gdxpds.test import base_dir, run_dir
from gdxpds.test.test_session import manage_rundir
from gdxpds.test.test_conversions import roundtrip_one_gdx
import gdxcc
import numpy as np
import pandas as pd
import pytest
logger = logging.getLogger(__name__)
def value_column_index(sym, gams_value_type):
    """
    Return the positional index of a value column within a symbol's columns.

    The result is ``len(sym.dims)`` plus the position, within
    ``sym.value_cols``, of the first entry whose second element equals
    ``gams_value_type.value``. Callers in this module use the result as a
    column position for ``sym.dataframe.iloc`` (presumably the dataframe lists
    dimension columns first, then value columns -- confirm against gdxpds).

    Parameters
    ----------
    sym
        Object exposing ``dims`` (sized) and ``value_cols`` (iterable of
        sequences whose element ``[1]`` identifies the value type).
    gams_value_type
        Object exposing a ``value`` attribute, e.g. an enum member.

    Returns
    -------
    int
        Offset of the matching value column, counted after the dimensions.

    Raises
    ------
    ValueError
        If no entry in ``sym.value_cols`` matches ``gams_value_type``.
    """
    for offset, value_col in enumerate(sym.value_cols):
        if value_col[1] == gams_value_type.value:
            return len(sym.dims) + offset
    # Falling through used to return the last column (or crash on an unbound
    # loop variable when there were no value columns); fail loudly instead.
    raise ValueError(
        "No value column matching {!r} found in symbol".format(gams_value_type))
def test_roundtrip_just_special_values(manage_rundir):
    """
    Write a GDX file holding only special values and roundtrip it.

    A one-dimensional parameter named ``special_values`` is written directly
    through the ``gdxcc`` API, then checked three ways: after reading it back,
    after a gdx-only clone-and-write roundtrip, and after a roundtrip through
    ``roundtrip_one_gdx``.
    """
    outdir = os.path.join(run_dir, 'special_values')
    os.makedirs(outdir, exist_ok=True)

    # create gdx file containing all special values
    with gdxpds.gdx.GdxFile() as f:
        df = pd.DataFrame(
            [['sv' + str(i+1), gdxpds.special.SPECIAL_VALUES[i]]
             for i in range(gdxcc.GMS_SVIDX_MAX-2)],
            columns=['sv', 'Value'])
        logger.info("Special values are:\n{}".format(df))

        # save this directly as a GdxSymbol
        filename = os.path.join(outdir, 'direct_write_special_values.gdx')
        ret = gdxcc.gdxOpenWrite(f.H, filename, "gdxpds")
        if not ret:
            raise gdxpds.gdx.GdxError(f.H,"Could not open {} for writing. Consider cloning this file (.clone()) before trying to write".format(repr(filename)))

        # write the universal set
        f.universal_set.write()

        if not gdxcc.gdxDataWriteStrStart(f.H,
                                          'special_values',
                                          '',
                                          1,
                                          gdxpds.gdx.GamsDataType.Parameter.value,
                                          0):
            raise gdxpds.gdx.GdxError(f.H,"Could not start writing data for symbol special_values")

        # set domain information
        if not gdxcc.gdxSymbolSetDomainX(f.H, 1, [df.columns[0]]):
            raise gdxpds.gdx.GdxError(f.H,"Could not set domain information for special_values.")

        values = gdxcc.doubleArray(gdxcc.GMS_VAL_MAX)
        for row in df.itertuples(index=False, name=None):
            # first tuple element is the single dimension, the rest are values
            dims = [str(x) for x in row[:1]]
            vals = row[1:]
            for _col_name, col_ind in gdxpds.gdx.GAMS_VALUE_COLS_MAP[gdxpds.gdx.GamsDataType.Parameter]:
                values[col_ind] = float(vals[col_ind])
            gdxcc.gdxDataWriteStr(f.H, dims, values)
        gdxcc.gdxDataWriteDone(f.H)
        gdxcc.gdxClose(f.H)

    # general test for expected values
    def check_special_values(gdx_file):
        df = gdx_file['special_values'].dataframe
        for i, val in enumerate(df['Value'].values):
            assert gdxpds.special.pd_val_equal(val, gdxpds.special.NUMPY_SPECIAL_VALUES[i])

    # now roundtrip it gdx-only
    with gdxpds.gdx.GdxFile(lazy_load=False) as f:
        f.read(filename)
        check_special_values(f)
        with f.clone() as g:
            rt_filename = os.path.join(outdir, 'roundtripped.gdx')
            g.write(rt_filename)
    with gdxpds.gdx.GdxFile(lazy_load=False) as g:
        # Read the file the clone just wrote; re-reading ``filename`` here
        # would only repeat the check above and never test the roundtrip.
        g.read(rt_filename)
        check_special_values(g)

    # now roundtrip it through csv
    roundtripped_gdx = roundtrip_one_gdx(filename, 'roundtrip_just_special_values')
    with gdxpds.gdx.GdxFile(lazy_load=False) as h:
        h.read(roundtripped_gdx)
        check_special_values(h)
def test_roundtrip_special_values(manage_rundir):
    """
    Compare special values in a reference GDX file before and after roundtrip.

    One cell is sampled from each of two symbols in both the original file and
    the file returned by ``roundtrip_one_gdx``. Every sampled cell must be a
    numpy special value, and matching cells must agree across the two files
    (two NaNs count as agreeing).
    """
    gdx_name = 'OptimalCSPConfig_Out.gdx'
    source_path = os.path.join(base_dir, gdx_name)
    roundtrip_path = roundtrip_one_gdx(gdx_name, 'roundtrip_special_values')

    # (symbol name, expected data type, value column to sample from row 0)
    probes = (
        ('calculate_capacity_value',
         gdxpds.gdx.GamsDataType.Equation,
         gdxpds.gdx.GamsValueType.Marginal),
        ('CapacityValue',
         gdxpds.gdx.GamsDataType.Variable,
         gdxpds.gdx.GamsValueType.Upper),
    )

    samples_per_file = []
    for path in (source_path, roundtrip_path):
        with gdxpds.gdx.GdxFile(lazy_load=False) as gdx:
            sampled = []
            samples_per_file.append(sampled)
            gdx.read(path)
            for sym_name, expected_type, value_type in probes:
                symbol = gdx[sym_name]
                assert symbol.data_type == expected_type
                cell = symbol.dataframe.iloc[0, value_column_index(symbol, value_type)]
                assert gdxpds.special.is_np_sv(cell)
                sampled.append(cell)

    # Transpose so each tuple holds the same probe's value from every file.
    for point in zip(*samples_per_file):
        reference = point[0]
        for candidate in point[1:]:
            assert (candidate == reference) or (np.isnan(candidate) and np.isnan(reference))
def test_special_integrity():
    """
    Check that the special values line up
    """
    special = gdxpds.special
    gdx_to_np = special.GDX_TO_NP_SVS
    np_to_gdx = special.NP_TO_GDX_SVS

    # Every listed special value must be a key of the matching lookup table.
    for gdx_sv in special.SPECIAL_VALUES:
        assert gdx_sv in gdx_to_np
    for np_sv in special.NUMPY_SPECIAL_VALUES:
        assert np_sv in np_to_gdx

    # GDX -> numpy -> GDX must return the starting value.
    for gdx_sv in special.SPECIAL_VALUES:
        assert np_to_gdx[gdx_to_np[gdx_sv]] == gdx_sv

    # numpy -> GDX -> numpy must return the starting value.
    for np_sv in special.NUMPY_SPECIAL_VALUES:
        # Can't use "==", as None != NaN
        assert special.pd_val_equal(gdx_to_np[np_to_gdx[np_sv]], np_sv)
def test_numpy_eps():
    """
    Check that ``is_np_eps`` accepts float machine epsilon only, rejecting
    zero and twice epsilon.
    """
    eps = np.finfo(float).eps
    is_eps = gdxpds.special.is_np_eps

    assert is_eps(eps)
    assert not is_eps(float(0.0))
    assert not is_eps(2.0 * eps)
def test_convert_np_to_gdx_svs_eps():
    """
    Check ``convert_np_to_gdx_svs`` on a frame with one dimension column.

    Machine epsilon in the ``Value`` column must become
    ``SPECIAL_VALUES[4]``; zero and twice epsilon must pass through unchanged.
    """
    eps = np.finfo(float).eps
    rows = [["a", eps],
            ["b", 0.0],
            ["c", 2.0 * eps]]
    frame = pd.DataFrame(rows, columns=["A", "Value"])

    converted = gdxpds.special.convert_np_to_gdx_svs(frame, num_dims=1)

    expected_values = pd.Series([gdxpds.special.SPECIAL_VALUES[4], 0.0, 2.0 * eps])
    assert converted["Value"].equals(expected_values)