Source code for revrt.costs.cli

"""Code to build cost layer file"""

import json
import logging
from warnings import warn

import dask.config
import dask.distributed
from gaps.cli import CLICommandFromFunction

from revrt.models.cost_layers import ALL, TransmissionLayerCreationConfig
from revrt.costs.layer_creator import LayerCreator
from revrt.costs.dry_costs_creator import DryCostsCreator
from revrt.costs.masks import Masks
from revrt.utilities import (
    LayeredFile,
    load_data_using_layer_file_profile,
    save_data_using_layer_file_profile,
)
from revrt.exceptions import revrtAttributeError, revrtConfigurationError
from revrt.warn import revrtWarning


logger = logging.getLogger(__name__)
CONFIG_ACTIONS = ["layers", "dry_costs", "merge_friction_and_barriers"]


[docs] def build_routing_layers( # noqa: PLR0917, PLR0913 routing_file, template_file=None, input_layer_dir=".", output_tiff_dir=".", masks_dir=".", layers=None, dry_costs=None, merge_friction_and_barriers=None, max_workers=1, memory_limit_per_worker="auto", create_kwargs=None, ): """Create costs, barriers, and frictions from a config file You can re-run this function on an existing file to add new layers without overwriting existing layers or needing to change your original config. Parameters ---------- routing_file : path-like Path to GeoTIFF/Zarr file to store cost layers in. If the file does not exist, it will be created based on the `template_file` input. template_file : path-like, optional Path to template GeoTIFF (``*.tif`` or ``*.tiff``) or Zarr (``*.zarr``) file containing the profile and transform to be used for the layered costs file. If ``None``, then the `routing_file` is assumed to exist on disk already. By default, ``None``. input_layer_dir : path-like, optional Directory to search for input layers in, if not found in current directory. By default, ``'.'``. output_tiff_dir : path-like, optional Directory where cost layers should be saved as GeoTIFF. By default, ``"."``. masks_dir : path-like, optional Directory for storing/finding mask GeoTIFFs (wet, dry, landfall, wet+, dry+). By default, ``"."``. layers : list of LayerConfig, optional Configuration for layers to be built and added to the file. At least one of `layers`, `dry_costs`, or `merge_friction_and_barriers` must be defined. By default, ``None``. dry_costs : DryCosts, optional Configuration for dry cost layers to be built and added to the file. At least one of `layers`, `dry_costs`, or `merge_friction_and_barriers` must be defined. By default, ``None``. merge_friction_and_barriers : MergeFrictionBarriers, optional Configuration for merging friction and barriers and adding to the layered costs file. At least one of `layers`, `dry_costs`, or `merge_friction_and_barriers` must be defined. By default, ``None`` max_workers : int, optional Number of parallel workers to use for file creation. If ``None`` or >1, processing is performed in parallel using Dask. By default, ``1``. memory_limit_per_worker : str, float, int, or None, default="auto" Sets the memory limit *per worker*. This only applies if ``max_workers != 1``. If ``None`` or ``0``, no limit is applied. If ``"auto"``, the total system memory is split evenly between the workers. If a float, that fraction of the system memory is used *per worker*. If a string giving a number of bytes (like "1GiB"), that amount is used *per worker*. If an int, that number of bytes is used *per worker*. By default, ``"auto"`` create_kwargs : dict, optional Additional keyword arguments to pass to :meth:`LayeredFile.create_new` when creating a new layered file. Do not include ``template_file``; it will be ignored. By default, ``None``. """ config = _validated_config( routing_file=routing_file, template_file=template_file or routing_file, input_layer_dir=input_layer_dir, output_tiff_dir=output_tiff_dir, masks_dir=masks_dir, layers=layers, dry_costs=dry_costs, merge_friction_and_barriers=merge_friction_and_barriers, ) logger.debug( "Using dask config:\n%s", json.dumps(dask.config.config, indent=4) ) lock = None if max_workers != 1: client = dask.distributed.Client( n_workers=max_workers, memory_limit=memory_limit_per_worker ) logger.info( "Dask client created with %s workers and %r memory limit per " "worker", max_workers, memory_limit_per_worker, ) logger.info("Dashboard link: %s", client.dashboard_link) lock = dask.distributed.Lock("rioxarray-write", client=client) lf_handler = LayeredFile(fp=config.routing_file) if not lf_handler.fp.exists(): create_kwargs = create_kwargs or {} create_kwargs.pop("template_file", None) logger.info( "%s not found. Creating new layered file with kwargs:\n%r", lf_handler.fp, create_kwargs, ) lf_handler.create_new( template_file=config.template_file, **create_kwargs ) masks = _load_masks(config, lf_handler) builder = LayerCreator( lf_handler, masks, input_layer_dir=config.input_layer_dir, output_tiff_dir=config.output_tiff_dir, ) _build_layers(config, builder, lf_handler, lock=lock) if config.dry_costs is not None: _build_dry_costs(config, masks, lf_handler, lock=lock) if config.merge_friction_and_barriers is not None: _combine_friction_and_barriers(config, lf_handler, lock=lock)
def _validated_config(**config_dict): """Validate use config inputs""" config = TransmissionLayerCreationConfig.model_validate(config_dict) if not any(config.model_dump()[key] is not None for key in CONFIG_ACTIONS): msg = f"At least one of {CONFIG_ACTIONS!r} must be in the config file" raise revrtConfigurationError(msg) return config def _load_masks(config, lf_handler): """Load masks based on config file""" masks = Masks( shape=lf_handler.shape, crs=lf_handler.profile["crs"], transform=lf_handler.profile["transform"], masks_dir=config.masks_dir, ) if not config.layers: return masks build_configs = [lc.build for lc in config.layers] need_masks = any( lc.extent != ALL for bc in build_configs for lc in bc.values() ) if need_masks: masks.load(lf_handler.fp) return masks def _build_layers(config, builder, lf_handler, lock): """Build layers from config file""" existing_layers = set(lf_handler.data_layers) for lc in config.layers or []: if lc.layer_name in existing_layers: logger.info( "Layer %r already exists in %s! Skipping...", lc.layer_name, lf_handler.fp, ) continue builder.build( lc.layer_name, lc.build, values_are_costs_per_mile=lc.values_are_costs_per_mile, write_to_file=lc.include_in_file, description=lc.description, lock=lock, ) def _build_dry_costs(config, masks, lf_handler, lock): """Build dry costs from config file""" dc = config.dry_costs dry_mask = None try: dry_mask = masks.dry_mask except revrtAttributeError: msg = "Dry mask not found! Computing dry costs for full extent!" warn(msg, revrtWarning) dcc = DryCostsCreator( lf_handler, input_layer_dir=config.input_layer_dir, output_tiff_dir=config.output_tiff_dir, ) cost_configs = None if not dc.cost_configs else str(dc.cost_configs) dcc.build( iso_region_tiff=dc.iso_region_tiff, nlcd_tiff=dc.nlcd_tiff, slope_tiff=dc.slope_tiff, transmission_config=cost_configs, mask=dry_mask, default_mults=dc.default_mults, extra_tiffs=dc.extra_tiffs, lock=lock, ) def _combine_friction_and_barriers(config, io_handler, lock): """Combine friction and barriers and save to layered file""" logger.info("Loading friction and raw barriers") merge_config = config.merge_friction_and_barriers friction = load_data_using_layer_file_profile( io_handler.fp, f"{merge_config.friction_layer}.tif", layer_dirs=[config.output_tiff_dir, config.input_layer_dir], ) barriers = load_data_using_layer_file_profile( io_handler.fp, f"{merge_config.barrier_layer}.tif", layer_dirs=[config.output_tiff_dir, config.input_layer_dir], ) combined = friction + barriers * merge_config.barrier_multiplier out_fp = config.output_tiff_dir / f"{merge_config.output_layer_name}.tif" logger.debug("Saving combined barriers to %s", out_fp) save_data_using_layer_file_profile( layer_fp=io_handler.fp, data=combined, geotiff=out_fp, lock=lock ) logger.info("Writing combined barriers to H5") io_handler.write_layer(combined, merge_config.output_layer_name) build_routing_layers_command = CLICommandFromFunction( build_routing_layers, name="build-routing-layers", add_collect=False, split_keys=None, )