Source code for compass.services.base

"""COMPASS abstract Service class"""

import asyncio
import logging
from abc import ABC, abstractmethod

from compass.services.queues import get_service_queue
from compass.exceptions import COMPASSNotInitializedError


logger = logging.getLogger(__name__)
MISSING_SERVICE_MESSAGE = """Must initialize the queue for {service_name!r}.
You can likely use the following code structure to fix this:

    from compass.services.provider import RunningAsyncServices

    services = [
        ...
        {service_name}(...),
        ...
    ]
    async with RunningAsyncServices(services):
        # function call here

"""


[docs] class Service(ABC): """Abstract base class for a Service that can be queued to run""" MAX_CONCURRENT_JOBS = 10_000 """Max number of concurrent job submissions.""" @classmethod def _queue(cls): """Get queue for class.""" service_name = cls.__name__ queue = get_service_queue(service_name) if queue is None: msg = MISSING_SERVICE_MESSAGE.format(service_name=service_name) raise COMPASSNotInitializedError(msg) return queue
[docs] @classmethod async def call(cls, *args, **kwargs): """Call the service Parameters ---------- *args, **kwargs Positional and keyword arguments to be passed to the underlying service processing function. Returns ------- obj A response object from the underlying service. """ fut = asyncio.Future() outer_task_name = asyncio.current_task().get_name() await cls._queue().put((fut, outer_task_name, args, kwargs)) return await fut
@property def name(self): """str: Service name used to pull the correct queue object""" return self.__class__.__name__
[docs] async def process_using_futures(self, fut, *args, **kwargs): """Process a call to the service Parameters ---------- fut : asyncio.Future A future object that should get the result of the processing operation. If the processing function returns ``answer``, this method should call ``fut.set_result(answer)``. **kwargs Keyword arguments to be passed to the underlying processing function. """ try: response = await self.process(*args, **kwargs) except Exception as e: # noqa: BLE001 fut.set_exception(e) return fut.set_result(response)
[docs] def acquire_resources(self): # noqa: B027 """Use this method to allocate resources, if needed"""
[docs] def release_resources(self): # noqa: B027 """Use this method to clean up resources, if needed"""
@property @abstractmethod def can_process(self): """Check if process function can be called. This should be a fast-running method that returns a boolean indicating whether or not the service can accept more processing calls. """
[docs] @abstractmethod async def process(self, *args, **kwargs): """Process a call to the service. Parameters ---------- *args, **kwargs Positional and keyword arguments to be passed to the underlying processing function. """
[docs] class LLMService(Service): """Base class for LLm service This service differs from other services in that it must be used as an object, not as a class. that is, users must initialize it and pass it around in functions in order to use it. """ def __init__(self, model_name, rate_limit, rate_tracker, service_tag=None): """ Parameters ---------- model_name : str Name of model being used. rate_limit : int or float Max usage per duration of the rate tracker. For example, if the rate tracker is set to compute the total over minute-long intervals, this value should be the max usage per minute. rate_tracker : `TimeBoundedUsageTracker` A TimeBoundedUsageTracker instance. This will be used to track usage per time interval and compare to `rate_limit`. service_tag : str, optional Optional tag to use to distinguish service (i.e. make unique from other services). Must set this if multiple models with the same name are run concurrently. By default, ``None``. """ self.model_name = model_name self.rate_limit = rate_limit self.rate_tracker = rate_tracker self.service_tag = service_tag or "" @property def can_process(self): """bool: Check if usage is under the rate limit""" return self.rate_tracker.total < self.rate_limit @property def name(self): """str: Unique service name used to pull the correct queue""" return f"{self.__class__.__name__}-{self.model_name}{self.service_tag}" def _queue(self): """Get queue for class""" queue = get_service_queue(self.name) if queue is None: msg = MISSING_SERVICE_MESSAGE.format(service_name=self.name) raise COMPASSNotInitializedError(msg) return queue
[docs] async def call(self, *args, **kwargs): """Call the service Parameters ---------- *args, **kwargs Positional and keyword arguments to be passed to the underlying service processing function. Returns ------- obj A response object from the underlying service. """ fut = asyncio.Future() outer_task_name = asyncio.current_task().get_name() await self._queue().put((fut, outer_task_name, args, kwargs)) return await fut