"""
Designs for technologies.
"""
import os
import sys
import importlib as il
import numpy as np
import pandas as pd
from .DataManager import DesignsDataset, FunctionsDataset, IndicesDataset, ParametersDataset, ResultsDataset
from .Distributions import parse_distribution
from .IO import check_tables
from .Types import Functions, Indices, Inputs, Results
[docs]def sampler(x, sample_count):
"""
Sample from an array.
Parameters
----------
x : array
The array.
sample_count : int
The sample size.
"""
if sample_count == 1:
it = np.nditer(
[x, np.empty(x.shape + (1,), dtype=np.float64)],
flags=["refs_ok"],
op_flags=[["readonly"], ["writeonly", "allocate"]],
op_axes=[list(range(x.ndim)) + [-1], None],
itershape=x.shape+(1,)
)
for x, y in it:
y[()] = x[()].rvs()
return it.operands[1]
else:
it = np.nditer(
[x, np.empty(x.shape + (sample_count,), dtype=np.float64)],
flags=["refs_ok", "external_loop"],
op_flags=[["readonly"], ["writeonly", "allocate"]],
op_axes=[list(range(x.ndim)) + [-1], None],
itershape=x.shape+(sample_count,)
)
for x, y in it:
y[...] = x[0].rvs(sample_count)
return it.operands[1]
[docs]class Designs:
"""
Designs for a technology.
Attributes
----------
indices : DataFrame
The *indices* table.
functions : DataFrame
The *functions* table.
designs : DataFrame
The *designs* table.
parameters : DataFrame
The *parameters* table.
results : DataFrame
The *results* table.
"""
def __init__(
self ,
path = None ,
name = 'technology.xlsx',
uncertain = True ,
):
"""
Parameters
----------
path : str
Location of the data files.
name : str
Filename where decision context datasets are kept in separate sheets.
uncertain : Boolean
Flag indicating whether probability distributions are present in the *designs* or *parameters* tables.
indices : str
Sheet name for the *indices* table.
functions : str
Sheet name for the *functions* table.
designs : str
Sheet name for the *designs* table.
parameters : str
Sheet name for the *parameters* table.
results : str
Sheet name for the *results* table.
"""
self.uncertain = uncertain
if not os.path.isfile(os.path.join(path, name)):
raise Exception(f"Designs: {os.path.join(path, name)} does not exist.")
if not check_tables(path, name):
raise Exception(f'Designs: {name} failed validation.')
self.indices = IndicesDataset( os.path.join(path, name)).sort_index()
self.functions = FunctionsDataset( os.path.join(path, name)).sort_index()
self.designs = DesignsDataset( os.path.join(path, name)).sort_index()
self.parameters = ParametersDataset( os.path.join(path, name)).sort_index()
self.results = ResultsDataset( os.path.join(path, name)).sort_index()
[docs] def vectorize_technologies(self):
"""
Make an array of technologies.
"""
return self.designs.reset_index(
["Scenario", "Variable", "Index"]
).sort_index(
).index.drop_duplicates(
).values
[docs] def vectorize_scenarios(self, technology):
"""
Make an array of scenarios.
"""
return self.designs.xs(technology
).reset_index(
["Variable", "Index"]
).sort_index(
).index.drop_duplicates(
).values
def _vectorize_indices(self, technology):
def extract_indices(index):
return self.indices.xs(
(technology, index)
).sort_values(
by="Offset"
)[["Offset"]]
return Indices(
capital = extract_indices("Capital"),
input = extract_indices("Input" ),
output = extract_indices("Output" ),
metric = extract_indices("Metric" ),
)
[docs] def vectorize_indices(self, technology):
"""
Make an array of indices.
"""
vectors = self._vectorize_indices(technology)
return Indices(
capital = vectors.capital.index.values,
input = vectors.input.index.values ,
output = vectors.output.index.values ,
metric = vectors.metric.index.values ,
)
[docs] def vectorize_designs(self, technology, scenario_count, sample_count=1):
"""
Make an array of designs.
"""
def extract_designs(variable):
return self.compiled_designs.xs(
(technology, variable),
level=["Technology", "Variable"]
)[["Distribution"]]
lifetimes = extract_designs("Lifetime" )
scales = extract_designs("Scale" )
inputs = extract_designs("Input" )
input_efficiencies = extract_designs("Input efficiency" )
input_prices = extract_designs("Input price" )
output_efficiencies = extract_designs("Output efficiency")
output_prices = extract_designs("Output price" )
all_indices = self._vectorize_indices(technology)
def join(values, offsets):
return values.join(
offsets
).reorder_levels(
[1, 0]
).reset_index(
).sort_values(
by=["Offset", "Scenario"]
)[
"Distribution"
].values.reshape(
(offsets.shape[0], scenario_count)
)
# If the technology model has probability distributions, sample them
if self.uncertain:
return Inputs(
scale = sampler(scales["Distribution"].values , sample_count),
lifetime = sampler(join(lifetimes , all_indices.capital), sample_count),
input = sampler(join(inputs , all_indices.input ), sample_count),
input_efficiency = sampler(join(input_efficiencies , all_indices.input ), sample_count),
input_price = sampler(join(input_prices , all_indices.input ), sample_count),
output_efficiency = sampler(join(output_efficiencies, all_indices.output ), sample_count),
output_price = sampler(join(output_prices , all_indices.output ), sample_count),
)
else:
# Otherwise, just sample once to get floats and then duplicate the data for later merging
# with the tranche samples
return Inputs(
scale = np.tile(sampler(scales["Distribution"].values , 1), sample_count),
lifetime = np.tile(sampler(join(lifetimes , all_indices.capital), 1), sample_count),
input = np.tile(sampler(join(inputs , all_indices.input ), 1), sample_count),
input_efficiency = np.tile(sampler(join(input_efficiencies , all_indices.input ), 1), sample_count),
input_price = np.tile(sampler(join(input_prices , all_indices.input ), 1), sample_count),
output_efficiency = np.tile(sampler(join(output_efficiencies, all_indices.output ), 1), sample_count),
output_price = np.tile(sampler(join(output_prices , all_indices.output ), 1), sample_count),
)
[docs] def vectorize_parameters(self, technology, scenario_count, sample_count=1):
"""
Make an array of parameters.
"""
x = self.compiled_parameters.xs(
technology
).reset_index(
).sort_values(
by=["Offset", "Scenario"]
)["Distribution"].values
return sampler(
x.reshape((-1, scenario_count)),
sample_count
)
[docs] def compile(self):
"""
Compile the production and metrics functions.
"""
self.compiled_functions = {}
for technology, metadata in self.functions.iterrows():
m = il.import_module("." + metadata["Model"], package="technology")
self.compiled_functions[technology] = Functions(
style = metadata["Style" ] ,
capital = eval("m." + metadata["Capital" ]),
fixed = eval("m." + metadata["Fixed" ]),
production = eval("m." + metadata["Production"]),
metric = eval("m." + metadata["Metrics" ]),
)
self.compiled_designs = self.designs.copy()
self.compiled_designs["Distribution"] = self.compiled_designs["Value"].apply(parse_distribution)
self.compiled_parameters = self.parameters.copy()
self.compiled_parameters["Distribution"] = self.compiled_parameters["Value"].apply(parse_distribution)
[docs] def evaluate(self, technology, sample_count=1):
"""
Evaluate the performance of a technology.
Parameters
----------
technology : str
The name of the technology.
sample_count : int
The number of random samples.
"""
print(f"Evaluating {technology}")
f_capital = self.compiled_functions[technology].capital
f_fixed = self.compiled_functions[technology].fixed
f_production = self.compiled_functions[technology].production
f_metrics = self.compiled_functions[technology].metric
indices = self.vectorize_indices(technology)
scenarios = self.vectorize_scenarios(technology)
n = scenarios.shape[0]
design = self.vectorize_designs( technology, n, sample_count)
parameter = self.vectorize_parameters(technology, n, sample_count)
capital_cost = f_capital(design.scale, parameter)
fixed_cost = f_fixed (design.scale, parameter)
input_raw = design.input
input = design.input_efficiency * input_raw
output_raw = f_production(design.scale, capital_cost, design.lifetime, fixed_cost, input, parameter)
output = design.output_efficiency * output_raw
cost = np.sum(capital_cost / design.lifetime, axis=0) / design.scale + \
np.sum(fixed_cost, axis=0) / design.scale + \
np.sum(design.input_price * input , axis=0) - \
np.sum(design.output_price * output, axis=0)
metric = f_metrics(design.scale, capital_cost, design.lifetime, fixed_cost, input_raw, input, design.input_price, output_raw, output, cost, parameter)
def organize(df, ix):
ix1 = pd.MultiIndex.from_product(
[ix, scenarios, range(1, sample_count + 1)],
names=["Index", "Scenario", "Sample"]
)
df1 = pd.DataFrame({"Value" : df.flatten()}, index=ix1)
df1["Technology"] = technology
return df1.set_index(
["Technology"],
append=True
).reorder_levels(
["Technology", "Scenario", "Sample", "Index"]
).sort_index()
return Results(
cost = organize(cost.reshape(cost.shape + (1,)), ["Cost"] ),
output = organize(output , indices.output),
metric = organize(metric , indices.metric),
)
[docs] def evaluate_scenarios(self, sample_count=1):
"""
Evaluate scenarios.
Parameters
----------
sample_count : int
The number of random samples.
"""
costs = pd.DataFrame()
outputs = pd.DataFrame()
metrics = pd.DataFrame()
for technology in self.vectorize_technologies():
result = self.evaluate(technology, sample_count)
costs = pd.concat([costs, result.cost])
outputs = pd.concat([outputs, result.output])
metrics = pd.concat([metrics, result.metric])
def organize(variable, values):
return self.results.xs(
variable,
level="Variable",
drop_level=False
).join(
values
).reorder_levels(
["Technology", "Scenario", "Sample", "Variable", "Index"]
)[["Value", "Units"]]
return pd.concat([
organize("Cost", costs),
organize("Output", outputs),
organize("Metric", metrics)
]).sort_index()