# -*- coding: utf-8 -*-
"""ELM Web Scraping - Google search."""
import pprint
import asyncio
import logging
from itertools import zip_longest, chain
from contextlib import AsyncExitStack
from playwright.async_api import (
async_playwright,
TimeoutError as PlaywrightTimeoutError,
)
from elm.web.document import PDFDocument
from elm.web.file_loader import AsyncFileLoader
from elm.web.utilities import clean_search_query
logger = logging.getLogger(__name__)
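# CSS selector for the anchor elements of Google search results. The
# ``jsname`` value is Google-internal and may change without notice.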
_SEARCH_RESULT_TAG = '[jsname="UWckNb"]'
class PlaywrightGoogleLinkSearch:
"""Search for top results on google and return their links
Purpose:
Search Google using Playwright engine.
Responsibilities:
1. Launch browser using Playwright and navigate to Google.
2. Submit a query to Google Search.
3. Get list of resulting URLs.
Key Relationships:
Relies on `Playwright <https://playwright.dev/python/>`_ for
web access.
.. end desc
"""
EXPECTED_RESULTS_PER_PAGE = 10
"""Number of results displayed per Google page. """
def __init__(self, **launch_kwargs):
"""
Parameters
----------
**launch_kwargs
Keyword arguments to be passed to
`playwright.chromium.launch`. For example, you can pass
``headless=False, slow_mo=50`` for a visualization of the
search.
"""
self.launch_kwargs = launch_kwargs
self._browser = None
async def _load_browser(self, pw_instance):
"""Launch a chromium instance and load a page"""
self._browser = await pw_instance.chromium.launch(**self.launch_kwargs)
async def _close_browser(self):
"""Close browser instance and reset internal attributes"""
await self._browser.close()
self._browser = None
async def _search(self, query, num_results=10):
"""Search google for links related to a query."""
logger.debug("Searching Google: %r", query)
num_results = min(num_results, self.EXPECTED_RESULTS_PER_PAGE)
page = await self._browser.new_page()
await _navigate_to_google(page)
await _perform_google_search(page, query)
return await _extract_links(page, num_results)
async def _skip_exc_search(self, query, num_results=10):
"""Perform search while ignoring timeout errors"""
try:
return await self._search(query, num_results=num_results)
        except PlaywrightTimeoutError:
            logger.exception("Google search timed out for query: %r", query)
return []
async def _get_links(self, queries, num_results):
"""Get links for multiple queries"""
outer_task_name = asyncio.current_task().get_name()
async with async_playwright() as pw_instance:
await self._load_browser(pw_instance)
searches = [
asyncio.create_task(
self._skip_exc_search(query, num_results=num_results),
name=outer_task_name,
)
for query in queries
]
results = await asyncio.gather(*searches)
await self._close_browser()
return results
async def results(self, *queries, num_results=10):
"""Retrieve links for the first `num_results` of each query.

        This function executes a Google search for each input query
        and returns a list of links corresponding to the top
        `num_results`.

        Parameters
        ----------
        *queries : str
            One or more Google search queries. Each query is cleaned
            with :func:`~elm.web.utilities.clean_search_query` before
            being submitted.
        num_results : int, optional
            Number of top results to retrieve for each query. Note that
            this value can never exceed the number of results per page
            (typically 10). If you pass in a larger value, it will be
            reduced to the number of results per page.
            By default, ``10``.

        Returns
        -------
        list
            List with one entry per input query, where each entry is a
            list containing the top `num_results` links.
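
        Examples
        --------
        A minimal usage sketch (not run as a doctest; it performs a
        live search, the query string is illustrative, and it assumes
        Playwright's chromium browser is installed, e.g. via
        ``playwright install chromium``)::

            search = PlaywrightGoogleLinkSearch(headless=True)
            links = asyncio.run(
                search.results("NREL ELM", num_results=3)
            )
            # ``links`` holds one list of URLs per input query, e.g.
            # [["https://...", "https://...", "https://..."]]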
"""
queries = map(clean_search_query, queries)
return await self._get_links(queries, num_results)
async def google_results_as_docs(
queries,
num_urls=None,
browser_semaphore=None,
task_name=None,
**file_loader_kwargs,
):
"""Retrieve top ``N`` google search results as document instances.
Parameters
----------
    queries : collection of str
        Collection of strings representing Google queries. Documents
        for the top `num_urls` Google search results (from all of
        these queries *combined*) will be returned from this function.
    num_urls : int, optional
        Number of unique top Google search results to return as docs.
        The Google search results from all queries are interleaved and
        the top `num_urls` unique URLs are downloaded as docs. If this
        number is less than ``len(queries)``, some of your queries may
        not contribute to the final output. By default, ``None``, which
        sets ``num_urls = 3 * len(queries)``.
browser_semaphore : :class:`asyncio.Semaphore`, optional
Semaphore instance that can be used to limit the number of
playwright browsers open concurrently. If ``None``, no limits
are applied. By default, ``None``.
task_name : str, optional
Optional task name to use in :func:`asyncio.create_task`.
By default, ``None``.
    **file_loader_kwargs
        Keyword-argument pairs to initialize
        :class:`elm.web.file_loader.AsyncFileLoader` with. If found,
        the "pw_launch_kwargs" key in these will also be used to
        initialize the
        :class:`elm.web.google_search.PlaywrightGoogleLinkSearch`
        instance used for the Google URL search.

    Returns
    -------
    list of :class:`elm.web.document.BaseDocument`
        List of documents representing the top `num_urls` results from
        the Google searches across all `queries`.
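
    Examples
    --------
    A minimal sketch (not run as a doctest; it performs live searches
    and downloads, and the query string is purely illustrative)::

        docs = asyncio.run(
            google_results_as_docs(
                ["NREL ELM documentation"], num_urls=2
            )
        )
        for doc in docs:
            print(type(doc).__name__, len(doc.pages))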
"""
pw_launch_kwargs = file_loader_kwargs.get("pw_launch_kwargs", {})
urls = await _find_urls(
queries,
num_results=10,
browser_sem=browser_semaphore,
task_name=task_name,
**pw_launch_kwargs
)
num_urls = num_urls or 3 * len(queries)
urls = _down_select_urls(urls, num_urls=num_urls)
logger.debug("Downloading documents for URLS: \n\t-%s", "\n\t-".join(urls))
docs = await _load_docs(urls, browser_semaphore, **file_loader_kwargs)
return docs
async def filter_documents(
documents, validation_coroutine, task_name=None, **kwargs
):
"""Filter documents by applying a filter function to each.
Parameters
----------
documents : iter of :class:`elm.web.document.BaseDocument`
Iterable of documents to filter.
validation_coroutine : coroutine
A coroutine that returns ``False`` if the document should be
discarded and ``True`` otherwise. This function should take a
single :class:`elm.web.document.BaseDocument` instance as the
first argument. The function may have other arguments, which
will be passed down using `**kwargs`.
task_name : str, optional
Optional task name to use in :func:`asyncio.create_task`.
By default, ``None``.
**kwargs
Keyword-argument pairs to pass to `validation_coroutine`. This
should not include the document instance itself, which will be
independently passed in as the first argument.
Returns
-------
list of :class:`elm.web.document.BaseDocument`
List of documents that passed the validation check, sorted by
text length, with PDF documents taking the highest precedence.
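
    Examples
    --------
    A minimal sketch of a validation coroutine (``min_length`` is an
    illustrative keyword argument, not part of ELM)::

        async def long_enough(doc, min_length=100):
            return len(doc.text) >= min_length

        kept = asyncio.run(
            filter_documents(docs, long_enough, min_length=500)
        )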
"""
    validators = [
        asyncio.create_task(
            validation_coroutine(doc, **kwargs), name=task_name
        )
        for doc in documents
    ]
    output = await asyncio.gather(*validators)
filtered_docs = [doc for doc, check in zip(documents, output) if check]
return sorted(
filtered_docs,
key=lambda doc: (not isinstance(doc, PDFDocument), len(doc.text)),
)
async def _find_urls(
queries, num_results=10, browser_sem=None, task_name=None, **kwargs
):
"""Parse google search output for URLs."""
searchers = [
asyncio.create_task(
_search_single(
query, browser_sem, num_results=num_results, **kwargs
),
name=task_name,
)
for query in queries
]
return await asyncio.gather(*searchers)
async def _search_single(question, browser_sem, num_results=10, **kwargs):
"""Perform a single google search."""
if browser_sem is None:
browser_sem = AsyncExitStack()
search_engine = PlaywrightGoogleLinkSearch(**kwargs)
async with browser_sem:
return await search_engine.results(question, num_results=num_results)
def _down_select_urls(search_results, num_urls=5):
"""Select the top 5 URLs."""
all_urls = chain.from_iterable(
zip_longest(*[results[0] for results in search_results])
)
urls = set()
for url in all_urls:
if not url:
continue
urls.add(url)
if len(urls) == num_urls:
break
return urls
async def _load_docs(urls, browser_semaphore=None, **kwargs):
"""Load a document for each input URL."""
file_loader = AsyncFileLoader(
browser_semaphore=browser_semaphore, **kwargs
)
docs = await file_loader.fetch_all(*urls)
logger.debug(
"Loaded the following number of pages for docs: %s",
pprint.PrettyPrinter().pformat(
{
doc.metadata.get("source", "Unknown"): len(doc.pages)
for doc in docs
}
),
)
return [doc for doc in docs if not doc.empty]
async def _navigate_to_google(page):
"""Navigate to Google domain."""
await page.goto("https://www.google.com")
await page.wait_for_load_state("networkidle")
async def _perform_google_search(page, search_query):
"""Fill in search bar with user query and click search button"""
await page.get_by_label("Search", exact=True).fill(search_query)
await _close_autofill_suggestions(page)
await page.get_by_role("button", name="Google Search").click()
async def _close_autofill_suggestions(page):
"""Google autofill suggestions often get in way of search button.
We get around this by closing the suggestion dropdown before
looking for the search button. Looking for the "Google Search"
button doesn't work because it is sometimes obscured by the dropdown
menu. Clicking the "Google" logo can also fail when they add
seasonal links/images (e.g. holiday logos). Current solutions is to
look for a specific div at the top of the page.
"""
await page.locator("#gb").click()
async def _extract_links(page, num_results):
"""Extract links for top `num_results` on page"""
links = await asyncio.to_thread(page.locator, _SEARCH_RESULT_TAG)
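    # Pull the ``href`` attribute from each of the first ``num_results``
    # elements matching the search-result selector.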
return [
await links.nth(i).get_attribute("href") for i in range(num_results)
]