Source code for compass.services.cpu

"""COMPASS Ordinance CPU-bound services"""

import ast
import asyncio
import contextlib
from functools import partial
from concurrent.futures import ProcessPoolExecutor

from elm.web.document import PDFDocument
from elm.utilities.parse import read_pdf, read_pdf_ocr

from compass.services.base import Service


class ProcessPoolService(Service):
    """Service that contains a ProcessPoolExecutor instance"""

    def __init__(self, **kwargs):
        """

        Parameters
        ----------
        **kwargs
            Keyword-value argument pairs to pass to
            :class:`concurrent.futures.ProcessPoolExecutor`.
            By default, no extra arguments are passed.
        """
        self._ppe_kwargs = kwargs or {}
        # The executor is created lazily in `acquire_resources`
        self.pool = None

    def acquire_resources(self):
        """Open the process pool"""
        self.pool = ProcessPoolExecutor(**self._ppe_kwargs)

    def release_resources(self):
        """Shut down the process pool

        Pending (not yet started) work is cancelled and the call blocks
        until running work has finished. This is a no-op if the pool
        was never opened via :meth:`acquire_resources`.
        """
        if self.pool is None:
            # Nothing was acquired, so there is nothing to release
            return

        self.pool.shutdown(wait=True, cancel_futures=True)
class PDFLoader(ProcessPoolService):
    """Service that runs PDF-loading functions in a ProcessPoolExecutor"""

    @property
    def can_process(self):
        """bool: Always ``True`` (limiting is handled by asyncio)"""
        return True

    async def process(self, fn, pdf_bytes, **kwargs):
        """Run a PDF-loading function in the process pool

        Parameters
        ----------
        fn : callable
            Function to execute in the pool. It is invoked as
            ``fn(pdf_bytes, **kwargs)``.
        pdf_bytes : bytes
            Bytes containing PDF file. Passed to `fn` as the first
            positional argument.
        **kwargs
            Keyword-value argument pairs forwarded to `fn`.

        Returns
        -------
        object
            Whatever `fn` returns.
        """
        event_loop = asyncio.get_running_loop()
        # Bind all arguments up front: `run_in_executor` only forwards
        # positional arguments to the callable
        job = partial(fn, pdf_bytes, **kwargs)
        return await event_loop.run_in_executor(self.pool, job)
class OCRPDFLoader(PDFLoader):
    """Loader service for OCR

    Adds no behavior of its own on top of :class:`PDFLoader`.
    NOTE(review): presumably a distinct class so that OCR jobs are
    dispatched to a separately-configured service instance than
    regular PDF loads - confirm against the ``Service`` base class.
    """
def _read_pdf(pdf_bytes, **kwargs): """Utility func so that pdftotext.PDF doesn't have to be pickled""" pages = read_pdf(pdf_bytes, verbose=False) return PDFDocument(pages, **kwargs) def _read_pdf_ocr(pdf_bytes, tesseract_cmd, **kwargs): """Utility function that mimics `_read_pdf`""" if tesseract_cmd: _configure_pytesseract(tesseract_cmd) pages = read_pdf_ocr(pdf_bytes, verbose=False) doc = PDFDocument(_try_decode_ocr_pages(pages), **kwargs) doc.attrs["from_ocr"] = True return doc def _configure_pytesseract(tesseract_cmd): """Set the tesseract_cmd""" import pytesseract # noqa: PLC0415 pytesseract.pytesseract.tesseract_cmd = tesseract_cmd def _try_decode_ocr_pages(pages): """Try to decode pages into strings""" decoded_pages = [] for page in pages: with contextlib.suppress(Exception): page = ast.literal_eval(page).decode("utf-8") # noqa: PLW2901 decoded_pages.append(page) return decoded_pages
async def read_pdf_doc(pdf_bytes, **kwargs):
    """Load a PDF from bytes using the PDF loader service

    Parameters
    ----------
    pdf_bytes : bytes
        Bytes containing PDF file.
    **kwargs
        Keyword-value arguments to pass to
        :class:`elm.web.document.PDFDocument` initializer.

    Returns
    -------
    elm.web.document.PDFDocument
        PDFDocument instance with pages loaded as text.
    """
    document = await PDFLoader.call(_read_pdf, pdf_bytes, **kwargs)
    return document
async def read_pdf_doc_ocr(pdf_bytes, **kwargs):
    """Load a PDF from bytes using OCR (pytesseract)

    Pytesseract must be set up properly for this function to work.
    In particular, the `pytesseract.pytesseract.tesseract_cmd`
    attribute must be set to point to the tesseract executable.

    Parameters
    ----------
    pdf_bytes : bytes
        Bytes containing PDF file.
    **kwargs
        Keyword-value arguments to pass to
        :class:`elm.web.document.PDFDocument` initializer.

    Returns
    -------
    elm.web.document.PDFDocument
        PDFDocument instance with pages loaded as text.
    """
    import pytesseract  # noqa: PLC0415

    # Read the command configured in this process and hand it to the
    # reader function, which re-applies it before running OCR
    configured_cmd = pytesseract.pytesseract.tesseract_cmd
    return await OCRPDFLoader.call(
        _read_pdf_ocr, pdf_bytes, tesseract_cmd=configured_cmd, **kwargs
    )