Source code for mappymatch.matchers.valhalla

from __future__ import annotations

import json
import logging
from typing import List, Tuple

import numpy as np
import polyline
import requests
from shapely.geometry import LineString

from mappymatch.constructs.match import Match
from mappymatch.constructs.road import Road, RoadId
from mappymatch.constructs.trace import Trace
from mappymatch.matchers.matcher_interface import MatcherInterface, MatchResult
from import LATLON_CRS

log = logging.getLogger(__name__)


[docs] def build_path_from_result( edges: List[dict], shape: List[Tuple[float, float]] ) -> List[Road]: """ builds a mappymatch path from the result of a Valhalla map matching request """ path = [] for edge in edges: way_id = edge["way_id"] road_id = RoadId(start=None, end=None, key=way_id) start_point_i = edge["begin_shape_index"] end_point_i = edge["end_shape_index"] start_point = shape[start_point_i] end_point = shape[end_point_i] geom = LineString([start_point, end_point]) speed = edge["speed"] length = edge["length"] metadata = { "speed_mph": speed, "length_miles": length, } road = Road(road_id=road_id, geom=geom, metadata=metadata) path.append(road) return path
[docs] def build_match_result( trace: Trace, matched_points: List[dict], path: List[Road] ) -> MatchResult: """ builds a mappymatch MatchResult from the result of a Valhalla map matching request """ matches = [] for i, coord in enumerate(trace.coords): mp = matched_points[i] ei = mp.get("edge_index") dist = mp.get("distance_from_trace_point") if ei is None: road = None else: try: road = path[ei] except IndexError: road = None if dist is None: dist = np.inf match = Match(road, coord, dist) matches.append(match) return MatchResult(matches=matches, path=path)
[docs] class ValhallaMatcher(MatcherInterface): """ pings a Valhalla server for map matching """ def __init__( self, valhalla_url=DEMO_VALHALLA_ADDRESS, cost_model="auto", shape_match="map_snap", attributes=DEFAULT_ATTRIBUTES, ): self.url_base = valhalla_url self.cost_model = cost_model self.shape_match = shape_match all_attributes = list(REQUIRED_ATTRIBUTES.union(set(attributes))) self.attributes = all_attributes
[docs] def match_trace(self, trace: Trace) -> MatchResult: if not == LATLON_CRS: trace = trace.to_crs(LATLON_CRS) points = [{"lat": c.y, "lon": c.x} for c in trace.coords] json_payload = json.dumps( { "shape": points, "costing": self.cost_model, "shape_match": self.shape_match, "filters": { "attributes": self.attributes, "action": "include", }, "units": "miles", } ) valhalla_request = f"{self.url_base}?json={json_payload}" r = requests.get(valhalla_request) if not r.status_code == r.raise_for_status() j = r.json() edges = j["edges"] shape = polyline.decode(j["shape"], precision=6, geojson=True) matched_points = j["matched_points"] path = build_path_from_result(edges, shape) result = build_match_result(trace, matched_points, path) return result
[docs] def match_trace_batch(self, trace_batch: list[Trace]) -> list[MatchResult]: return [self.match_trace(t) for t in trace_batch]