Source code for reV.bespoke.place_turbines

# -*- coding: utf-8 -*-
# pylint: disable=inconsistent-return-statements
"""
place turbines for bespoke wind plants
"""
import numpy as np
from shapely.geometry import MultiPoint, MultiPolygon, Point, Polygon

from reV.bespoke.gradient_free import GeneticAlgorithm
from reV.bespoke.pack_turbs import PackTurbines
from reV.utilities.exceptions import WhileLoopPackingError


[docs]def none_until_optimized(func): """Decorator that returns None until `PlaceTurbines` is optimized. Meant for exclusive use in `PlaceTurbines` and its subclasses. `PlaceTurbines` is considered optimized when its `optimized_design_variables` attribute is not `None`. Parameters ---------- func : callable A callable function that should return `None` until `PlaceTurbines` is optimized. Returns ------- callable New function that returns `None` until `PlaceTurbines` is optimized. """ def _func(pt): """Wrapper to return `None` if `PlaceTurbines` is not optimized""" if pt.optimized_design_variables is None: return return func(pt) return _func
[docs]class PlaceTurbines: """Framework for optimizing turbine locations for site specific exclusions, wind resources, and objective """ def __init__(self, wind_plant, objective_function, capital_cost_function, fixed_operating_cost_function, variable_operating_cost_function, balance_of_system_cost_function, include_mask, pixel_side_length, min_spacing, wake_loss_multiplier=1): """ Parameters ---------- wind_plant : WindPowerPD wind plant object to analyze wind plant performance. This object should have everything in the plant defined, such that only the turbine coordinates and plant capacity need to be defined during the optimization. objective_function : str The objective function of the optimization as a string, should return the objective to be minimized during layout optimization. Variables available are: - ``n_turbines``: the number of turbines - ``system_capacity``: wind plant capacity - ``aep``: annual energy production - ``avg_sl_dist_to_center_m``: Average straight-line distance to the supply curve point center from all turbine locations (in m). Useful for computing plant BOS costs. - ``avg_sl_dist_to_medoid_m``: Average straight-line distance to the medoid of all turbine locations (in m). Useful for computing plant BOS costs. - ``nn_conn_dist_m``: Total BOS connection distance using nearest-neighbor connections. This variable is only available for the ``balance_of_system_cost_function`` equation. - ``fixed_charge_rate``: user input fixed_charge_rate if included as part of the sam system config. - ``capital_cost``: plant capital cost as evaluated by `capital_cost_function` - ``fixed_operating_cost``: plant fixed annual operating cost as evaluated by `fixed_operating_cost_function` - ``variable_operating_cost``: plant variable annual operating cost as evaluated by `variable_operating_cost_function` - ``balance_of_system_cost``: plant balance of system cost as evaluated by `balance_of_system_cost_function` - ``self.wind_plant``: the SAM wind plant object, through which all SAM variables can be accessed capital_cost_function : str The plant capital cost function as a string, must return the total capital cost in $. Has access to the same variables as the objective_function. fixed_operating_cost_function : str The plant annual fixed operating cost function as a string, must return the fixed operating cost in $/year. Has access to the same variables as the objective_function. variable_operating_cost_function : str The plant annual variable operating cost function as a string, must return the variable operating cost in $/kWh. Has access to the same variables as the objective_function. You can set this to "0" to effectively ignore variable operating costs. balance_of_system_cost_function : str The plant balance-of-system cost function as a string, must return the variable operating cost in $. Has access to the same variables as the objective_function. You can set this to "0" to effectively ignore balance-of-system costs. include_mask : np.ndarray Supply curve point 2D inclusion mask where included pixels are set to 1 and excluded pixels are set to 0. pixel_side_length : int Side length (m) of a single pixel of the `include_mask`. min_spacing : float The minimum spacing between turbines (in meters). wake_loss_multiplier : float, optional A multiplier used to scale the annual energy lost due to wake losses. **IMPORTANT**: This multiplier will ONLY be applied during the optimization process and will NOT be come through in output values such as aep, any of the cost functions, or even the output objective. """ # inputs self.wind_plant = wind_plant self.capital_cost_function = capital_cost_function self.fixed_operating_cost_function = fixed_operating_cost_function self.variable_operating_cost_function = \ variable_operating_cost_function self.balance_of_system_cost_function = balance_of_system_cost_function self.objective_function = objective_function self.include_mask = include_mask self.pixel_side_length = pixel_side_length self.min_spacing = min_spacing self.wake_loss_multiplier = wake_loss_multiplier # internal variables self.nrows, self.ncols = np.shape(include_mask) self.x_locations = np.array([]) self.y_locations = np.array([]) self.turbine_capacity = \ np.max(self.wind_plant. sam_sys_inputs["wind_turbine_powercurve_powerout"]) self.full_polygons = None self.packing_polygons = None self.optimized_design_variables = None self.safe_polygons = None self._optimized_nn_conn_dist_m = None self.ILLEGAL = ('import ', 'os.', 'sys.', '.__', '__.', 'eval', 'exec') self._preflight(self.objective_function) self._preflight(self.capital_cost_function) self._preflight(self.fixed_operating_cost_function) self._preflight(self.variable_operating_cost_function) self._preflight(self.balance_of_system_cost_function) def _preflight(self, eqn): """Run preflight checks on the equation string.""" for substr in self.ILLEGAL: if substr in str(eqn): msg = ('Will not evaluate string which contains "{}": {}' .format(substr, eqn)) raise ValueError(msg)
[docs] def define_exclusions(self): """From the exclusions data, create a shapely MultiPolygon as self.safe_polygons that defines where turbines can be placed. """ ny, nx = np.shape(self.include_mask) self.safe_polygons = MultiPolygon() side_x = np.arange(nx + 1) * self.pixel_side_length side_y = np.arange(ny, -1, -1) * self.pixel_side_length floored = np.floor(self.include_mask) for i in range(nx): for j in range(ny): if floored[j, i] == 1: added_poly = Polygon(((side_x[i], side_y[j]), (side_x[i + 1], side_y[j]), (side_x[i + 1], side_y[j + 1]), (side_x[i], side_y[j + 1]))) self.safe_polygons = self.safe_polygons.union(added_poly) if self.safe_polygons.area == 0.0: self.full_polygons = MultiPolygon([]) self.packing_polygons = MultiPolygon([]) else: self.full_polygons = self.safe_polygons.buffer(0) # add extra setback to cell boundary minx = 0.0 miny = 0.0 maxx = nx * self.pixel_side_length maxy = ny * self.pixel_side_length minx += self.min_spacing / 2.0 miny += self.min_spacing / 2.0 maxx -= self.min_spacing / 2.0 maxy -= self.min_spacing / 2.0 boundary_poly = \ Polygon(((minx, miny), (minx, maxy), (maxx, maxy), (maxx, miny))) packing_polygons = boundary_poly.intersection(self.full_polygons) if isinstance(packing_polygons, MultiPolygon): self.packing_polygons = packing_polygons elif isinstance(packing_polygons, Polygon): self.packing_polygons = MultiPolygon([packing_polygons]) else: self.packing_polygons = MultiPolygon([])
[docs] def initialize_packing(self): """Run the turbine packing algorithm (maximizing plant capacity) to define potential turbine locations that will be used as design variables in the gentic algorithm. """ packing = PackTurbines(self.min_spacing, self.packing_polygons) nturbs = 1E6 mult = 1.0 iters = 0 while nturbs > 300: iters += 1 if iters > 10000: msg = ('Too many attempts within initialize packing') raise WhileLoopPackingError(msg) packing.clear() packing.min_spacing = self.min_spacing * mult packing.pack_turbines_poly() nturbs = len(packing.turbine_x) mult *= 1.1 self.x_locations = packing.turbine_x self.y_locations = packing.turbine_y
def _sc_center(self): """Supply curve point center. """ ny, nx = np.shape(self.include_mask) cx = nx * self.pixel_side_length / 2 cy = ny * self.pixel_side_length / 2 return cx, cy def _avg_sl_dist_to_cent(self, x_locs, y_locs): """Average straight-line distance to center from turb locations. """ cx, cy = self._sc_center() return np.hypot(x_locs - cx, y_locs - cy).mean() def _avg_sl_dist_to_med(self, x_locs, y_locs): """Average straight-line distance to turbine medoid. """ cx, cy = _turb_medoid(x_locs, y_locs) return np.hypot(x_locs - cx, y_locs - cy).mean() # pylint: disable=W0641,W0123
[docs] def optimization_objective(self, x): """The optimization objective used in the bespoke optimization """ x = [bool(y) for y in x] if len(x) > 0: n_turbines = np.sum(x) x_locs, y_locs = self.x_locations[x], self.y_locations[x] self.wind_plant["wind_farm_xCoordinates"] = x_locs self.wind_plant["wind_farm_yCoordinates"] = y_locs system_capacity = n_turbines * self.turbine_capacity self.wind_plant["system_capacity"] = system_capacity self.wind_plant.assign_inputs() self.wind_plant.execute() aep = self._aep_after_scaled_wake_losses() avg_sl_dist_to_center_m = self._avg_sl_dist_to_cent(x_locs, y_locs) avg_sl_dist_to_medoid_m = self._avg_sl_dist_to_med(x_locs, y_locs) if "nn_conn_dist_m" in self.balance_of_system_cost_function: nn_conn_dist_m = _compute_nn_conn_dist(x_locs, y_locs) else: n_turbines = system_capacity = aep = 0 avg_sl_dist_to_center_m = avg_sl_dist_to_medoid_m = 0 nn_conn_dist_m = 0 fixed_charge_rate = self.fixed_charge_rate capital_cost = eval(self.capital_cost_function, globals(), locals()) fixed_operating_cost = eval(self.fixed_operating_cost_function, globals(), locals()) variable_operating_cost = eval(self.variable_operating_cost_function, globals(), locals()) balance_of_system_cost = eval(self.balance_of_system_cost_function, globals(), locals()) capital_cost *= self.wind_plant.sam_sys_inputs.get( 'capital_cost_multiplier', 1) fixed_operating_cost *= self.wind_plant.sam_sys_inputs.get( 'fixed_operating_cost_multiplier', 1) variable_operating_cost *= self.wind_plant.sam_sys_inputs.get( 'variable_operating_cost_multiplier', 1) balance_of_system_cost *= self.wind_plant.sam_sys_inputs.get( 'balance_of_system_cost_multiplier', 1) objective = eval(self.objective_function, globals(), locals()) return objective
def _aep_after_scaled_wake_losses(self): """AEP after scaling the energy lost due to wake.""" wake_loss_pct = self.wind_plant['wake_losses'] aep = self.wind_plant['annual_energy'] agep = self.wind_plant['annual_gross_energy'] energy_lost_due_to_wake = wake_loss_pct / 100 * agep aep_after_wake_losses = agep - energy_lost_due_to_wake other_losses_multiplier = 1 - aep / aep_after_wake_losses scaled_wake_losses = (self.wake_loss_multiplier * energy_lost_due_to_wake) aep_after_scaled_wake_losses = max(0, agep - scaled_wake_losses) return aep_after_scaled_wake_losses * (1 - other_losses_multiplier)
[docs] def optimize(self, **kwargs): """Optimize wind farm layout. Use a genetic algorithm to optimize wind plant layout for the user-defined objective function. Parameters ---------- **kwargs Keyword arguments to pass to GA initialization. See Also -------- :class:`~reV.bespoke.gradient_free.GeneticAlgorithm` : GA Algorithm. """ nlocs = len(self.x_locations) bits = np.ones(nlocs, dtype=int) bounds = np.zeros((nlocs, 2), dtype=int) bounds[:, 1] = 2 variable_type = np.array([]) for _ in range(nlocs): variable_type = np.append(variable_type, "int") ga_kwargs = { 'max_generation': 10000, 'population_size': 25, 'crossover_rate': 0.2, 'mutation_rate': 0.01, 'tol': 1E-6, 'convergence_iters': 10000, 'max_time': 3600 } ga_kwargs.update(kwargs) ga = GeneticAlgorithm(bits, bounds, variable_type, self.optimization_objective, **ga_kwargs) ga.optimize_ga() optimized_design_variables = ga.optimized_design_variables self.optimized_design_variables = \ [bool(y) for y in optimized_design_variables] self.wind_plant["wind_farm_xCoordinates"] = self.turbine_x self.wind_plant["wind_farm_yCoordinates"] = self.turbine_y self.wind_plant["system_capacity"] = self.capacity
[docs] def place_turbines(self, **kwargs): """Define bespoke wind plant turbine layouts. Run all functions to define bespoke wind plant turbine layouts. Parameters ---------- **kwargs Keyword arguments to pass to GA initialization. See Also -------- :class:`~reV.bespoke.gradient_free.GeneticAlgorithm` : GA Algorithm. """ self.define_exclusions() self.initialize_packing() self.optimize(**kwargs)
[docs] def capital_cost_per_kw(self, capacity_mw): """Capital cost function ($ per kW) evaluated for a given capacity. The capacity will be adjusted to be an exact multiple of the turbine rating in order to yield an integer number of turbines. Parameters ---------- capacity_mw : float The desired capacity (MW) to sample the cost curve at. Note as mentioned above, the capacity will be adjusted to be an exact multiple of the turbine rating in order to yield an integer number of turbines. For best results, set this value to be an integer multiple of the turbine rating. Returns ------- capital_cost : float Capital cost ($ per kW) for the (adjusted) plant capacity. """ fixed_charge_rate = self.fixed_charge_rate avg_sl_dist_to_center_m = self.avg_sl_dist_to_center_m n_turbines = int(round(capacity_mw * 1e3 / self.turbine_capacity)) system_capacity = n_turbines * self.turbine_capacity mult = self.wind_plant.sam_sys_inputs.get( 'capital_cost_multiplier', 1) / system_capacity return eval(self.capital_cost_function, globals(), locals()) * mult
@property def fixed_charge_rate(self): """Fixed charge rate if input to the SAM WindPowerPD object, None if not found in inputs.""" return self.wind_plant.sam_sys_inputs.get("fixed_charge_rate", None) @property @none_until_optimized def turbine_x(self): """This is the final optimized turbine x locations (m)""" return self.x_locations[self.optimized_design_variables] @property @none_until_optimized def turbine_y(self): """This is the final optimized turbine y locations (m)""" return self.y_locations[self.optimized_design_variables] @property @none_until_optimized def avg_sl_dist_to_center_m(self): """This is the final avg straight line distance to center (m)""" return self._avg_sl_dist_to_cent(self.turbine_x, self.turbine_y) @property @none_until_optimized def avg_sl_dist_to_medoid_m(self): """This is the final avg straight line distance to turb medoid (m)""" return self._avg_sl_dist_to_med(self.turbine_x, self.turbine_y) @property @none_until_optimized def nn_conn_dist_m(self): """This is the final avg straight line distance to turb medoid (m)""" if self._optimized_nn_conn_dist_m is None: self._optimized_nn_conn_dist_m = _compute_nn_conn_dist( self.turbine_x, self.turbine_y ) return self._optimized_nn_conn_dist_m @property @none_until_optimized def nturbs(self): """This is the final optimized number of turbines""" return np.sum(self.optimized_design_variables) @property @none_until_optimized def capacity(self): """This is the final optimized plant nameplate capacity (kW)""" return self.turbine_capacity * self.nturbs @property @none_until_optimized def convex_hull(self): """This is the convex hull of the turbine locations""" turbines = MultiPoint([Point(x, y) for x, y in zip(self.turbine_x, self.turbine_y)]) return turbines.convex_hull @property @none_until_optimized def area(self): """This is the area available for wind turbine placement (km^2)""" return self.full_polygons.area / 1e6 @property @none_until_optimized def convex_hull_area(self): """This is the area of the convex hull of the turbines (km^2)""" return self.convex_hull.area / 1e6 @property @none_until_optimized def full_cell_area(self): """This is the full non-excluded area available for wind turbine placement (km^2)""" nx, ny = np.shape(self.include_mask) side_x = nx * self.pixel_side_length side_y = ny * self.pixel_side_length return side_x * side_y / 1e6 @property @none_until_optimized def capacity_density(self): """This is the optimized capacity density of the wind plant defined with the area available after removing the exclusions (MW/km2)""" if self.full_polygons is None or self.capacity is None: return if self.area != 0.0: return self.capacity / self.area / 1E3 return 0.0 @property @none_until_optimized def convex_hull_capacity_density(self): """This is the optimized capacity density of the wind plant defined with the convex hull area of the turbine layout (MW/km2)""" if self.convex_hull_area != 0.0: return self.capacity / self.convex_hull_area / 1E3 return 0.0 @property @none_until_optimized def full_cell_capacity_density(self): """This is the optimized capacity density of the wind plant defined with the full non-excluded area of the turbine layout (MW/km2) """ if self.full_cell_area != 0.0: return self.capacity / self.full_cell_area / 1E3 return 0.0 @property @none_until_optimized def aep(self): """This is the annual energy production of the optimized plant (kWh)""" if self.nturbs <= 0: return 0 self.wind_plant["wind_farm_xCoordinates"] = self.turbine_x self.wind_plant["wind_farm_yCoordinates"] = self.turbine_y self.wind_plant["system_capacity"] = self.capacity self.wind_plant.assign_inputs() self.wind_plant.execute() return self.wind_plant.annual_energy() # pylint: disable=W0641,W0123 @property @none_until_optimized def capital_cost(self): """This is the capital cost of the optimized plant ($)""" fixed_charge_rate = self.fixed_charge_rate n_turbines = self.nturbs system_capacity = self.capacity aep = self.aep avg_sl_dist_to_center_m = self.avg_sl_dist_to_center_m avg_sl_dist_to_medoid_m = self.avg_sl_dist_to_medoid_m nn_conn_dist_m = self.nn_conn_dist_m mult = self.wind_plant.sam_sys_inputs.get( 'capital_cost_multiplier', 1) return eval(self.capital_cost_function, globals(), locals()) * mult # pylint: disable=W0641,W0123 @property @none_until_optimized def fixed_operating_cost(self): """This is the annual fixed operating cost of the optimized plant ($/year)""" fixed_charge_rate = self.fixed_charge_rate n_turbines = self.nturbs system_capacity = self.capacity aep = self.aep avg_sl_dist_to_center_m = self.avg_sl_dist_to_center_m avg_sl_dist_to_medoid_m = self.avg_sl_dist_to_medoid_m nn_conn_dist_m = self.nn_conn_dist_m mult = self.wind_plant.sam_sys_inputs.get( 'fixed_operating_cost_multiplier', 1) return eval(self.fixed_operating_cost_function, globals(), locals()) * mult # pylint: disable=W0641,W0123 @property @none_until_optimized def variable_operating_cost(self): """This is the annual variable operating cost of the optimized plant ($/kWh)""" fixed_charge_rate = self.fixed_charge_rate n_turbines = self.nturbs system_capacity = self.capacity aep = self.aep avg_sl_dist_to_center_m = self.avg_sl_dist_to_center_m avg_sl_dist_to_medoid_m = self.avg_sl_dist_to_medoid_m nn_conn_dist_m = self.nn_conn_dist_m mult = self.wind_plant.sam_sys_inputs.get( 'variable_operating_cost_multiplier', 1) return eval(self.variable_operating_cost_function, globals(), locals()) * mult @property @none_until_optimized def balance_of_system_cost(self): """This is the balance of system cost of the optimized plant ($)""" fixed_charge_rate = self.fixed_charge_rate n_turbines = self.nturbs system_capacity = self.capacity aep = self.aep avg_sl_dist_to_center_m = self.avg_sl_dist_to_center_m avg_sl_dist_to_medoid_m = self.avg_sl_dist_to_medoid_m nn_conn_dist_m = self.nn_conn_dist_m mult = self.wind_plant.sam_sys_inputs.get( 'balance_of_system_cost_multiplier', 1) return eval(self.balance_of_system_cost_function, globals(), locals()) * mult # pylint: disable=W0641,W0123 @property @none_until_optimized def objective(self): """This is the optimized objective function value""" fixed_charge_rate = self.fixed_charge_rate n_turbines = self.nturbs system_capacity = self.capacity aep = self.aep capital_cost = self.capital_cost fixed_operating_cost = self.fixed_operating_cost variable_operating_cost = self.variable_operating_cost balance_of_system_cost = self.balance_of_system_cost avg_sl_dist_to_center_m = self.avg_sl_dist_to_center_m avg_sl_dist_to_medoid_m = self.avg_sl_dist_to_medoid_m nn_conn_dist_m = self.nn_conn_dist_m return eval(self.objective_function, globals(), locals())
def _turb_medoid(x_locs, y_locs): """Turbine medoid. """ return np.median(x_locs), np.median(y_locs) def _compute_nn_conn_dist(x_coords, y_coords): """Connect turbines using a greedy nearest-neighbor approach. """ if len(x_coords) <= 1: return 0 coordinates = np.c_[x_coords, y_coords] allowed_conns = np.r_[coordinates.mean(axis=0)[None], coordinates] mask = np.zeros_like(allowed_conns) mask[0] = 1 left_to_connect = np.ma.array(allowed_conns, mask=mask) mask = np.ones_like(allowed_conns) mask[0] = 0 allowed_conns = np.ma.array(allowed_conns, mask=mask) total_dist = 0 for __ in range(len(coordinates)): dists = left_to_connect[:, :, None] - allowed_conns.T[None] dists = np.hypot(dists[:, 0], dists[:, 1]) min_dists = dists.min(axis=-1) total_dist += min_dists.min() next_connection = min_dists.argmin() allowed_conns.mask[next_connection] = 0 left_to_connect.mask[next_connection] = 1 return total_dist