Source code for compass.web.website_crawl

"""Custom COMPASS website crawler

Much simpler than the Crawl4AI crawler, but designed to access
some links that Crawl4AI cannot (such as those behind a button
interface).
"""

import logging
import operator
from collections import Counter
from contextlib import AsyncExitStack
from urllib.parse import (
    urlparse,
    urlunparse,
    quote,
    unquote,
    parse_qsl,
    urlencode,
    urljoin,
)

from crawl4ai.models import Link
from bs4 import BeautifulSoup
from rebrowser_playwright.async_api import async_playwright
from rebrowser_playwright.async_api import Error as RBPlaywrightError
from playwright._impl._errors import Error as PlaywrightError  # noqa: PLC2701
from elm.web.utilities import pw_page
from elm.web.document import PDFDocument, HTMLDocument
from elm.web.file_loader import AsyncFileLoader
from elm.web.website_crawl import ELMLinkScorer, _SCORE_KEY  # noqa: PLC2701


logger = logging.getLogger(__name__)
_DEPTH_KEY = "web_crawl_depth"
_CLICKABLE_SELECTORS = [
    "button",  # "a", "p"
]
_BLACKLIST_SUBSTRINGS = [
    "facebook",
    "twitter",
    "linkedin",
    "instagram",
    "youtube",
    "instagram",
    "pinterest",  # cspell: disable-line
    "tiktok",  # cspell: disable-line
    "x.com",
    "snapchat",
    "reddit",
    "mailto:",
    "tel:",
    "javascript:",
    "login",
    "signup",
    "sign up",
    "signin",
    "sign in",
    "register",
    "subscribe",
    "donate",
    "shop",
    "cart",
    "careers",
    "event",
    "events",
    "calendar",
]
DOC_THRESHOLD = 5
"""Default max documents to collect before terminating COMPASS crawl"""


class COMPASSLinkScorer(ELMLinkScorer):
    """Custom URL scorer for COMPASS website crawling"""

    def _assign_value(self, text):
        """Score based on the presence of keywords in link text"""
        score = 0
        text = text.casefold().replace("plant", "")
        for kw, kw_score in self.keyword_points.items():
            if kw in text:
                score += kw_score
        return score
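
# Illustrative example (the keyword points below are hypothetical and are
# assumed to be supplied through the parent ``ELMLinkScorer``): stripping
# "plant" from the text presumably keeps a keyword such as "plan" from
# matching inside the word "plant".
#
#     scorer = COMPASSLinkScorer(keyword_points={"ordinance": 5, "plan": 2})
#     scorer._assign_value("Wind Energy Ordinance and Plan")  # -> 7
#     scorer._assign_value("Power Plant Overview")            # -> 0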

class COMPASSCrawler:
    """A simple website crawler to search for ordinance documents"""

    def __init__(
        self,
        validator,
        url_scorer,
        file_loader_kwargs=None,
        already_visited=None,
        num_link_scores_to_check_per_page=4,
        max_pages=100,
        browser_semaphore=None,
    ):
        """
        Parameters
        ----------
        validator : callable
            An async callable that takes a document instance (containing
            the text from a PDF or a webpage) and returns a boolean
            indicating whether the text passes the validation check.
            This is used to determine whether or not to keep
            (i.e. return) the document.
        url_scorer : callable
            An async callable that takes a list of dictionaries
            containing URL information and assigns each dictionary a
            `score` key representing the score for that URL. The input
            URL dictionaries will each have at least one key: "href",
            which contains the URL of the link. The dictionary may also
            have other attributes such as "title", which contains the
            link title text.
        file_loader_kwargs : dict, optional
            Additional keyword-value argument pairs to pass to the
            :class:`~elm.web.file_loader.AsyncFileLoader` class. If this
            dictionary contains the ``pw_launch_kwargs`` key, its value
            (assumed to be another dictionary) will be used to
            initialize the playwright instances used for the crawl.
            By default, ``None``.
        already_visited : set, optional
            A set of URLs (either strings or :class:`Link` objects)
            that have already been visited. This is used to avoid
            re-visiting links that have already been checked.
            By default, ``None``.
        num_link_scores_to_check_per_page : int, default=4
            Number of top unique-scoring links per page to use for
            recursive crawling. This helps the crawl stay focused on
            the links most likely to contain documents of interest.
        max_pages : int, default=100
            Maximum number of pages to crawl before terminating,
            regardless of whether the document was found or not.
            By default, ``100``.
        browser_semaphore : :class:`asyncio.Semaphore`, optional
            Semaphore instance that can be used to limit the number of
            playwright browsers open concurrently. If ``None``, no
            limits are applied. By default, ``None``.
        """
        self.validator = validator
        self.url_scorer = url_scorer
        self.num_scores_to_check_per_page = num_link_scores_to_check_per_page
        self.checked_previously = already_visited or set()
        self.max_pages = max_pages

        file_loader_kwargs = file_loader_kwargs or {}
        flk = {"verify_ssl": False}
        flk.update(file_loader_kwargs or {})
        self.afl = AsyncFileLoader(**flk)
        self.pw_launch_kwargs = (
            file_loader_kwargs.get("pw_launch_kwargs") or {}
        )

        self.browser_semaphore = (
            AsyncExitStack()
            if browser_semaphore is None
            else browser_semaphore
        )

        self._out_docs = []
        self._already_visited = {}
        self._should_stop = None

    async def run(
        self, base_url, termination_callback=None, on_new_page_visit_hook=None
    ):
        """Run the COMPASS website crawler

        Parameters
        ----------
        base_url : str
            URL of the website to start crawling from.
        termination_callback : callable, optional
            An async callable that takes a list of documents and
            returns a boolean indicating whether to stop crawling.
            If ``None``, the crawl simply terminates once
            :obj:`DOC_THRESHOLD` documents have been found.
            By default, ``None``.
        on_new_page_visit_hook : callable, optional
            An async callable that is called every time a new page is
            found during the crawl. The callable should accept a single
            argument, which is the page :class:`Link` instance.
            If ``None``, no additional processing is done on new pages.
            By default, ``None``.

        Returns
        -------
        list
            List of document instances that passed the validation
            check. Each document contains the text from a PDF or
            webpage and has an attribute `source` that contains the
            URL of the document. This list may be empty if no
            documents are found.
        """
        self._should_stop = termination_callback or _default_found_enough_docs
        await self._run(
            base_url, on_new_page_visit_hook=on_new_page_visit_hook
        )
        self._should_stop = None

        self._log_crawl_stats()
        if self._out_docs:
            self._out_docs.sort(key=lambda x: -1 * x.attrs[_SCORE_KEY])

        return self._out_docs

    async def _run(
        self,
        base_url,
        link=None,
        depth=0,
        score=0,
        on_new_page_visit_hook=None,
    ):
        """Recursive web crawl function"""
        if link is None:
            base_url, link = self._reset_crawl(base_url)

        if link in self._already_visited:
            return

        if on_new_page_visit_hook:
            await on_new_page_visit_hook(link)

        self._already_visited[link] = (depth, score)
        logger.trace("self._already_visited=%r", self._already_visited)
        if await self._website_link_is_doc(link, depth, score):
            return

        num_urls_checked_on_this_page = 0
        curr_url_score = None
        for next_link in await self._get_links_from_page(link, base_url):
            prev_len = len(self._out_docs)
            await self._run(
                base_url,
                link=Link(
                    title=next_link["title"],
                    href=next_link["href"],
                    base_domain=base_url,
                ),
                depth=depth + 1,
                score=next_link["score"],
                on_new_page_visit_hook=on_new_page_visit_hook,
            )
            doc_was_just_found = (  # fmt: off
                len(self._out_docs) == (prev_len + 1)
                and (
                    self._out_docs[-1].attrs.get(_DEPTH_KEY, -1)
                    == (depth + 1)
                )
            )
            if doc_was_just_found:
                if await self.validator(self._out_docs[-1]):
                    logger.debug(" - Document passed validation check!")
                else:
                    self._out_docs = self._out_docs[:-1]
            elif (
                not link.resembles_pdf
                and curr_url_score != next_link["score"]
            ):
                logger.trace(
                    "Finished checking score %d at depth %d. Next score: %d",
                    curr_url_score or -1,
                    depth,
                    next_link["score"],
                )
                num_urls_checked_on_this_page += 1
                curr_url_score = next_link["score"]

            if await self._should_terminate_crawl(
                num_urls_checked_on_this_page, link
            ):
                break

        return

    def _reset_crawl(self, base_url):
        """Reset crawl state and initialize crawling link"""
        self._out_docs = []
        self._already_visited = {}
        base_url = _sanitize_url(base_url)
        return base_url, Link(
            title="Landing Page",
            href=_sanitize_url(urljoin(base_url, base_url.split(" ")[0])),
            base_domain=base_url,
        )

    async def _website_link_is_doc(self, link, depth, score):
        """Check if website link contains doc"""
        if link in self.checked_previously and link.consistent_domain:
            # Don't re-check pages on main website
            return False

        if await self._website_link_is_pdf(link, depth, score):
            return True
        # at this point the page is NOT a PDF. If it is also on an
        # external domain (a page we either have not visited before or
        # have seen but determined is NOT a PDF file), just return False
        if not link.consistent_domain:
            return False

        # now we are on a normal webpage on the main domain that we
        # haven't visited before, so capture its content as an HTML doc
        return await self._website_link_as_html_doc(link, depth, score)

    async def _website_link_is_pdf(self, link, depth, score):
        """Fetch page content; keep only PDFs"""
        logger.debug("Loading Link: %s", link)
        try:
            doc = await self.afl.fetch(link.href)
        except KeyboardInterrupt:
            raise
        except Exception as e:
            msg = (
                "Encountered error of type %r while trying to fetch "
                "content from %s"
            )
            err_type = type(e)
            logger.exception(msg, err_type, link)
            return False

        if isinstance(doc, PDFDocument):
            logger.debug(" - Found PDF!")
            doc.attrs[_DEPTH_KEY] = depth
            doc.attrs[_SCORE_KEY] = score
            self._out_docs.append(doc)
            return True

        return False

    async def _website_link_as_html_doc(self, link, depth, score):
        """Fetch page content as HTML doc"""
        logger.debug("Loading Link as HTML: %s", link)
        html_text = await self._get_text_no_err(link.href)
        attrs = {_DEPTH_KEY: depth, _SCORE_KEY: score}
        doc = HTMLDocument([html_text], attrs=attrs)
        self._out_docs.append(doc)
        return True

    async def _get_links_from_page(self, link, base_url):
        """Get all links from a page sorted by relevance score"""
        if not link.consistent_domain:
            logger.debug("Detected new domain, stopping link discovery")
            return []

        html_text = await self._get_text_no_err(link.href)
        page_links = []
        if html_text:
            page_links = _extract_links_from_html(
                html_text, base_url=base_url
            )

        page_links = await self.url_scorer(
            [dict(link) for link in page_links]
        )
        page_links = sorted(
            page_links, key=operator.itemgetter("score"), reverse=True
        )
        _debug_info_on_links(page_links)
        return page_links

    async def _get_text_no_err(self, url):
        """Get all text from a page; return empty string if pw error"""
        try:
            text = await self._get_text(url)
        except (PlaywrightError, RBPlaywrightError):
            text = ""
        return text

    async def _get_text(self, url):
        """Get all html text from a page"""
        all_text = []
        pw_page_kwargs = {
            "intercept_routes": True,
            "ignore_https_errors": True,
            "timeout": 600_000,
        }
        async with async_playwright() as p, self.browser_semaphore:
            browser = await p.chromium.launch(**self.pw_launch_kwargs)
            async with pw_page(browser, **pw_page_kwargs) as page:
                await page.goto(url)
                await page.wait_for_load_state("networkidle", timeout=60_000)
                all_text.append(await page.content())
                all_text += await _get_text_from_all_locators(page)

        return "\n".join(all_text)

    async def _should_terminate_crawl(
        self, num_urls_checked_on_this_page, link
    ):
        """Check if crawl should terminate"""
        if num_urls_checked_on_this_page >= self.num_scores_to_check_per_page:
            logger.debug(
                "Already checked %d unique link scores from %s",
                self.num_scores_to_check_per_page,
                link.href,
            )
            return True

        if await self._should_stop(self._out_docs):
            logger.debug("Exiting crawl early due to user condition")
            return True

        if len(self._already_visited) >= self.max_pages:
            logger.debug(" - Too many links visited, stopping recursion")
            return True

        logger.trace(
            "Only checked %d pages, continuing crawl...",
            len(self._already_visited),
        )
        return False

    def _log_crawl_stats(self):
        """Log statistics about crawled pages and depths"""
        logger.info("Crawled %d pages", len(self._already_visited))
        logger.info("Found %d potential documents", len(self._out_docs))
        logger.debug("Average score: %.2f", self._compute_avg_link_score())
        logger.debug("Pages crawled by depth:")
        for depth, count in sorted(self._crawl_depth_counts().items()):
            logger.debug("    Depth %d: %d pages", depth, count)

    def _compute_avg_link_score(self):
        """Compute the average score of the crawled results"""
        return sum(
            score for __, score in self._already_visited.values()
        ) / len(self._already_visited)

    def _crawl_depth_counts(self):
        """Compute number of pages per depth"""
        depth_counts = Counter()
        depth_counts.update([d for d, __ in self._already_visited.values()])
        return depth_counts
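
# Illustrative usage sketch (all names, URLs, and scoring rules below are
# hypothetical; the validator shown accepts every document, whereas a real
# one would inspect the document contents):
#
#     import asyncio
#
#     async def accept_all(doc):
#         return True
#
#     async def score_links(links):
#         for link in links:
#             link["score"] = int("ordinance" in link.get("title", "").casefold())
#         return links
#
#     async def main():
#         crawler = COMPASSCrawler(validator=accept_all, url_scorer=score_links)
#         return await crawler.run("https://example.gov")
#
#     docs = asyncio.run(main())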

async def _default_found_enough_docs(out_docs):  # noqa: RUF029
    """Check if a predetermined # of documents has been found

    The number to check is set by the module-level constant
    :obj:`DOC_THRESHOLD`.
    """
    return len(out_docs) >= DOC_THRESHOLD


def _debug_info_on_links(links):
    """Send debug info on links to logger"""
    num_links = len(links)
    if num_links <= 0:
        logger.debug("Found no links!")
        return

    logger.debug("Found %d links:", num_links)
    for link in links[:3]:
        logger.debug(
            "    - %d: %s (%s)", link["score"], link["title"], link["href"]
        )
    if num_links > 3:  # noqa: PLR2004
        logger.debug("    ...")


def _sanitize_url(url):
    """Fix common URL issues

    - Encode spaces and unsafe characters in the path
    - Encode query parameters safely
    - Leave existing percent-encoding intact
    """
    parsed = urlparse(url)
    safe_path = quote(unquote(parsed.path), safe="/")
    query_params = parse_qsl(parsed.query, keep_blank_values=True)
    safe_query = urlencode(query_params, doseq=True)  # cspell: disable-line
    return urlunparse(
        (
            parsed.scheme,
            parsed.netloc,
            safe_path,
            parsed.params,
            safe_query,
            parsed.fragment,
        )
    )


def _extract_links_from_html(text, base_url):
    """Parse HTML and extract all links"""
    soup = BeautifulSoup(text, "html.parser")
    links = [
        (a.get_text().strip(), a["href"])
        for a in soup.find_all("a", href=True)
    ]
    return {
        Link(
            title=title,
            href=_sanitize_url(urljoin(base_url, path)),
            base_domain=base_url,
        )
        for (title, path) in links
        if title
        and path
        and not any(
            substr in title.lower() for substr in _BLACKLIST_SUBSTRINGS
        )
        and not any(
            substr in path.lower() for substr in _BLACKLIST_SUBSTRINGS
        )
    }


async def _get_text_from_all_locators(page):
    """Go through locators on page and get text behind them"""
    all_text = []
    for selector in _CLICKABLE_SELECTORS:
        logger.trace("Checking selector %r", selector)
        locators = page.locator(selector)
        locator_count = await locators.count()
        logger.trace("    - Found %d instances", locator_count)

        for index in range(locator_count):
            try:
                text = await _get_locator_text(locators, index, page)
            except (PlaywrightError, RBPlaywrightError):
                continue
            if text:
                all_text.append(text)

    return all_text


async def _get_locator_text(locators, index, page):
    """Get text after clicking on one of the page locators"""
    locator = locators.nth(index)
    if not await locator.is_visible():
        return None

    if not await locator.is_enabled():
        return None

    await locator.click(timeout=10_000)
    return await page.content()
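
# Illustrative behavior of ``_sanitize_url`` (hypothetical URL): spaces in
# the path are percent-encoded and query parameters are re-encoded, e.g.
#
#     _sanitize_url("https://example.gov/Wind Ordinance Docs?name=final draft")
#     # -> "https://example.gov/Wind%20Ordinance%20Docs?name=final+draft"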