Source code for reVX.offshore.dist_to_ports

# -*- coding: utf-8 -*-
"""
Compute least-cost distance to port
"""
from concurrent.futures import as_completed
import geopandas as gpd
import logging
import numpy as np
import os
import pandas as pd
from scipy.spatial import cKDTree
from skimage.graph import MCP_Geometric
import time
from warnings import warn

from reV.handlers.exclusions import ExclusionLayers
from reVX.handlers.geotiff import Geotiff
from reVX.utilities.utilities import log_versions, coordinate_distance
from rex.utilities.execution import SpawnProcessPool
from rex.utilities.loggers import log_mem
from rex.utilities.utilities import (check_res_file, get_lat_lon_cols,
                                     parse_table, row_col_indices)

logger = logging.getLogger(__name__)


[docs]class DistanceToPorts: """ Class to compute the least cost distance from offshore pixels to port locations. The distance to coast exclusion layer will be used to calculate least cost paths around land masses and other such obstructions. Produces the least cost distance from each offshore pixel to the nearest port. If a distance to port layer already exists it can be updated with the least cost distance to new ports. NOTE: Computing the least cost distance is both memory and computationally intensive! One EAGLE a bigmem node was needed to run in parallel and a medium (178GB) memory node is needed to run in serial. """ def __init__(self, ports, excl_fpath, input_dist_layer='dist_to_coast'): """ Parameters ---------- ports : str Path to shape, csv, or json file containing ports to compute least cost distance to excl_fpath: str Path to exclusions .h5 file with distance to coast layer input_dist_layer : str, optional Exclusions layer name with distance to coast, by default 'dist_to_coast' """ log_versions(logger) self._input_dist_layer = input_dist_layer self._ports_fpath = ports self._ports, self._cost_arr, self._profile = self._parse_ports( ports, excl_fpath, input_dist_layer=input_dist_layer) log_mem(logger) def __repr__(self): msg = "{} from {}".format(self.__class__.__name__, self.ports) return msg @property def ports(self): """ DataFrame of port locations Returns ------- DataFrame """ return self._ports @property def cost_arr(self): """ Cost array, used to compute least cost distance to ports Offshore pixels have a value of 90 (pixel size), onshore pixels have a value of -1 Returns ------- ndarray """ return self._cost_arr @classmethod def _parse_lat_lons(cls, excl_fpath, input_dist_layer='dist_to_coast'): """ Parse cost array, profile, and latitude and longitude coordinates Parameters ---------- excl_fpath: str Path to exclusions .h5 file with distance to coast layer input_dist_layer : str, optional Exclusions layer name with distance to coast values, by default 'dist_to_coast' Returns ------- lat_lon : pandas.DataFrame Table mapping the offshore pixel coordiantes (lat, lon) to thier position (row, col) within the distance to shore array/layer/raster cost_arr : ndarray Cost array with offshore pixels set to 90 (pixel width) and onshore pixels set to -1. profile : dict Profile (transform, crs, etc.) of arr raster """ hsds = check_res_file(excl_fpath)[1] with ExclusionLayers(excl_fpath, hsds=hsds) as tif: profile = tif.profile cost_arr = tif[input_dist_layer] lat = tif['latitude'].astype(np.float32) lon = tif['longitude'].astype(np.float32) mask = cost_arr > 0 cost_arr = np.where(mask, 90, -1).astype(np.int8) mask = mask.ravel() ids = np.arange(lat.size, dtype=np.uint32)[mask] row_len = lat.shape[1] lat = lat.ravel()[mask] lon = lon.ravel()[mask] del mask rows, cols = row_col_indices(ids, row_len) del ids del row_len lat_lon = pd.DataFrame({'latitude': lat, 'longitude': lon, 'row': rows, 'col': cols}) return lat_lon, cost_arr, profile @staticmethod def _check_ports_coords(port_coords, lat_lon): """ Check port coordinates to make sure they are within the resource domain Parameters ---------- port_coords : ndarray nx2 array of (lat, lon) port coordinates lat_lon : ndarray nx2 array of (lat, lon) offshore coordinates Returns ------- check : ndarray Boolean array indicating which ports are outside (False) the resource domain. """ lat_min, lat_max = np.sort(lat_lon[:, 0])[[0, -1]] lon_min, lon_max = np.sort(lat_lon[:, 1])[[0, -1]] lat = port_coords[:, 0] check = lat < lat_min check |= lat > lat_max lon = port_coords[:, 1] check |= lon < lon_min check |= lon > lon_max if any(check): bad_coords = port_coords[check] msg = ("Ports with coordinates ({}) are outsides of the " "resource domain: (({}, {}), ({}, {})) and will not be used" " to to compute the least cost distance!" .format(bad_coords, lat_min, lon_min, lat_max, lon_max)) logger.warning(msg) warn(msg) return ~check @staticmethod def _create_port_names(ports): """ Create port names from "PORT_NAME" and "STATE", confirm all names are unique Parameters ---------- ports : geopandas.GeoDataFrame | pandas.DataFrame DataFrame of port locations and their mapping to the offshore pixels for least cost distance computation Returns ------- ports : geopandas.GeoDataFrame | pandas.DataFrame DataFrame of port locations and their mapping to the offshore pixels for least cost distance computation which a unique port name added """ name = None state = None for c in ports.columns: if c.lower() == 'port_name': if name is not None: msg = ('Multiple potential "port names" were found: ' '({}, {})!'.format(name, c)) logger.error(msg) raise RuntimeError(msg) else: name = c.lower() if 'state' in c.lower(): if state is not None: msg = ('Multiple potential "states" were found: ' '({}, {})!'.format(state, c)) logger.error(msg) raise RuntimeError(msg) if state is not None and name is not None: break ports['name'] = (ports['PORT_NAME'].astype(str) + '_' + ports['ps_STATE'].astype(str)) counts = ports['name'].value_counts() if np.any(counts > 1): msg = ('Ports must have unique names! The following duplicate ' 'names were provided: {}'.format(counts[counts > 1])) logger.error(msg) raise RuntimeError(msg) return ports @classmethod def _parse_ports(cls, ports, excl_fpath, input_dist_layer='dist_to_coast'): """ Load ports for disc. Can be provided as a shape, csv, or json file. In all cases the ports latitude and longitude coordinates must be provided. Map the ports locations to the nearest offshore pixel in the distance to coast layer/array/raster. Compute the distance from the ports actual position to the nearest offshore pixel. Parameters ---------- ports : str Path to shape, csv, or json file containing ports to compute least cost distance to excl_fpath: str Path to exclusions .h5 file with distance to coast layer input_dist_layer : str, optional Exclusions layer name with distance to coast values, by default 'dist_to_coast' Returns ------- ports : geopandas.GeoDataFrame | pandas.DataFrame DataFrame of port locations and their mapping to the offshore pixels for least cost distance computation cost_arr : ndarray Cost array with offshore pixels set to 90 (pixel width) and onshore pixels set to -1. profile : dict Profile (transform, crs, etc.) of arr raster """ if ports.endswith('.shp'): ports = gpd.read_file(ports, ignore_geometry=True) else: ports = parse_table(ports) pixels, cost_arr, profile = \ cls._parse_lat_lons(excl_fpath, input_dist_layer=input_dist_layer) lat_lon_cols = get_lat_lon_cols(pixels) pixel_coords = pixels[lat_lon_cols].values tree = cKDTree(pixel_coords) # pylint: disable=not-callable lat_lon_cols = get_lat_lon_cols(ports) port_coords = ports[lat_lon_cols].values.astype('float32') # remove ports that are outside pixel bounds mask = cls._check_ports_coords(port_coords, pixel_coords) port_coords = port_coords[mask] ports = ports.loc[mask] _, idx = tree.query(port_coords) pixels = pixels.iloc[idx] pixel_coords = pixel_coords[idx] ports['row'] = pixels['row'].values ports['col'] = pixels['col'].values ports['dist_to_pixel'] = coordinate_distance(port_coords, pixel_coords) ports = cls._create_port_names(ports) return ports, cost_arr, profile
[docs] @classmethod def lc_dist_to_port(cls, cost_arr, port_idx, port_dist, geotiff=None, profile=None): """ Compute the least cost dist from the port coordinates to all offshore coordinates in km Parameters ---------- cost_arr : ndarray Cost array with offshore pixels set to 90 (pixel width) and onshore pixels set to -1. port_idx : list | tuple | ndarray Port (row, col) index, used as starting point for least cost distance port_dist : float Distance from port to pixel that corresponds to port_idx in meters geotiff : str, optional Output geotiff file path to save least cost distance to ports, by default None profile : dict, optional Profile (transform, crs, etc.) of cost array raster, needed to write distance to ports array to geotiff, by default None Returns ------- lc_dist : ndarray, optional Least cost distance from port to all offshore pixels in km """ try: ts = time.time() logger.debug('Port that is {:.4f} km from nearest offshore pixel ' '{}.'.format(port_dist, port_idx)) if not isinstance(port_idx, np.ndarray): port_idx = np.array(port_idx) if len(port_idx) == 2: port_idx = np.expand_dims(port_idx, 0) mcp = MCP_Geometric(cost_arr) lc_dist = mcp.find_costs(starts=port_idx)[0].astype('float32') lc_dist /= 1000 lc_dist += port_dist lc_dist[cost_arr == -1] = -1 tt = (time.time() - ts) / 60 logger.debug('- Least cost distance computed in {:.4f} minutes' .format(tt)) if geotiff is not None: logger.debug('Saving least cost distance to port to ' f'{geotiff}') msg = ('Profile is needed to write least cost distance to ' 'ports to {}!'.format(geotiff)) assert profile is not None, msg Geotiff.write(geotiff, profile, lc_dist) else: return lc_dist except Exception: logger.exception('- Error computing least cost distance to port!')
[docs] def distance_to_ports(self, out_dir, max_workers=1, replace=False): """ Compute the least cost distance from each offshore pixel to the nearest port in km Parameters ---------- out_dir : str Directory to save distance to port geotiffs too. max_workers : int, optional Number of workers to use for setback computation, if 1 run in serial, if > 1 run in parallel with that many workers, if None run in parallel on all available cores, by default 1 replace : bool, optional Flag to replace existing ports geotiffs, by default False """ if max_workers is None: max_workers = os.cpu_count() f_name = name = os.path.basename(self._ports_fpath).split('.')[0] if max_workers > 1: logger.info('Computing least cost distance to ports in parallel ' 'using {} workers'.format(max_workers)) loggers = [__name__, 'reVX'] with SpawnProcessPool(max_workers=max_workers, loggers=loggers) as exe: futures = [] for _, port in self.ports.iterrows(): name = port['name'].replace(' ', '_') geotiff = '{}-{}.tif'.format(f_name, name) geotiff = os.path.join(out_dir, geotiff) if os.path.exists(geotiff) and not replace: msg = ('{} already exists and will be skipped! To ' 'replace it set "replace=True"' .format(geotiff)) logger.warning(msg) warn(msg) else: logger.debug('Computing least cost distance to {}' .format(name)) port_idx = port[['row', 'col']].values port_dist = port['dist_to_pixel'] future = exe.submit( self.lc_dist_to_port, self.cost_arr, port_idx, port_dist, geotiff=geotiff, profile=self._profile) futures.append(future) for i, future in enumerate(as_completed(futures)): logger.debug('Computed least cost distance for {} of {} ' 'ports'.format((i + 1), len(futures))) log_mem(logger) else: logger.info('Computing least cost distance to ports in serial') for i, port in self.ports.iterrows(): name = port['name'].replace(' ', '_') geotiff = '{}-{}.tif'.format(f_name, name) geotiff = os.path.join(out_dir, geotiff) if os.path.exists(geotiff) and not replace: msg = ('{} already exists and will be skipped! To ' 'replace it set "replace=True"' .format(geotiff)) logger.warning(msg) warn(msg) else: logger.debug('Computing least cost distance to {}' .format(name)) port_idx = port[['row', 'col']].values port_dist = port['dist_to_pixel'] self.lc_dist_to_port( self.cost_arr, port_idx, port_dist, geotiff=geotiff, profile=self._profile) logger.debug('Computed least cost distance for {} of {} ' 'ports'.format((i + 1), len(self.ports))) log_mem(logger)
[docs] @classmethod def run(cls, ports, excl_fpath, out_dir, input_dist_layer='dist_to_coast', max_workers=1, replace=False): """ Compute the least cost distance from offshore pixels to port locations in km. The distance to coast exclusion layer will be used to calculate least cost paths around land masses and other such obstructions. Produces the least cost distance from each offshore pixel to the nearest port. If a distance to port layer already exists it can be updated with the least cost distance to new ports. Parameters ---------- ports : str Path to shape, csv, or json file containing ports to compute least cost distance to excl_fpath: str Path to exclusions .h5 file with distance to coast layer. Will also be the file into which the least cost distance to port is saved. out_dir : str Directory to save distance to port geotiffs too. input_dist_layer : str, optional Exclusions layer with distance to coast values, by default 'dist_to_coast' max_workers : int, optional Number of workers to use for setback computation, if 1 run in serial, if > 1 run in parallel with that many workers, if None run in parallel on all available cores, by default 1 replace : bool, optional Flag to replace existing ports geotiffs, by default False """ logger.info('Computing least cost distance to ports in {}' .format(ports)) dtp = cls(ports, excl_fpath, input_dist_layer=input_dist_layer) dtp.distance_to_ports(out_dir, max_workers=max_workers, replace=replace)