"""
Least cost transmission utilities
"""
import logging
from typing import Optional, Union, Dict
import pandas as pd
import rasterio as rio
import geopandas as gpd
import numpy.typing as npt
from shapely.geometry import Point, LineString
from reVX.utilities import rasterize
from reVX.least_cost_xmission.config.constants import DEFAULT_DTYPE
# Module-level cache of vector layers, keyed by the file path string passed
# to ``rasterize_shape_file``. Each GeoDataFrame is stored exactly as it was
# read from disk (i.e. before any reprojection), and nothing in the code
# shown here ever evicts an entry.
VECTOR_CACHE: Dict[str, gpd.GeoDataFrame] = {}
# Logger named after this module
logger = logging.getLogger(__name__)
def rasterize_shape_file(fname: str, profile: Dict,
                         buffer_dist: Optional[float] = None,
                         all_touched: bool = False,
                         reproject_vector: bool = True,
                         burn_value: Union[int, float] = 1,
                         boundary_only: bool = False,
                         dtype: npt.DTypeLike = DEFAULT_DTYPE,
                         ) -> npt.NDArray:
    """
    Burn the features of a vector file into a raster array.

    The vector layer is read from disk at most once per ``fname``; later
    calls reuse the copy held in the module-level ``VECTOR_CACHE``. The
    cached layer is the one read from disk, so reprojection (when
    requested) is performed on every call.

    Parameters
    ----------
    fname : str
        Full path to the gpkg or shp file to rasterize. Also used as the
        cache key.
    profile : dict
        Raster profile to rasterize onto. Its ``'crs'`` entry is the
        reprojection target when ``reproject_vector`` is True.
    buffer_dist : float, optional
        Distance to buffer the features in ``fname`` by. Same units as
        the template raster. By default, ``None``.
    all_touched : bool, optional
        If True, burn every cell touched by a feature. False results in
        fewer cells being burned. By default, ``False``.
    reproject_vector : bool, optional
        If True, reproject the vector layer to the CRS of the raster
        profile before rasterizing. By default, ``True``.
    burn_value : int | float, optional
        Value used to burn vectors into the raster. By default, ``1``.
    boundary_only : bool, optional
        If True, rasterize only the boundary of the features.
        By default, ``False``.
    dtype : np.dtype, optional
        Datatype of the output array. By default, ``DEFAULT_DTYPE``.

    Returns
    -------
    numpy.ndarray
        Rasterized vector data
    """
    # Populate the cache on first use, then always read through it
    if fname not in VECTOR_CACHE:
        logger.debug('Loading %s', fname)
        VECTOR_CACHE[fname] = gpd.read_file(fname)
    else:
        logger.debug('Vector data for %s found in cache', fname)
    features = VECTOR_CACHE[fname]

    if reproject_vector:
        logger.debug('Reprojecting vector')
        # ``to_crs`` result is bound to the local name only; the cached
        # entry keeps pointing at the layer as read from disk
        features = features.to_crs(crs=profile['crs'])

    logger.debug('Rasterizing %s', fname)
    burn_options = {
        'buffer_dist': buffer_dist,
        'all_touched': all_touched,
        'burn_value': burn_value,
        'boundary_only': boundary_only,
        'dtype': dtype,
    }
    return rasterize(features, profile, **burn_options)
def convert_pois_to_lines(poi_csv_f: str, template_f: str, out_f: str):
    """
    Convert POIs in CSV to lines and save in a geopackage as substations.
    Also create a fake transmission line to connect to the substations.

    Each POI is read as a lat/long point (EPSG:4326), reprojected to the
    CRS of the template raster, and replaced by a short line starting at
    the POI. All features receive placeholder capacity/voltage values, and
    a ``gid`` column is filled from the row index before writing.

    Parameters
    ----------
    poi_csv_f : str
        Path to CSV file with POIs in it. Must contain the columns
        ``'POI Name'``, ``'State'``, ``'Voltage (kV)'``, ``'Lat'`` and
        ``'Long'``.
    template_f : str
        Path to template raster with CRS to use for geopackage
    out_f : str
        Path and file name for geopackage
    """
    logger.debug('Converting POIs in %s to lines in %s', poi_csv_f, out_f)

    # Only the CRS of the template raster is needed
    with rio.open(template_f) as ras:
        crs = ras.crs

    df = pd.read_csv(poi_csv_f)[['POI Name', 'State', 'Voltage (kV)', 'Lat',
                                 'Long']]
    pts = gpd.GeoDataFrame(df, geometry=gpd.points_from_xy(df.Long, df.Lat))
    pts = pts.set_crs('EPSG:4326')
    pts = pts.to_crs(crs)

    # Convert points to short lines: each line runs from the POI to a point
    # offset by ``stub_offset`` (in units of the template CRS) in x and y
    stub_offset = 50
    new_geom = [LineString([pt, Point(pt.x + stub_offset,
                                      pt.y + stub_offset)])
                for pt in pts.geometry]
    lines = pts.set_geometry(new_geom, crs=crs)

    # The POI rows carry the default 0..n-1 index from ``read_csv`` and that
    # index becomes the gid below. Keep the historical gid of 9999 for the
    # fake transmission line whenever it is free, but move it past the POI
    # index for inputs with 10,000+ POIs so gids never collide.
    fake_gid = max(9999, len(lines))
    fake_capacity = 9999999
    fake_voltage = 500  # kV

    # Append some fake values to make the LCP code happy
    lines['ac_cap'] = fake_capacity
    lines['category'] = 'Substation'
    lines['voltage'] = fake_voltage
    lines['trans_gids'] = f'[{fake_gid}]'

    # add a fake trans line for the subs to connect to to make LCP code happy
    trans_line = pd.DataFrame({'POI Name': 'fake',
                               'ac_cap': fake_capacity,
                               'category': 'TransLine',
                               'voltage': fake_voltage,
                               'trans_gids': None},
                              index=[fake_gid])
    trans_line = gpd.GeoDataFrame(trans_line)
    geo = LineString([Point(0, 0), Point(100000, 100000)])
    trans_line = trans_line.set_geometry([geo], crs=crs)  # type: ignore

    # ``concat`` preserves both indexes, which is what makes the index
    # usable as the gid referenced by ``trans_gids``
    pois: gpd.GeoDataFrame = pd.concat([lines, trans_line])
    pois['gid'] = pois.index

    pois.to_file(out_f, driver="GPKG")
    logger.debug('Complete')