# -*- coding: utf-8 -*-
"""ELM Ordinance full processing logic"""
import time
import json
import asyncio
import logging
from datetime import datetime, timedelta
from pathlib import Path
from functools import partial

import openai
import pandas as pd
from langchain.text_splitter import RecursiveCharacterTextSplitter

from elm import ApiBase
from elm.utilities import validate_azure_api_params
from elm.ords.download import download_county_ordinance
from elm.ords.extraction import (
    extract_ordinance_text_with_ngram_validation,
    extract_ordinance_values,
)
from elm.ords.services.usage import UsageTracker
from elm.ords.services.openai import OpenAIService, usage_from_response
from elm.ords.services.provider import RunningAsyncServices
from elm.ords.services.threaded import (
    TempFileCache,
    FileMover,
    CleanedFileWriter,
    OrdDBFileWriter,
    UsageUpdater,
)
from elm.ords.services.cpu import PDFLoader, read_pdf_doc, read_pdf_doc_ocr
from elm.ords.utilities import (
    RTS_SEPARATORS,
    load_all_county_info,
    load_counties_from_fp,
)
from elm.ords.utilities.location import County
from elm.ords.utilities.queued_logging import (
    LocationFileLog,
    LogListener,
    NoLocationFilter,
)

logger = logging.getLogger(__name__)


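# Column order for the structured ordinance output table ("wind_db.csv").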
OUT_COLS = [
    "county",
    "state",
    "FIPS",
    "feature",
    "fixed_value",
    "mult_value",
    "mult_type",
    "adder",
    "min_dist",
    "max_dist",
    "value",
    "units",
    "ord_year",
    "last_updated",
    "section",
    "source",
    "comment",
]

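# Output columns checked for non-null entries when counting scraped
# ordinance values (see ``_num_ords_in_doc`` below).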
CHECK_COLS = [
    "fixed_value",
    "mult_value",
    "adder",
    "min_dist",
    "max_dist",
    "value",
]


async def process_counties_with_openai(
    out_dir,
    county_fp=None,
    model="gpt-4",
    azure_api_key=None,
    azure_version=None,
    azure_endpoint=None,
    llm_call_kwargs=None,
    llm_service_rate_limit=4000,
    text_splitter_chunk_size=3000,
    text_splitter_chunk_overlap=300,
    num_urls_to_check_per_county=5,
    max_num_concurrent_browsers=10,
    file_loader_kwargs=None,
    pytesseract_exe_fp=None,
    td_kwargs=None,
    tpe_kwargs=None,
    ppe_kwargs=None,
    log_dir=None,
    clean_dir=None,
    county_ords_dir=None,
    county_dbs_dir=None,
    log_level="INFO",
):
    """Download and extract ordinances for a list of counties.

    Parameters
    ----------
    out_dir : path-like
        Path to output directory. This directory will be created if it
        does not exist. This directory will contain the structured
        ordinance output CSV as well as all of the scraped ordinance
        documents (PDFs and HTML text files). Usage information and
        default options for log/clean directories will also be stored
        here.
    county_fp : path-like, optional
        Path to CSV file containing a list of counties to extract
        ordinance information for. This CSV should have "County" and
        "State" columns that contain the county and state names.
        By default, ``None``, which runs the extraction for all known
        counties (this is untested and not currently recommended).
    model : str, optional
        Name of LLM model to perform scraping. By default, ``"gpt-4"``.
    azure_api_key : str, optional
        Azure OpenAI API key. By default, ``None``, which pulls the key
        from the environment variable ``AZURE_OPENAI_API_KEY`` instead.
    azure_version : str, optional
        Azure OpenAI API version. By default, ``None``, which pulls the
        version from the environment variable ``AZURE_OPENAI_VERSION``
        instead.
    azure_endpoint : str, optional
        Azure OpenAI API endpoint. By default, ``None``, which pulls
        the endpoint from the environment variable
        ``AZURE_OPENAI_ENDPOINT`` instead.
    llm_call_kwargs : dict, optional
        Keyword-value pairs used to initialize an
        `elm.ords.llm.LLMCaller` instance. By default, ``None``.
    llm_service_rate_limit : int, optional
        Token rate limit of LLM service being used (OpenAI).
        By default, ``4000``.
    text_splitter_chunk_size : int, optional
        Chunk size input to
        `langchain.text_splitter.RecursiveCharacterTextSplitter`.
        By default, ``3000``.
    text_splitter_chunk_overlap : int, optional
        Chunk overlap input to
        `langchain.text_splitter.RecursiveCharacterTextSplitter`.
        By default, ``300``.
    num_urls_to_check_per_county : int, optional
        Number of unique Google search result URLs to check for
        ordinance document. By default, ``5``.
    max_num_concurrent_browsers : int, optional
        Number of unique concurrent browser instances to open when
        performing Google search. Setting this number too high on a
        machine with limited processing can lead to increased timeouts
        and therefore decreased quality of Google search results.
        By default, ``10``.
    file_loader_kwargs : dict, optional
        Dictionary of keyword-argument pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader` with. The
        "pw_launch_kwargs" key in these will also be used to initialize
        the :class:`elm.web.google_search.PlaywrightGoogleLinkSearch`
        used for the Google URL search. By default, ``None``.
    pytesseract_exe_fp : path-like, optional
        Path to pytesseract executable. If this option is specified,
        OCR parsing for PDF files will be enabled via pytesseract.
        By default, ``None``.
    td_kwargs : dict, optional
        Keyword-value argument pairs to pass to
        :class:`tempfile.TemporaryDirectory`. The temporary directory
        is used to store files downloaded from the web that are still
        being parsed for ordinance information. By default, ``None``.
    tpe_kwargs : dict, optional
        Keyword-value argument pairs to pass to
        :class:`concurrent.futures.ThreadPoolExecutor`. The thread pool
        executor is used to run I/O intensive tasks like writing to a
        log file. By default, ``None``.
    ppe_kwargs : dict, optional
        Keyword-value argument pairs to pass to
        :class:`concurrent.futures.ProcessPoolExecutor`. The process
        pool executor is used to run CPU intensive tasks like loading
        a PDF file. By default, ``None``.
    log_dir : path-like, optional
        Path to directory for log files. This directory will be created
        if it does not exist. By default, ``None``, which creates a
        ``logs`` folder in the output directory for the county-specific
        log files.
    clean_dir : path-like, optional
        Path to directory for cleaned ordinance text output. This
        directory will be created if it does not exist. By default,
        ``None``, which creates a ``clean`` folder in the output
        directory for the cleaned ordinance text files.
    county_ords_dir : path-like, optional
        Path to directory for individual county ordinance file outputs.
        This directory will be created if it does not exist.
        By default, ``None``, which creates a ``county_ord_files``
        folder in the output directory.
    county_dbs_dir : path-like, optional
        Path to directory for individual county ordinance database
        outputs. This directory will be created if it does not exist.
        By default, ``None``, which creates a ``county_dbs`` folder in
        the output directory.
    log_level : str, optional
        Log level to set for county retrieval and parsing loggers.
        By default, ``"INFO"``.

    Returns
    -------
    pd.DataFrame
        DataFrame of parsed ordinance information. This file will also
        be stored in the output directory under "wind_db.csv".
    """
    start_time = time.time()
    log_listener = LogListener(["elm"], level=log_level)
    dirs = _setup_folders(
        out_dir,
        log_dir=log_dir,
        clean_dir=clean_dir,
        cod=county_ords_dir,
        cdd=county_dbs_dir,
    )
    out_dir, log_dir, clean_dir, county_ords_dir, county_dbs_dir = dirs
    async with log_listener as ll:
        _setup_main_logging(log_dir, log_level, ll)
        db = await _process_with_logs(
            out_dir,
            log_dir,
            clean_dir,
            county_ords_dir,
            county_dbs_dir,
            ll,
            county_fp=county_fp,
            model=model,
            azure_api_key=azure_api_key,
            azure_version=azure_version,
            azure_endpoint=azure_endpoint,
            llm_call_kwargs=llm_call_kwargs,
            llm_service_rate_limit=llm_service_rate_limit,
            text_splitter_chunk_size=text_splitter_chunk_size,
            text_splitter_chunk_overlap=text_splitter_chunk_overlap,
            num_urls_to_check_per_county=num_urls_to_check_per_county,
            max_num_concurrent_browsers=max_num_concurrent_browsers,
            file_loader_kwargs=file_loader_kwargs,
            pytesseract_exe_fp=pytesseract_exe_fp,
            td_kwargs=td_kwargs,
            tpe_kwargs=tpe_kwargs,
            ppe_kwargs=ppe_kwargs,
            log_level=log_level,
        )
    _record_total_time(out_dir / "usage.json", time.time() - start_time)
    return db
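

# A minimal sketch of running the full pipeline (the output directory and
# county CSV paths below are hypothetical, and Azure credentials are assumed
# to be available via the ``AZURE_OPENAI_*`` environment variables):
#
#     import asyncio
#
#     db = asyncio.run(
#         process_counties_with_openai(
#             "./ord_outputs",
#             county_fp="./counties.csv",
#             model="gpt-4",
#         )
#     )
#     print(db[["county", "state", "feature", "value", "units"]].head())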


async def _process_with_logs(
    out_dir,
    log_dir,
    clean_dir,
    county_ords_dir,
    county_dbs_dir,
    log_listener,
    county_fp=None,
    model="gpt-4",
    azure_api_key=None,
    azure_version=None,
    azure_endpoint=None,
    llm_call_kwargs=None,
    llm_service_rate_limit=4000,
    text_splitter_chunk_size=3000,
    text_splitter_chunk_overlap=300,
    num_urls_to_check_per_county=5,
    max_num_concurrent_browsers=10,
    file_loader_kwargs=None,
    pytesseract_exe_fp=None,
    td_kwargs=None,
    tpe_kwargs=None,
    ppe_kwargs=None,
    log_level="INFO",
):
    """Process counties with logging enabled."""
    counties = _load_counties_to_process(county_fp)
    azure_api_key, azure_version, azure_endpoint = validate_azure_api_params(
        azure_api_key, azure_version, azure_endpoint
    )

    tpe_kwargs = _configure_thread_pool_kwargs(tpe_kwargs)
    file_loader_kwargs = _configure_file_loader_kwargs(file_loader_kwargs)
    if pytesseract_exe_fp is not None:
        _setup_pytesseract(pytesseract_exe_fp)
        file_loader_kwargs.update(
            {"pdf_ocr_read_coroutine": read_pdf_doc_ocr}
        )

    text_splitter = RecursiveCharacterTextSplitter(
        RTS_SEPARATORS,
        chunk_size=text_splitter_chunk_size,
        chunk_overlap=text_splitter_chunk_overlap,
        length_function=partial(ApiBase.count_tokens, model=model),
    )
    client = openai.AsyncAzureOpenAI(
        api_key=azure_api_key,
        api_version=azure_version,
        azure_endpoint=azure_endpoint,
    )

    services = [
        OpenAIService(client, rate_limit=llm_service_rate_limit),
        TempFileCache(td_kwargs=td_kwargs, tpe_kwargs=tpe_kwargs),
        FileMover(county_ords_dir, tpe_kwargs=tpe_kwargs),
        CleanedFileWriter(clean_dir, tpe_kwargs=tpe_kwargs),
        OrdDBFileWriter(county_dbs_dir, tpe_kwargs=tpe_kwargs),
        UsageUpdater(out_dir / "usage.json", tpe_kwargs=tpe_kwargs),
        PDFLoader(**(ppe_kwargs or {})),
    ]

    browser_semaphore = (
        asyncio.Semaphore(max_num_concurrent_browsers)
        if max_num_concurrent_browsers
        else None
    )
    async with RunningAsyncServices(services):
        tasks = []
        trackers = []
        for __, row in counties.iterrows():
            county, state, fips = row[["County", "State", "FIPS"]]
            location = County(county.strip(), state=state.strip(), fips=fips)
            usage_tracker = UsageTracker(
                location.full_name, usage_from_response
            )
            trackers.append(usage_tracker)
            task = asyncio.create_task(
                process_county_with_logging(
                    log_listener,
                    log_dir,
                    location,
                    text_splitter,
                    num_urls=num_urls_to_check_per_county,
                    file_loader_kwargs=file_loader_kwargs,
                    browser_semaphore=browser_semaphore,
                    level=log_level,
                    llm_service=OpenAIService,
                    usage_tracker=usage_tracker,
                    model=model,
                    **(llm_call_kwargs or {}),
                ),
                name=location.full_name,
            )
            tasks.append(task)
        docs = await asyncio.gather(*tasks)

    db = _docs_to_db(docs)
    db.to_csv(out_dir / "wind_db.csv", index=False)
    return db


def _setup_main_logging(log_dir, level, listener):
    """Setup main logger for catching exceptions during execution."""
    handler = logging.FileHandler(log_dir / "main.log", encoding="utf-8")
    handler.setLevel(level)
    handler.addFilter(NoLocationFilter())
    listener.addHandler(handler)


def _setup_folders(
    out_dir,
    log_dir=None,
    clean_dir=None,
    cod=None,
    cdd=None,
):
    """Setup output directory folders."""
    out_dir = Path(out_dir)
    out_folders = [
        out_dir,
        Path(log_dir) if log_dir else out_dir / "logs",
        Path(clean_dir) if clean_dir else out_dir / "clean",
        Path(cod) if cod else out_dir / "county_ord_files",
        Path(cdd) if cdd else out_dir / "county_dbs",
    ]
    for folder in out_folders:
        folder.mkdir(exist_ok=True, parents=True)
    return out_folders


def _load_counties_to_process(county_fp):
    """Load the counties to retrieve documents for."""
    if county_fp is None:
        logger.info("No `county_fp` input! Loading all counties")
        return load_all_county_info()
    return load_counties_from_fp(county_fp)


def _configure_thread_pool_kwargs(tpe_kwargs):
    """Set thread pool workers to 5 if user didn't specify."""
    tpe_kwargs = tpe_kwargs or {}
    tpe_kwargs.setdefault("max_workers", 5)
    return tpe_kwargs


def _configure_file_loader_kwargs(file_loader_kwargs):
    """Add PDF reading coroutine to kwargs."""
    file_loader_kwargs = file_loader_kwargs or {}
    file_loader_kwargs.update({"pdf_read_coroutine": read_pdf_doc})
    return file_loader_kwargs
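

# The counties table consumed by ``_process_with_logs`` is expected to
# provide "County", "State", and "FIPS" columns (see the ``iterrows`` loop
# above). A sketch of an equivalent frame built by hand (example values):
#
#     import pandas as pd
#
#     counties = pd.DataFrame(
#         {
#             "County": ["Box Elder"],
#             "State": ["Utah"],
#             "FIPS": [49003],
#         }
#     )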


async def process_county_with_logging(
    listener,
    log_dir,
    county,
    text_splitter,
    num_urls=5,
    file_loader_kwargs=None,
    browser_semaphore=None,
    level="INFO",
    **kwargs,
):
    """Retrieve ordinance document for a single county with async logs.

    Parameters
    ----------
    listener : elm.ords.utilities.queued_logging.LogListener
        Active ``LogListener`` instance that can be passed to
        :class:`elm.ords.utilities.queued_logging.LocationFileLog`.
    log_dir : path-like
        Path to output directory to contain log file.
    county : elm.ords.utilities.location.Location
        County to retrieve ordinance document for.
    text_splitter : obj, optional
        Instance of an object that implements a `split_text` method.
        The method should take text as input (str) and return a list
        of text chunks. Langchain's text splitters should work for this
        input.
    num_urls : int, optional
        Number of unique Google search result URLs to check for
        ordinance document. By default, ``5``.
    file_loader_kwargs : dict, optional
        Dictionary of keyword-argument pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader` with. The
        "pw_launch_kwargs" key in these will also be used to initialize
        the :class:`elm.web.google_search.PlaywrightGoogleLinkSearch`
        used for the Google URL search. By default, ``None``.
    browser_semaphore : asyncio.Semaphore, optional
        Semaphore instance that can be used to limit the number of
        playwright browsers open concurrently. If ``None``, no limits
        are applied. By default, ``None``.
    level : str, optional
        Log level to set for retrieval logger. By default, ``"INFO"``.
    **kwargs
        Keyword-value pairs used to initialize an
        `elm.ords.llm.LLMCaller` instance.

    Returns
    -------
    elm.web.document.BaseDocument | None
        Document instance for the ordinance document, or ``None`` if no
        document was found. Extracted ordinance information is stored
        in the document's ``metadata`` attribute.
    """
    with LocationFileLog(
        listener, log_dir, location=county.full_name, level=level
    ):
        task = asyncio.create_task(
            process_county(
                county,
                text_splitter,
                num_urls=num_urls,
                file_loader_kwargs=file_loader_kwargs,
                browser_semaphore=browser_semaphore,
                **kwargs,
            ),
            name=county.full_name,
        )
        try:
            doc, *__ = await asyncio.gather(task)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            logger.error(
                "Encountered error while processing %s:", county.full_name
            )
            logger.exception(e)
            doc = None

        return doc
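

# A minimal sketch of driving the per-county wrapper directly. This assumes
# the async services used by ``process_county`` (OpenAI, file movers, etc.)
# are already running, as set up in ``_process_with_logs`` above; ``county``
# and ``splitter`` are placeholders:
#
#     async def _example(county, splitter):
#         log_listener = LogListener(["elm"], level="INFO")
#         async with log_listener:
#             return await process_county_with_logging(
#                 log_listener,
#                 Path("./logs"),
#                 county,
#                 splitter,
#                 llm_service=OpenAIService,
#                 model="gpt-4",
#             )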


async def process_county(
    county,
    text_splitter,
    num_urls=5,
    file_loader_kwargs=None,
    browser_semaphore=None,
    **kwargs,
):
    """Download and parse ordinance document for a single county.

    Parameters
    ----------
    county : elm.ords.utilities.location.Location
        County to retrieve ordinance document for.
    text_splitter : obj, optional
        Instance of an object that implements a `split_text` method.
        The method should take text as input (str) and return a list
        of text chunks. Langchain's text splitters should work for this
        input.
    num_urls : int, optional
        Number of unique Google search result URLs to check for
        ordinance document. By default, ``5``.
    file_loader_kwargs : dict, optional
        Dictionary of keyword-argument pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader` with. The
        "pw_launch_kwargs" key in these will also be used to initialize
        the :class:`elm.web.google_search.PlaywrightGoogleLinkSearch`
        used for the Google URL search. By default, ``None``.
    browser_semaphore : asyncio.Semaphore, optional
        Semaphore instance that can be used to limit the number of
        playwright browsers open concurrently. If ``None``, no limits
        are applied. By default, ``None``.
    **kwargs
        Keyword-value pairs used to initialize an
        `elm.ords.llm.LLMCaller` instance.

    Returns
    -------
    elm.web.document.BaseDocument | None
        Document instance for the ordinance document, or ``None`` if no
        document was found. Extracted ordinance information is stored
        in the document's ``metadata`` attribute.
    """
    start_time = time.time()
    doc = await download_county_ordinance(
        county,
        text_splitter,
        num_urls=num_urls,
        file_loader_kwargs=file_loader_kwargs,
        browser_semaphore=browser_semaphore,
        **kwargs,
    )
    if doc is None:
        await _record_time_and_usage(start_time, **kwargs)
        return None

    doc.metadata["location"] = county
    doc.metadata["location_name"] = county.full_name
    await _record_usage(**kwargs)

    doc = await extract_ordinance_text_with_ngram_validation(
        doc, text_splitter, **kwargs
    )
    await _record_usage(**kwargs)

    doc = await _write_cleaned_text(doc)
    doc = await extract_ordinance_values(doc, **kwargs)

    ord_count = _num_ords_in_doc(doc)
    if ord_count > 0:
        doc = await _move_file_to_out_dir(doc)
        doc = await _write_ord_db(doc)
        logger.info(
            "%d ordinance value(s) found for %s. Outputs are here: '%s'",
            ord_count,
            county.full_name,
            doc.metadata["ord_db_fp"],
        )
    else:
        logger.info("No ordinances found for %s.", county.full_name)

    await _record_time_and_usage(start_time, **kwargs)
    return doc
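

# After a successful run, parsed results live on the returned document's
# metadata. A sketch of consuming the output (assumes ``doc`` is a
# non-``None`` return value of ``process_county``):
#
#     if "ordinance_values" in doc.metadata:
#         ord_vals = doc.metadata["ordinance_values"]  # pandas DataFrame
#         print(doc.metadata["location_name"], _num_ords_in_doc(doc))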


async def _record_usage(**kwargs):
    """Dump usage to file if tracker found in kwargs."""
    usage_tracker = kwargs.get("usage_tracker")
    if usage_tracker:
        await UsageUpdater.call(usage_tracker)


async def _record_time_and_usage(start_time, **kwargs):
    """Add elapsed time before updating usage to file."""
    seconds_elapsed = time.time() - start_time
    usage_tracker = kwargs.get("usage_tracker")
    if usage_tracker:
        usage_tracker["total_time_seconds"] = seconds_elapsed
        usage_tracker["total_time"] = str(timedelta(seconds=seconds_elapsed))
        await UsageUpdater.call(usage_tracker)


async def _move_file_to_out_dir(doc):
    """Move PDF or HTML text file to output directory."""
    out_fp = await FileMover.call(doc)
    doc.metadata["out_fp"] = out_fp
    return doc


async def _write_cleaned_text(doc):
    """Write cleaned text to `clean_dir`."""
    out_fp = await CleanedFileWriter.call(doc)
    doc.metadata["cleaned_fp"] = out_fp
    return doc


async def _write_ord_db(doc):
    """Write ordinance database to `county_dbs_dir`."""
    out_fp = await OrdDBFileWriter.call(doc)
    doc.metadata["ord_db_fp"] = out_fp
    return doc


def _setup_pytesseract(exe_fp):
    """Set the pytesseract command."""
    import pytesseract

    logger.debug("Setting `tesseract_cmd` to %s", exe_fp)
    pytesseract.pytesseract.tesseract_cmd = exe_fp


def _record_total_time(fp, seconds_elapsed):
    """Dump usage to an existing file."""
    if not Path(fp).exists():
        usage_info = {}
    else:
        with open(fp, "r") as fh:
            usage_info = json.load(fh)

    total_time_str = str(timedelta(seconds=seconds_elapsed))
    usage_info["total_time_seconds"] = seconds_elapsed
    usage_info["total_time"] = total_time_str

    with open(fp, "w") as fh:
        json.dump(usage_info, fh, indent=4)

    logger.info("Total processing time: %s", total_time_str)


def _num_ords_in_doc(doc):
    """Check if doc contains any scraped ordinance values."""
    if doc is None:
        return 0

    if "ordinance_values" not in doc.metadata:
        return 0

    ord_vals = doc.metadata["ordinance_values"]
    if ord_vals.empty:
        return 0

    check_cols = [col for col in CHECK_COLS if col in ord_vals]
    if not check_cols:
        return 0

    return (~ord_vals[check_cols].isna()).values.sum(axis=1).sum()


def _docs_to_db(docs):
    """Convert list of docs to output database."""
    db = []
    for doc in docs:
        if doc is None or isinstance(doc, Exception):
            continue

        if _num_ords_in_doc(doc) == 0:
            continue

        results = _db_results(doc)
        results = _formatted_db(results)
        db.append(results)

    if not db:
        return pd.DataFrame(columns=OUT_COLS)

    db = pd.concat(db)
    db = _empirical_adjustments(db)
    return _formatted_db(db)


def _db_results(doc):
    """Extract results from doc metadata to DataFrame."""
    results = doc.metadata.get("ordinance_values")
    if results is None:
        return None

    results["source"] = doc.metadata.get("source")
    year = doc.metadata.get("date", (None, None, None))[0]
    results["ord_year"] = year if year is not None and year > 0 else None
    results["last_updated"] = datetime.now().strftime("%m/%d/%Y")

    location = doc.metadata["location"]
    results["FIPS"] = location.fips
    results["county"] = location.name
    results["state"] = location.state
    return results


def _empirical_adjustments(db):
    """Post-processing adjustments based on empirical observations.

    Current adjustments include:

        - Limit adder to max of 250 ft.
            - ChatGPT likes to report large values here, but in
              practice all values manually observed in ordinance
              documents are below 250 ft.

    """
    if "adder" in db.columns:
        db.loc[db["adder"] > 250, "adder"] = None
    return db


def _formatted_db(db):
    """Format DataFrame for output."""
    out_cols = [col for col in OUT_COLS if col in db.columns]
    return db[out_cols]
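

# A worked example of the counting logic in ``_num_ords_in_doc``
# (hypothetical values, for illustration only):
#
#     import pandas as pd
#
#     ord_vals = pd.DataFrame(
#         {
#             "feature": ["structures", "roads"],
#             "fixed_value": [None, 500],
#             "mult_value": [1.1, None],
#         }
#     )
#     check_cols = [c for c in CHECK_COLS if c in ord_vals]
#     # -> ["fixed_value", "mult_value"]
#     (~ord_vals[check_cols].isna()).values.sum(axis=1).sum()
#     # -> 2 (one non-null entry per row across the checked columns)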