"""Ordinance file downloading logic"""
import logging
from contextlib import AsyncExitStack
from elm.web.document import PDFDocument
from elm.web.search.run import (
load_docs,
search_with_fallback,
web_search_links_as_docs,
)
from elm.web.website_crawl import (
_SCORE_KEY, # noqa: PLC2701
ELMWebsiteCrawler,
ELMLinkScorer,
)
from elm.web.utilities import filter_documents
from compass.extraction import check_for_ordinance_info, extract_date
from compass.services.threaded import TempFileCache, TempFileCachePB
from compass.validation.location import (
DTreeJurisdictionValidator,
JurisdictionValidator,
JurisdictionWebsiteValidator,
)
from compass.web.website_crawl import COMPASSCrawler, COMPASSLinkScorer
from compass.utilities.enums import LLMTasks
from compass.pb import COMPASS_PB
# Module-level logger, named after this module
logger = logging.getLogger(__name__)

# Stand-in for missing date components so that they always compare as
# the lowest possible value when ranking documents
_NEG_INF = float("-inf")
[docs]
async def download_known_urls(
    jurisdiction, urls, browser_semaphore=None, file_loader_kwargs=None
):
    """Download documents from known URLs

    Parameters
    ----------
    jurisdiction : :class:`~compass.utilities.location.Jurisdiction`
        Jurisdiction instance representing the jurisdiction
        corresponding to the documents.
    urls : iterable of str
        Collection of URLs to download documents from. Any iterable
        is accepted; it is converted to a list before use.
    browser_semaphore : :class:`asyncio.Semaphore`, optional
        Semaphore instance that can be used to limit the number of
        downloads happening concurrently. If ``None``, no limits
        are applied. By default, ``None``.
    file_loader_kwargs : dict, optional
        Dictionary of keyword arguments pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader`. This dictionary
        is not modified; the "file_cache_coroutine" entry is always
        overridden on an internal copy. By default, ``None``.

    Returns
    -------
    out_docs : list
        List of :obj:`~elm.web.document.BaseDocument` instances
        containing documents from the URL's, or an empty list if
        something went wrong during the retrieval process.

    Notes
    -----
    Requires :class:`~compass.services.threaded.TempFileCachePB`
    service to be running.
    """
    COMPASS_PB.update_jurisdiction_task(
        jurisdiction.full_name,
        description="Downloading known URL(s)...",
    )

    # Materialize the input so that ``len`` below works for any
    # iterable (e.g. a generator), as promised by the docstring
    urls = list(urls)

    # Build a fresh dictionary instead of updating the input in place
    # so that the caller's dictionary is never modified
    file_loader_kwargs = {
        **(file_loader_kwargs or {}),
        "file_cache_coroutine": TempFileCachePB.call,
    }

    async with COMPASS_PB.file_download_prog_bar(
        jurisdiction.full_name, len(urls)
    ):
        try:
            out_docs = await load_docs(
                urls, browser_semaphore=browser_semaphore, **file_loader_kwargs
            )
        # ``KeyboardInterrupt`` does not derive from ``Exception``, so it
        # still propagates without needing an explicit re-raise clause
        except Exception as e:
            msg = (
                "Encountered error of type %r while downloading known URLs: %r"
            )
            err_type = type(e)
            logger.exception(msg, err_type, urls)
            out_docs = []

    return out_docs
[docs]
async def find_jurisdiction_website(
    jurisdiction,
    model_configs,
    file_loader_kwargs=None,
    search_semaphore=None,
    browser_semaphore=None,
    usage_tracker=None,
    url_ignore_substrings=None,
    **kwargs,
):
    """Search for the main landing page of a given jurisdiction

    Two search queries are built from the jurisdiction name, the top
    search results are collected, and each candidate URL is validated
    in order. The first URL that passes validation is returned.

    Parameters
    ----------
    jurisdiction : :class:`~compass.utilities.location.Jurisdiction`
        Jurisdiction instance representing the jurisdiction to find the
        main webpage for.
    model_configs : dict
        Dictionary of :class:`~compass.llm.config.LLMConfig` instances.
        Should have at minimum a "default" key that is used as a
        fallback for all tasks.
    file_loader_kwargs : dict, optional
        Dictionary of keyword arguments pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader`. If found, the
        "pw_launch_kwargs" key in these will also be used to initialize
        the :class:`elm.web.search.google.PlaywrightGoogleLinkSearch`
        used for the Google URL search. By default, ``None``.
    search_semaphore : :class:`asyncio.Semaphore`, optional
        Semaphore instance that can be used to limit the number of
        playwright browsers used to submit search engine queries open
        concurrently. If ``None``, no limits are applied.
        By default, ``None``.
    browser_semaphore : :class:`asyncio.Semaphore`, optional
        Semaphore instance that can be used to limit the number of
        playwright browsers open concurrently. If ``None``, no limits
        are applied. By default, ``None``.
    usage_tracker : compass.services.usage.UsageTracker, optional
        Optional tracker instance to monitor token usage during
        LLM calls. By default, ``None``.
    url_ignore_substrings : optional
        Value handed to the search as ``ignore_url_parts``.
        By default, ``None``.
    **kwargs
        Extra keyword arguments forwarded to
        :func:`elm.web.search.run.search_with_fallback`. Entries in
        `file_loader_kwargs` take precedence over these.

    Returns
    -------
    str | None
        URL for the jurisdiction website, if found; ``None`` otherwise.
    """
    # Loader options win over any same-named extra keyword arguments
    search_kwargs = {**kwargs, **(file_loader_kwargs or {})}

    prefixed_name = jurisdiction.full_name_the_prefixed
    bare_name = prefixed_name.removeprefix("the ")
    queries = [
        text.casefold().replace(",", "")
        for text in (
            f"{bare_name} website",
            f"main website {prefixed_name}",
        )
    ]

    candidate_urls = await search_with_fallback(
        queries=queries,
        num_urls=3,
        ignore_url_parts=url_ignore_substrings,
        browser_sem=search_semaphore,
        task_name=jurisdiction.full_name,
        **search_kwargs,
    )
    if not candidate_urls:
        return None

    # Fall back to the default model if no task-specific one is given
    llm_config = model_configs.get(
        LLMTasks.JURISDICTION_MAIN_WEBSITE_VALIDATION,
        model_configs[LLMTasks.DEFAULT],
    )
    website_validator = JurisdictionWebsiteValidator(
        browser_semaphore=browser_semaphore,
        file_loader_kwargs=file_loader_kwargs,
        usage_tracker=usage_tracker,
        llm_service=llm_config.llm_service,
        **llm_config.llm_call_kwargs,
    )

    # Candidates are checked one at a time; the first match wins
    for candidate in candidate_urls:
        is_match = await website_validator.check(candidate, jurisdiction)
        if is_match:
            return candidate

    return None
[docs]
async def download_jurisdiction_ordinances_from_website(
    website,
    heuristic,
    keyword_points,
    file_loader_kwargs=None,
    browser_config_kwargs=None,
    crawler_config_kwargs=None,
    max_urls=100,
    crawl_semaphore=None,
    pb_jurisdiction_name=None,
    return_c4ai_results=False,
):
    """Download ordinance documents from a jurisdiction website

    Parameters
    ----------
    website : str
        URL of the jurisdiction website to search.
    heuristic : object
        Object with a ``check`` method. The method is called with the
        lower-cased text of each candidate document, and a truthy
        return value marks the document as valid.
    keyword_points : dict
        Dictionary of keyword points to use for scoring links.
        Keys are keywords, values are points to assign to links
        containing the keyword. If a link contains multiple keywords,
        the points are summed up.
    file_loader_kwargs : dict, optional
        Dictionary of keyword arguments pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader`. If found, the
        "headless" value under the "pw_launch_kwargs" key is also used
        to set the ``headless`` option of the crawler browser config
        (defaults to ``True``). This dictionary is not modified.
        By default, ``None``.
    browser_config_kwargs : dict, optional
        Dictionary of keyword arguments pairs to initialize the
        :class:`crawl4ai.async_configs.BrowserConfig` class used for the
        web crawl. The "headless" entry is always overridden as
        described above. This dictionary is not modified.
        By default, ``None``.
    crawler_config_kwargs : dict, optional
        Dictionary of keyword arguments pairs to initialize the
        :class:`crawl4ai.async_configs.CrawlerConfig` class used for the
        web crawl. By default, ``None``.
    max_urls : int, optional
        Max number of URLs to check from the website before terminating
        the search. By default, ``100``.
    crawl_semaphore : :class:`asyncio.Semaphore`, optional
        Semaphore instance that can be used to limit the number of
        website searches happening concurrently. If ``None``, no limits
        are applied. By default, ``None``.
    pb_jurisdiction_name : str, optional
        Optional jurisdiction name to use to update progress bar, if
        it's being used. By default, ``None``.
    return_c4ai_results : bool, default=False
        If ``True``, the crawl4ai results will be returned as a second
        return value. This is useful for debugging and examining the
        crawled URLs. If ``False``, only the documents will be returned.
        By default, ``False``.

    Returns
    -------
    out_docs : list
        List of :obj:`~elm.web.document.BaseDocument` instances
        containing potential ordinance information, or an empty list if
        no ordinance document was found.
    results : list, optional
        List of crawl4ai results containing metadata about the crawled
        pages. This is only returned if `return_c4ai_results` is
        ``True``.

    Notes
    -----
    Requires :class:`~compass.services.threaded.TempFileCache` service
    to be running.
    """
    if crawl_semaphore is None:
        # Empty exit stack acts as a no-op async context manager
        crawl_semaphore = AsyncExitStack()

    async def _doc_heuristic(doc):  # noqa: RUF029
        """Heuristic check for wind ordinance documents"""
        is_valid_document = heuristic.check(doc.text.lower())
        if is_valid_document and pb_jurisdiction_name:
            COMPASS_PB.update_website_crawl_doc_found(pb_jurisdiction_name)
        return is_valid_document

    async def _crawl_hook(*__, **___):  # noqa: RUF029
        """Update progress bar as pages are searched"""
        COMPASS_PB.update_website_crawl_task(pb_jurisdiction_name, advance=1)

    # Build fresh dictionaries instead of updating the inputs in place
    # so that the caller's dictionaries are never modified
    file_loader_kwargs = {
        **(file_loader_kwargs or {}),
        "file_cache_coroutine": TempFileCache.call,
    }
    # ``or {}`` also covers an explicit ``"pw_launch_kwargs": None``
    pw_launch_kwargs = file_loader_kwargs.get("pw_launch_kwargs") or {}
    browser_config_kwargs = {
        **(browser_config_kwargs or {}),
        "headless": pw_launch_kwargs.get("headless", True),
    }

    crawler = ELMWebsiteCrawler(
        validator=_doc_heuristic,
        url_scorer=ELMLinkScorer(keyword_points).score,
        file_loader_kwargs=file_loader_kwargs,
        browser_config_kwargs=browser_config_kwargs,
        crawler_config_kwargs=crawler_config_kwargs,
        include_external=True,
        max_pages=max_urls,
        page_limit=int(max_urls * 3),
    )

    if pb_jurisdiction_name:
        COMPASS_PB.update_jurisdiction_task(
            pb_jurisdiction_name,
            description=f"Searching for documents from {website} ...",
        )
        cpb = COMPASS_PB.website_crawl_prog_bar(pb_jurisdiction_name, max_urls)
        ch = _crawl_hook
    else:
        # No progress bar requested: no-op context and no hook
        cpb = AsyncExitStack()
        ch = None

    async with crawl_semaphore, cpb:
        return await crawler.run(
            website,
            on_result_hook=ch,
            return_c4ai_results=return_c4ai_results,
        )
[docs]
async def download_jurisdiction_ordinances_from_website_compass_crawl(
    website,
    heuristic,
    keyword_points,
    file_loader_kwargs=None,
    already_visited=None,
    num_link_scores_to_check_per_page=4,
    max_urls=100,
    crawl_semaphore=None,
    pb_jurisdiction_name=None,
):
    """Download ord documents from a website using the COMPASS crawler

    The COMPASS crawler is much more simplistic than the Crawl4AI
    crawler, but is designed to access some links that Crawl4AI cannot
    (such as those behind a button interface).

    Parameters
    ----------
    website : str
        URL of the jurisdiction website to search.
    heuristic : object
        Object with a ``check`` method. The method is called with the
        lower-cased text of each candidate document, and a truthy
        return value marks the document as valid.
    keyword_points : dict
        Dictionary of keyword points to use for scoring links.
        Keys are keywords, values are points to assign to links
        containing the keyword. If a link contains multiple keywords,
        the points are summed up.
    file_loader_kwargs : dict, optional
        Dictionary of keyword arguments pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader`. This dictionary
        is not modified; the "file_cache_coroutine" entry is always
        overridden on an internal copy. By default, ``None``.
    already_visited : optional
        Passed unchanged to
        :class:`~compass.web.website_crawl.COMPASSCrawler` as
        ``already_visited``. Presumably a collection of URLs that
        should not be visited again (confirm against the crawler
        documentation). By default, ``None``.
    num_link_scores_to_check_per_page : int, optional
        Passed unchanged to
        :class:`~compass.web.website_crawl.COMPASSCrawler` as the
        parameter of the same name. By default, ``4``.
    max_urls : int, optional
        Max number of URLs to check from the website before terminating
        the search. By default, ``100``.
    crawl_semaphore : :class:`asyncio.Semaphore`, optional
        Semaphore instance that can be used to limit the number of
        website crawls happening concurrently. If ``None``, no limits
        are applied. By default, ``None``.
    pb_jurisdiction_name : str, optional
        Optional jurisdiction name to use to update progress bar, if
        it's being used. By default, ``None``.

    Returns
    -------
    out_docs : list
        List of :obj:`~elm.web.document.BaseDocument` instances
        containing potential ordinance information, or an empty list if
        no ordinance document was found.

    Notes
    -----
    Requires :class:`~compass.services.threaded.TempFileCache` service
    to be running.
    """
    if crawl_semaphore is None:
        # Empty exit stack acts as a no-op async context manager
        crawl_semaphore = AsyncExitStack()

    async def _doc_heuristic(doc):  # noqa: RUF029
        """Heuristic check for wind ordinance documents"""
        is_valid_document = heuristic.check(doc.text.lower())
        if is_valid_document and pb_jurisdiction_name:
            COMPASS_PB.update_compass_website_crawl_doc_found(
                pb_jurisdiction_name
            )
        return is_valid_document

    async def _crawl_hook(*__, **___):  # noqa: RUF029
        """Update progress bar as pages are searched"""
        COMPASS_PB.update_compass_website_crawl_task(
            pb_jurisdiction_name, advance=1
        )

    # Build a fresh dictionary instead of updating the input in place
    # so that the caller's dictionary is never modified
    file_loader_kwargs = {
        **(file_loader_kwargs or {}),
        "file_cache_coroutine": TempFileCache.call,
    }

    crawler = COMPASSCrawler(
        validator=_doc_heuristic,
        url_scorer=COMPASSLinkScorer(keyword_points).score,
        file_loader_kwargs=file_loader_kwargs,
        num_link_scores_to_check_per_page=num_link_scores_to_check_per_page,
        already_visited=already_visited,
        max_pages=max_urls,
    )

    if pb_jurisdiction_name:
        COMPASS_PB.update_jurisdiction_task(
            pb_jurisdiction_name,
            description=f"Double-checking {website} for documents ...",
        )
        cpb = COMPASS_PB.compass_website_crawl_prog_bar(
            pb_jurisdiction_name, max_urls
        )
        ch = _crawl_hook
    else:
        # No progress bar requested: no-op context and no hook
        cpb = AsyncExitStack()
        ch = None

    async with crawl_semaphore, cpb:
        return await crawler.run(website, on_new_page_visit_hook=ch)
[docs]
async def download_jurisdiction_ordinance_using_search_engine(
    question_templates,
    jurisdiction,
    num_urls=5,
    file_loader_kwargs=None,
    search_semaphore=None,
    browser_semaphore=None,
    url_ignore_substrings=None,
    **kwargs,
):
    """Download the ordinance document(s) for a single jurisdiction

    Parameters
    ----------
    question_templates : iterable of str
        Search query templates. Each one is formatted with the
        jurisdiction's full name supplied as the ``jurisdiction``
        keyword (i.e. a ``{jurisdiction}`` placeholder).
    jurisdiction : :class:`~compass.utilities.location.Jurisdiction`
        Location objects representing the jurisdiction.
    num_urls : int, optional
        Number of unique Google search result URL's to check for
        ordinance document. By default, ``5``.
    file_loader_kwargs : dict, optional
        Dictionary of keyword-argument pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader` with. If found, the
        "pw_launch_kwargs" key in these will also be used to initialize
        the :class:`elm.web.search.google.PlaywrightGoogleLinkSearch`
        used for the google URL search. By default, ``None``.
    search_semaphore : :class:`asyncio.Semaphore`, optional
        Semaphore instance that can be used to limit the number of
        playwright browsers used to submit search engine queries open
        concurrently. If this input is ``None``, the input from
        `browser_semaphore` will be used in its place (i.e. the searches
        and file downloads will be limited using the same semaphore).
        By default, ``None``.
    browser_semaphore : :class:`asyncio.Semaphore`, optional
        Semaphore instance that can be used to limit the number of
        playwright browsers used to download content from the web open
        concurrently. If ``None``, no limits are applied.
        By default, ``None``.
    url_ignore_substrings : optional
        Value handed to the web search as ``ignore_url_parts``.
        By default, ``None``.
    **kwargs
        Extra keyword arguments forwarded to
        :func:`elm.web.search.run.web_search_links_as_docs`. Entries in
        `file_loader_kwargs` take precedence over these.

    Returns
    -------
    list
        Documents returned by the web search, possibly containing
        ordinance information. An empty list is returned if the search
        failed with an error.

    Notes
    -----
    Requires :class:`~compass.services.threaded.TempFileCachePB`
    service to be running.
    """
    COMPASS_PB.update_jurisdiction_task(
        jurisdiction.full_name, description="Searching web..."
    )

    # Records every progress bar started by the hook so that it can be
    # torn down once the search is over
    started_bars = []

    async def _start_download_bar(urls):  # noqa: RUF029
        """Update progress bar as file download starts"""
        if urls:
            COMPASS_PB.update_jurisdiction_task(
                jurisdiction.full_name, description="Downloading files..."
            )
            bar, bar_task = COMPASS_PB.start_file_download_prog_bar(
                jurisdiction.full_name, len(urls)
            )
            started_bars.append((bar, bar_task, len(urls)))

    # Loader options win over any same-named extra keyword arguments
    search_kwargs = {**kwargs, **(file_loader_kwargs or {})}

    try:
        out_docs = await _docs_from_web_search(
            question_templates=question_templates,
            jurisdiction=jurisdiction,
            num_urls=num_urls,
            search_semaphore=search_semaphore,
            browser_semaphore=browser_semaphore,
            url_ignore_substrings=url_ignore_substrings,
            on_search_complete_hook=_start_download_bar,
            **search_kwargs,
        )
    finally:
        # Only the first recorded progress bar is torn down
        if started_bars:
            bar, bar_task, num_files = started_bars[0]
            await COMPASS_PB.tear_down_file_download_prog_bar(
                jurisdiction.full_name, num_files, bar, bar_task
            )

    return out_docs
[docs]
async def filter_ordinance_docs(
    docs,
    jurisdiction,
    model_configs,
    heuristic,
    ordinance_text_collector_class,
    permitted_use_text_collector_class,
    usage_tracker=None,
    check_for_correct_jurisdiction=True,
):
    """Filter a list of documents to only those that contain ordinances

    Parameters
    ----------
    docs : iterable
        Documents to filter. Each document's ``attrs`` mapping is
        consulted for a "source" entry when logging.
    jurisdiction : :class:`~compass.utilities.location.Jurisdiction`
        Location objects representing the jurisdiction.
    model_configs : dict
        Dictionary of :class:`~compass.llm.config.LLMConfig` instances.
        Should have at minimum a "default" key that is used as a
        fallback for all tasks.
    heuristic : object
        Handed on to the content-validation step.
    ordinance_text_collector_class : type
        Handed on to the content-validation step.
    permitted_use_text_collector_class : type
        Handed on to the content-validation step.
    usage_tracker : compass.services.usage.UsageTracker, optional
        Optional tracker instance to monitor token usage during
        LLM calls. By default, ``None``.
    check_for_correct_jurisdiction : bool, optional
        If ``True``, documents are first filtered down to those that
        pertain to `jurisdiction` before their content is checked.
        By default, ``True``.

    Returns
    -------
    list
        Documents that passed all filters, sorted from best to worst.
        If no document passed, the (empty) result of the content filter
        is returned unchanged.
    """

    def _bulleted_sources(documents):
        """Join document sources into a bulleted log fragment"""
        return "\n\t- ".join(
            [each.attrs.get("source", "Unknown source") for each in documents]
        )

    task_name = jurisdiction.full_name

    if check_for_correct_jurisdiction:
        COMPASS_PB.update_jurisdiction_task(
            task_name,
            description="Checking files for correct jurisdiction...",
        )
        # Fall back to the default model if no task-specific one is given
        jurisdiction_model_config = model_configs.get(
            LLMTasks.DOCUMENT_JURISDICTION_VALIDATION,
            model_configs[LLMTasks.DEFAULT],
        )
        docs = await _down_select_docs_correct_jurisdiction(
            docs,
            jurisdiction=jurisdiction,
            usage_tracker=usage_tracker,
            model_config=jurisdiction_model_config,
        )
        logger.info(
            "%d document(s) remaining after jurisdiction filter for %s"
            "\n\t- %s",
            len(docs),
            task_name,
            _bulleted_sources(docs),
        )

    COMPASS_PB.update_jurisdiction_task(
        task_name, description="Checking files for legal text..."
    )
    docs = await _down_select_docs_correct_content(
        docs,
        jurisdiction=jurisdiction,
        model_configs=model_configs,
        heuristic=heuristic,
        ordinance_text_collector_class=ordinance_text_collector_class,
        permitted_use_text_collector_class=permitted_use_text_collector_class,
        usage_tracker=usage_tracker,
    )

    if docs:
        docs = _sort_final_ord_docs(docs)
        logger.info(
            "Found %d potential ordinance documents for %s\n\t- %s",
            len(docs),
            task_name,
            _bulleted_sources(docs),
        )
    else:
        logger.info(
            "Did not find any potential ordinance documents for %s",
            task_name,
        )

    return docs
async def _docs_from_web_search(
    question_templates,
    jurisdiction,
    num_urls,
    search_semaphore,
    browser_semaphore,
    url_ignore_substrings,
    on_search_complete_hook,
    **kwargs,
):
    """Download docs from web using jurisdiction queries

    Every template in `question_templates` is formatted with the
    jurisdiction's full name (``jurisdiction`` keyword) and the
    resulting queries are submitted to the web search. Any error
    raised by the search is logged and an empty list is returned
    instead.
    """
    task_name = jurisdiction.full_name
    queries = [
        template.format(jurisdiction=task_name)
        for template in question_templates
    ]

    # Always use the progress-bar-aware file cache for these downloads
    kwargs["file_cache_coroutine"] = TempFileCachePB.call

    try:
        return await web_search_links_as_docs(
            queries,
            num_urls=num_urls,
            search_semaphore=search_semaphore,
            browser_semaphore=browser_semaphore,
            ignore_url_parts=url_ignore_substrings,
            task_name=task_name,
            on_search_complete_hook=on_search_complete_hook,
            **kwargs,
        )
    # ``KeyboardInterrupt`` does not derive from ``Exception``, so it
    # still propagates to the caller
    except Exception as e:
        err_type = type(e)
        msg = (
            "Encountered error of type %r while searching web for docs for %s:"
        )
        logger.exception(msg, err_type, task_name)

    return []
async def _down_select_docs_correct_jurisdiction(
    docs, jurisdiction, usage_tracker, model_config
):
    """Remove all documents not pertaining to the jurisdiction

    A :class:`JurisdictionValidator` is built from `model_config` and
    its ``check`` coroutine is used as the document filter.
    """
    validator = JurisdictionValidator(
        text_splitter=model_config.text_splitter,
        llm_service=model_config.llm_service,
        usage_tracker=usage_tracker,
        **model_config.llm_call_kwargs,
    )
    logger.debug("Validating documents for %r", jurisdiction)

    kept_docs = await filter_documents(
        docs,
        validation_coroutine=validator.check,
        jurisdiction=jurisdiction,
        task_name=jurisdiction.full_name,
    )
    return kept_docs
async def _down_select_docs_correct_content(
    docs,
    jurisdiction,
    model_configs,
    heuristic,
    ordinance_text_collector_class,
    permitted_use_text_collector_class,
    usage_tracker,
):
    """Remove all documents that don't contain ordinance info

    Documents are filtered using the ``_contains_ordinances``
    coroutine; the remaining inputs are handed to the document filter
    as keyword arguments.
    """
    validation_kwargs = {
        "model_configs": model_configs,
        "heuristic": heuristic,
        "ordinance_text_collector_class": ordinance_text_collector_class,
        "permitted_use_text_collector_class": (
            permitted_use_text_collector_class
        ),
        "usage_tracker": usage_tracker,
    }
    kept_docs = await filter_documents(
        docs,
        validation_coroutine=_contains_ordinances,
        task_name=jurisdiction.full_name,
        **validation_kwargs,
    )
    return kept_docs
async def _contains_ordinances(
    doc, model_configs, usage_tracker=None, **kwargs
):
    """Helper coroutine that checks for ordinance and date info

    The document is first checked for ordinance info. Only if the
    "contains_ord_info" attribute of the checked document is truthy is
    the date extraction run as well. The value of that attribute
    (``False`` if missing) is returned.
    """
    default_config = model_configs[LLMTasks.DEFAULT]

    content_config = model_configs.get(
        LLMTasks.DOCUMENT_CONTENT_VALIDATION, default_config
    )
    doc = await check_for_ordinance_info(
        doc,
        model_config=content_config,
        usage_tracker=usage_tracker,
        **kwargs,
    )

    has_ord_info = doc.attrs.get("contains_ord_info", False)
    if not has_ord_info:
        return has_ord_info

    logger.debug("Detected ordinance info; parsing date...")
    date_config = model_configs.get(LLMTasks.DATE_EXTRACTION, default_config)
    doc = await extract_date(doc, date_config, usage_tracker=usage_tracker)
    return has_ord_info
def _sort_final_ord_docs(all_ord_docs):
    """Rank documents from best to worst using the doc sorting key

    Returns ``None`` for an empty (or otherwise falsy) input; a new
    sorted list otherwise.
    """
    if all_ord_docs:
        ranked = list(all_ord_docs)
        # Higher key means better document, so sort in descending order
        ranked.sort(key=_ord_doc_sorting_key, reverse=True)
        return ranked
    return None
def _ord_doc_sorting_key(doc):
    """Sorting key for documents. The higher this value, the better

    The key is a tuple compared element by element, so the criteria
    below are listed in order of decreasing priority: website score,
    year, PDF over non-PDF, jurisdiction score, shorter text, month,
    and finally day.
    """
    attrs = doc.attrs

    year, month, day = attrs.get("date", (-1, -1, -1))
    website_score = attrs.get(_SCORE_KEY, 0)
    is_pdf = isinstance(doc, PDFDocument)
    # If not present, URL check passed with confidence so we set
    # score to 1
    jurisdiction_score = attrs.get(
        DTreeJurisdictionValidator.META_SCORE_KEY, 1
    )
    # Negated so that shorter documents rank higher
    negated_text_length = -len(doc.text)

    # Falsy date components (e.g. ``None``) rank lowest
    year_rank = year or _NEG_INF
    month_rank = month or _NEG_INF
    day_rank = day or _NEG_INF

    return (
        website_score,
        year_rank,
        is_pdf,
        jurisdiction_score,
        negated_text_length,
        month_rank,
        day_rank,
    )