Source code for nrel.routee.compass.io.utils

from __future__ import annotations

from enum import Enum
from pathlib import Path

import logging
from typing import Union, Optional, TYPE_CHECKING
import math

if TYPE_CHECKING:
    import networkx
    import shapely

log = logging.getLogger(__name__)

CACHE_DIR = Path("cache")


[docs] class TileResolution(Enum): ONE_ARC_SECOND = 1 ONE_THIRD_ARC_SECOND = 13 @classmethod def from_string(self, string: str) -> TileResolution: if string.lower() in ["1", "one"]: return TileResolution.ONE_ARC_SECOND elif string.lower() in ["1/3", "one third"]: return TileResolution.ONE_THIRD_ARC_SECOND else: raise ValueError( f"invalid string {string} for tile resolution. Must be one of: 1, one, 1/3, one third" ) @classmethod def from_int(self, int: int) -> TileResolution: if int == 1: return TileResolution.ONE_ARC_SECOND elif int == 13: return TileResolution.ONE_THIRD_ARC_SECOND else: raise ValueError( f"invalid int {int} for tile resolution. Must be one of: 1, 13" )
def get_usgs_tiles(lat_lon_pairs: list[tuple[float, float]]) -> list[str]: def tile_index(lat: float, lon: float) -> str: if lat < 0 or lon > 0: raise ValueError( f"USGS Tiles are not available for point ({lat}, {lon}). " "Consider re-running with `grade=False`." ) lat_deg = int(lat) + 1 lon_deg = abs(int(lon)) + 1 return f"n{lat_deg:02}w{lon_deg:03}" tiles = set() for lat, lon in lat_lon_pairs: tile = tile_index(lat, lon) tiles.add(tile) return list(tiles) def _build_download_link( tile: str, resolution: TileResolution = TileResolution.ONE_ARC_SECOND ) -> str: base_link_fragment = "https://prd-tnm.s3.amazonaws.com/StagedProducts/Elevation/" resolution_link_fragment = f"{resolution.value}/TIFF/current/{tile}/" filename = f"USGS_{resolution.value}_{tile}.tif" link = base_link_fragment + resolution_link_fragment + filename return link def _download_tile( tile: str, output_dir: Path = CACHE_DIR, resolution: TileResolution = TileResolution.ONE_ARC_SECOND, ) -> Path: try: import requests except ImportError: raise ImportError( "requires requests to be installed. Try 'pip install requests'" ) url = _build_download_link(tile, resolution) filename = url.split("/")[-1] destination = output_dir / filename if destination.is_file(): log.info(f"{str(destination)} already exists, skipping") return destination with requests.get(url, stream=True) as r: log.info(f"downloading {tile}") try: r.raise_for_status() except requests.exceptions.HTTPError as e: raise ValueError( f"Failed to download USGS tile {tile} from {url}. " "If this road network is outside of the US, consider re-running with `grade=False`." ) from e destination.parent.mkdir(exist_ok=True) # write to file in chunks with destination.open("wb") as f: for chunk in r.iter_content(chunk_size=8192): f.write(chunk) return destination
[docs] def add_grade_to_graph( g: networkx.MultiDiGraph, output_dir: Path = Path("cache"), resolution_arc_seconds: Union[str, int] = 1, api_key: Optional[str] = None, ) -> networkx.MultiDiGraph: """ Adds grade information to the edges of a graph. If using an api_key will try and download the grades from Google API, otherwise this will download the necessary elevation data from USGS as raster tiles and cache them in the output_dir. The resolution of the tiles can be specified with the resolution parameter. USGS has elevation data in increasing resolutions of: 1 arc-second and 1/3 arc-second Average tile file sizes for each resolution are about: * 1 arc-second: 50 MB * 1/3 arc-second: 350 MB Args: g (nx.MultiDiGraph): The networkx graph to add grades to. output_dir (Path, optional): The directory to cache the downloaded tiles in. Defaults to Path("cache"). resolution_arc_seconds (str, optional): The resolution (in arc-seconds) of the tiles to download (either 1 or 1/3). Defaults to 1. api_key: The google API key to pull down grade information. If None will use USGS raster elevation tiles Returns: g: The graph with grade information added to the edges. Example: >>> import osmnx as ox >>> g = ox.graph_from_place("Denver, Colorado, USA") >>> g = add_grade_to_graph(g) >>> g2 = ox.graph_from_place("Denver, Colorado, USA") >>> g2 = add_grade_to_graph(g2, api_key=<api_key>) """ try: import osmnx as ox except ImportError: raise ImportError("requires osmnx to be installed. Try 'pip install osmnx'") if api_key is None: node_gdf = ox.graph_to_gdfs(g, nodes=True, edges=False) all_points = [(t.y, t.x) for t in node_gdf.itertuples()] tiles = get_usgs_tiles(all_points) if isinstance(resolution_arc_seconds, int): resolution = TileResolution.from_int(resolution_arc_seconds) elif isinstance(resolution_arc_seconds, str): resolution = TileResolution.from_string(resolution_arc_seconds) else: raise ValueError( f"invalid type for resolution {resolution_arc_seconds}." "Must be one of: int, str" ) files = [] for tile in tiles: downloaded_file = _download_tile( tile, output_dir=output_dir, resolution=resolution ) files.append(str(downloaded_file)) g = ox.add_node_elevations_raster(g, files) else: g = ox.add_node_elevations_google(g, api_key=api_key) g = ox.add_edge_grades(g) return g
def compass_heading(point1: tuple[float, float], point2: tuple[float, float]) -> float: lon1, lat1 = point1 lon2, lat2 = point2 lat1, lon1, lat2, lon2 = map(math.radians, [lat1, lon1, lat2, lon2]) dlon = lon2 - lon1 x = math.sin(dlon) * math.cos(lat2) y = math.cos(lat1) * math.sin(lat2) - ( math.sin(lat1) * math.cos(lat2) * math.cos(dlon) ) initial_bearing = math.atan2(x, y) initial_bearing = math.degrees(initial_bearing) compass_bearing = (initial_bearing + 360) % 360 return compass_bearing def calculate_bearings(geom: shapely.geometry.LineString) -> tuple[int, int]: if len(geom.coords) < 2: raise ValueError("Geometry must have at least two points") if len(geom.coords) == 2: # start and end heading is equal heading = int(compass_heading(geom.coords[0], geom.coords[1])) return (heading, heading) else: start_heading = int(compass_heading(geom.coords[0], geom.coords[1])) end_heading = int(compass_heading(geom.coords[-2], geom.coords[-1])) # returns headings as a list of tuples return (start_heading, end_heading)