"""COMPASS Ordinance Threaded services"""
import json
import shutil
import asyncio
import hashlib
import logging
import contextlib
from pathlib import Path
from abc import abstractmethod
from tempfile import TemporaryDirectory
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor
from elm.web.document import PDFDocument
from elm.web.utilities import write_url_doc_to_file
from compass import COMPASS_DEBUG_LEVEL
from compass.services.base import Service
from compass.utilities import (
LLM_COST_REGISTRY,
num_ordinances_in_doc,
)
from compass.pb import COMPASS_PB
logger = logging.getLogger(__name__)
def _cache_file_with_hash(doc, file_content, out_dir, make_name_unique=False):
"""Cache file and compute its hash"""
cache_fp = write_url_doc_to_file(
doc=doc,
file_content=file_content,
out_dir=out_dir,
make_name_unique=make_name_unique,
)
return cache_fp, _compute_sha256(cache_fp)
def _compute_sha256(file_path):
"""Compute sha256 checksum for file on disk"""
m = hashlib.sha256()
m.update(Path(file_path).read_bytes())
return f"sha256:{m.hexdigest()}"
def _move_file(doc, out_dir):
"""Move a file from a temp directory to an output directory"""
cached_fp = doc.attrs.get("cache_fn")
if cached_fp is None:
return None
cached_fp = Path(cached_fp)
date = datetime.now().strftime("%Y_%m_%d")
out_fn = doc.attrs.get("jurisdiction_name", cached_fp.stem)
out_fn = out_fn.replace(",", "").replace(" ", "_")
out_fn = f"{out_fn}_downloaded_{date}"
if not out_fn.endswith(cached_fp.suffix):
out_fn = f"{out_fn}{cached_fp.suffix}"
out_fp = Path(out_dir) / out_fn
shutil.move(cached_fp, out_fp)
return out_fp
def _write_cleaned_file(doc, out_dir):
"""Write cleaned ordinance text to directory"""
jurisdiction_name = doc.attrs.get("jurisdiction_name")
if jurisdiction_name is None:
return None
out_dir = Path(out_dir)
if COMPASS_DEBUG_LEVEL > 0:
_write_interim_cleaned_files(doc, out_dir, jurisdiction_name)
key_to_fp = {
"cleaned_ordinance_text": f"{jurisdiction_name} Ordinance Summary.txt",
"districts_text": f"{jurisdiction_name} Districts.txt",
}
out_paths = []
for key, fn in key_to_fp.items():
cleaned_text = doc.attrs.get(key)
if cleaned_text is None:
continue
out_fp = out_dir / fn
out_fp.write_text(cleaned_text, encoding="utf-8")
out_paths.append(out_fp)
return out_paths
def _write_interim_cleaned_files(doc, out_dir, jurisdiction_name):
"""Write intermediate output texts to file; helpful for debugging"""
key_to_fp = {
"ordinance_text": f"{jurisdiction_name} Ordinance Original text.txt",
"wind_energy_systems_text": (
f"{jurisdiction_name} Wind Ordinance text.txt"
),
"solar_energy_systems_text": (
f"{jurisdiction_name} Solar Ordinance text.txt"
),
"permitted_use_text": (
f"{jurisdiction_name} Permitted Use Original text.txt"
),
"permitted_use_only_text": (
f"{jurisdiction_name} Permitted Use Only text.txt"
),
}
for key, fn in key_to_fp.items():
text = doc.attrs.get(key)
if text is None:
continue
(out_dir / fn).write_text(text, encoding="utf-8")
def _write_ord_db(doc, out_dir):
"""Write parsed ordinance database to directory"""
ord_db = doc.attrs.get("scraped_values")
jurisdiction_name = doc.attrs.get("jurisdiction_name")
if ord_db is None or jurisdiction_name is None:
return None
out_fp = Path(out_dir) / f"{jurisdiction_name} Ordinances.csv"
ord_db.to_csv(out_fp, index=False)
return out_fp
_PROCESSING_FUNCTIONS = {
"move": _move_file,
"write_clean": _write_cleaned_file,
"write_db": _write_ord_db,
}
[docs]
class ThreadedService(Service):
"""Service that contains a ThreadPoolExecutor instance"""
def __init__(self, **kwargs):
"""
Parameters
----------
**kwargs
Keyword-value argument pairs to pass to
:class:`concurrent.futures.ThreadPoolExecutor`.
By default, ``None``.
"""
self._tpe_kwargs = kwargs or {}
self.pool = None
[docs]
def acquire_resources(self):
"""Open thread pool and temp directory"""
self.pool = ThreadPoolExecutor(**self._tpe_kwargs)
[docs]
def release_resources(self):
"""Shutdown thread pool and cleanup temp directory"""
self.pool.shutdown(wait=True, cancel_futures=True)
[docs]
class TempFileCache(ThreadedService):
"""Service that locally caches files downloaded from the internet"""
def __init__(self, td_kwargs=None, tpe_kwargs=None):
"""
Parameters
----------
td_kwargs : dict, optional
Keyword-value argument pairs to pass to
:class:`tempfile.TemporaryDirectory`. By default, ``None``.
tpe_kwargs : dict, optional
Keyword-value argument pairs to pass to
:class:`concurrent.futures.ThreadPoolExecutor`.
By default, ``None``.
"""
super().__init__(**(tpe_kwargs or {}))
self._td_kwargs = td_kwargs or {}
self._td = None
@property
def can_process(self):
"""bool: Always ``True`` (limiting is handled by asyncio)"""
return True
[docs]
def acquire_resources(self):
"""Open thread pool and temp directory"""
super().acquire_resources()
self._td = TemporaryDirectory(**self._td_kwargs)
[docs]
def release_resources(self):
"""Shutdown thread pool and cleanup temp directory"""
self._td.cleanup()
super().release_resources()
[docs]
async def process(self, doc, file_content, make_name_unique=False):
"""Write URL doc to file asynchronously
Parameters
----------
doc : elm.web.document.Document
Document containing meta information about the file. Must
have a "source" key in the ``attrs`` dict containing the
URL, which will be converted to a file name using
:func:`compute_fn_from_url`.
file_content : str or bytes
File content, typically string text for HTML files and bytes
for PDF file.
make_name_unique : bool, optional
Option to make file name unique by adding a UUID at the end
of the file name. By default, ``False``.
Returns
-------
Path
Path to output file.
"""
loop = asyncio.get_running_loop()
cache_fp, checksum = await loop.run_in_executor(
self.pool,
_cache_file_with_hash,
doc,
file_content,
self._td.name,
make_name_unique,
)
logger.debug("Cached doc from %s", doc.attrs.get("source", "Unknown"))
logger.trace(" ↪ checksum=%s", str(checksum))
doc.attrs["checksum"] = checksum
return cache_fp
[docs]
class TempFileCachePB(TempFileCache):
"""Service that locally caches files downloaded from the internet"""
[docs]
async def process(self, doc, file_content, make_name_unique=False):
"""Write URL doc to file asynchronously
Parameters
----------
doc : elm.web.document.Document
Document containing meta information about the file. Must
have a "source" key in the ``attrs`` dict containing the
URL, which will be converted to a file name using
:func:`compute_fn_from_url`.
file_content : str or bytes
File content, typically string text for HTML files and bytes
for PDF file.
make_name_unique : bool, optional
Option to make file name unique by adding a UUID at the end
of the file name. By default, ``False``.
Returns
-------
Path
Path to output file.
"""
out = await super().process(
doc=doc,
file_content=file_content,
make_name_unique=make_name_unique,
)
jurisdiction = asyncio.current_task().get_name()
with contextlib.suppress(KeyError):
COMPASS_PB.update_download_task(jurisdiction, advance=1)
return out
[docs]
class StoreFileOnDisk(ThreadedService):
"""Abstract service that manages the storage of a file on disk
Storage can occur due to creation or a move of a file.
"""
def __init__(self, out_dir, tpe_kwargs=None):
"""
Parameters
----------
out_dir : path-like
Path to output directory where file should be stored.
tpe_kwargs : dict, optional
Keyword-value argument pairs to pass to
:class:`concurrent.futures.ThreadPoolExecutor`.
By default, ``None``.
"""
super().__init__(**(tpe_kwargs or {}))
self.out_dir = out_dir
@property
def can_process(self):
"""bool: Always ``True`` (limiting is handled by asyncio)"""
return True
[docs]
async def process(self, doc):
"""Store file in out directory
Parameters
----------
doc : elm.web.document.Document
Document containing meta information about the file. Must
have relevant processing keys in the ``attrs`` dict,
otherwise the file may not be stored in the output
directory.
Returns
-------
Path or None
Path to output file, or `None` if no file was stored.
"""
loop = asyncio.get_running_loop()
return await loop.run_in_executor(
self.pool, _PROCESSING_FUNCTIONS[self._PROCESS], doc, self.out_dir
)
@property
@abstractmethod
def _PROCESS(self): # noqa: N802
"""str: Key in `_PROCESSING_FUNCTIONS` defining the doc func"""
raise NotImplementedError
[docs]
class FileMover(StoreFileOnDisk):
"""Service that moves files to an output directory"""
_PROCESS = "move"
[docs]
class CleanedFileWriter(StoreFileOnDisk):
"""Service that writes cleaned text to a file"""
_PROCESS = "write_clean"
[docs]
class OrdDBFileWriter(StoreFileOnDisk):
"""Service that writes cleaned text to a file"""
_PROCESS = "write_db"
[docs]
class UsageUpdater(ThreadedService):
"""Service that updates usage info from a tracker into a file"""
def __init__(self, usage_fp, tpe_kwargs=None):
"""
Parameters
----------
usage_fp : path-like
Path to JSON file where usage should be tracked.
tpe_kwargs : dict, optional
Keyword-value argument pairs to pass to
:class:`concurrent.futures.ThreadPoolExecutor`.
By default, ``None``.
"""
super().__init__(**(tpe_kwargs or {}))
self.usage_fp = usage_fp
self._is_processing = False
@property
def can_process(self):
"""bool: ``True`` if file not currently being written to"""
return not self._is_processing
[docs]
async def process(self, tracker):
"""Add usage from tracker to file
Any existing usage info in the file will remain unchanged
EXCEPT for anything under the label of the input `tracker`,
all of which will be replaced with info from the tracker itself.
Parameters
----------
tracker : elm.ods.services.usage.UsageTracker
A usage tracker instance that contains usage info to be
added to output file.
"""
self._is_processing = True
try:
loop = asyncio.get_running_loop()
out = await loop.run_in_executor(
self.pool, _dump_usage, self.usage_fp, tracker
)
finally:
self._is_processing = False
return out
[docs]
class JurisdictionUpdater(ThreadedService):
"""Service that updates jurisdiction info into a file"""
def __init__(self, jurisdiction_fp, tpe_kwargs=None):
"""
Parameters
----------
jurisdiction_fp : path-like
Path to JSON file where jurisdictions should be tracked.
tpe_kwargs : dict, optional
Keyword-value argument pairs to pass to
:class:`concurrent.futures.ThreadPoolExecutor`.
By default, ``None``.
"""
super().__init__(**(tpe_kwargs or {}))
self.jurisdiction_fp = jurisdiction_fp
self._is_processing = False
@property
def can_process(self):
"""bool: ``True`` if file not currently being written to"""
return not self._is_processing
[docs]
async def process(
self, jurisdiction, doc, seconds_elapsed, usage_tracker=None
):
"""Add usage from tracker to file
Any existing usage info in the file will remain unchanged
EXCEPT for anything under the label of the input `tracker`,
all of which will be replaced with info from the tracker itself.
Parameters
----------
jurisdiction : compass.utilities.location.Jurisdiction
Jurisdiction to record.
doc : elm.web.document.Document | None
Document containing meta information about the jurisdiction.
Must have relevant processing keys in the ``attrs`` dict,
otherwise the jurisdiction may not be recorded properly.
If ``None``, the jurisdiction is assumed not to have been
found.
seconds_elapsed : int or float
Total number of seconds it took to look for (and possibly
parse) this document.
usage_tracker : compass.services.usage.UsageTracker, optional
Optional tracker instance to monitor token usage during
LLM calls. By default, ``None``.
"""
self._is_processing = True
try:
loop = asyncio.get_running_loop()
await loop.run_in_executor(
self.pool,
_dump_jurisdiction_info,
self.jurisdiction_fp,
jurisdiction,
doc,
seconds_elapsed,
usage_tracker,
)
finally:
self._is_processing = False
def _dump_usage(fp, tracker):
"""Dump usage to an existing file"""
if not Path(fp).exists():
usage_info = {}
else:
with Path.open(fp, encoding="utf-8") as fh:
usage_info = json.load(fh)
if tracker is not None:
tracker.add_to(usage_info)
with Path.open(fp, "w", encoding="utf-8") as fh:
json.dump(usage_info, fh, indent=4)
return usage_info
def _dump_jurisdiction_info(
fp, jurisdiction, doc, seconds_elapsed, usage_tracker
):
"""Dump jurisdiction info to an existing file"""
if not Path(fp).exists():
jurisdiction_info = {"jurisdictions": []}
else:
with Path.open(fp, encoding="utf-8") as fh:
jurisdiction_info = json.load(fh)
new_info = {
"full_name": jurisdiction.full_name,
"county": jurisdiction.county,
"state": jurisdiction.state,
"subdivision": jurisdiction.subdivision_name,
"jurisdiction_type": jurisdiction.type,
"FIPS": jurisdiction.code,
"found": False,
"total_time": seconds_elapsed,
"total_time_string": str(timedelta(seconds=seconds_elapsed)),
"jurisdiction_website": None,
"compass_crawl": False,
"cost": None,
"documents": None,
}
if usage_tracker is not None:
cost = _compute_jurisdiction_cost(usage_tracker)
new_info["cost"] = cost or None
if doc is not None and num_ordinances_in_doc(doc) > 0:
new_info["found"] = True
new_info["documents"] = [_compile_doc_info(doc)]
new_info["jurisdiction_website"] = doc.attrs.get(
"jurisdiction_website"
)
new_info["compass_crawl"] = doc.attrs.get("compass_crawl", False)
jurisdiction_info["jurisdictions"].append(new_info)
with Path.open(fp, "w", encoding="utf-8") as fh:
json.dump(jurisdiction_info, fh, indent=4)
def _compile_doc_info(doc):
"""Put together meta information about a single document"""
year, month, day = doc.attrs.get("date", (None, None, None))
return {
"source": doc.attrs.get("source"),
"effective_year": year if year is not None and year > 0 else None,
"effective_month": month if month is not None and month > 0 else None,
"effective_day": day if day is not None and day > 0 else None,
"ord_filename": Path(doc.attrs.get("out_fp") or "unknown").name,
"num_pages": len(doc.pages),
"checksum": doc.attrs.get("checksum"),
"is_pdf": isinstance(doc, PDFDocument),
"from_ocr": doc.attrs.get("from_ocr", False),
"ordinance_text_ngram_score": doc.attrs.get(
"ordinance_text_ngram_score"
),
"permitted_use_text_ngram_score": doc.attrs.get(
"permitted_use_text_ngram_score"
),
}
def _compute_jurisdiction_cost(usage_tracker):
"""Compute total cost from total tracked usage"""
total_cost = 0
for model, total_usage in usage_tracker.totals.items():
model_costs = LLM_COST_REGISTRY.get(model, {})
total_cost += (
total_usage.get("prompt_tokens", 0)
/ 1e6
* model_costs.get("prompt", 0)
)
total_cost += (
total_usage.get("response_tokens", 0)
/ 1e6
* model_costs.get("response", 0)
)
return total_cost