compass.services.openai.OpenAIService#

class OpenAIService(client, model_name, rate_limit=1000.0, rate_tracker=None, service_tag=None)[source]#

Bases: LLMService

OpenAI Chat GPT query service

Purpose:

Orchestrate OpenAI API calls.

Responsibilities:
  1. Monitor OpenAI call queue.

  2. Submit calls to OpenAI API if rate limit has not been exceeded.

  3. Track token usage, both instantaneous (rate) and total (if user requests it).

  4. Parse responses into str and pass back to calling function.

Key Relationships:

Must be activated within a RunningAsyncServices context.

Parameters:
  • client (openai.AsyncOpenAI or openai.AsyncAzureOpenAI) – Async OpenAI client instance. Must have an async client.chat.completions.create method.

  • model_name (str) – Name of model being used.

  • rate_limit (int or float, optional) – Token rate limit (typically per minute, but the time interval is ultimately controlled by the rate_tracker instance). By default, 1e3.

  • rate_tracker (TimeBoundedUsageTracker, optional) – A TimeBoundedUsageTracker instance. This will be used to track usage per time interval and compare to rate_limit. If None, a TimeBoundedUsageTracker instance is created with default parameters. By default, None.

  • service_tag (str, optional) – Optional tag used to distinguish this service (i.e., make it unique from other services). You must set this if multiple models with the same name are run concurrently. By default, None.
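To make the rate_limit / rate_tracker interaction concrete, here is a minimal sketch of a sliding-window tracker. The class name, window length, and method names are assumptions for illustration; the real TimeBoundedUsageTracker lives in compass and may differ:

```python
import time
from collections import deque


class TimeBoundedTracker:
    """Sketch: track token usage over a trailing time window."""

    def __init__(self, time_window=60.0):
        self.time_window = time_window  # seconds (assumed default)
        self._events = deque()  # (timestamp, tokens) pairs

    def add(self, tokens):
        """Record a usage event at the current time."""
        self._events.append((time.monotonic(), tokens))

    @property
    def total(self):
        """Total tokens recorded within the trailing window."""
        cutoff = time.monotonic() - self.time_window
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()  # drop events outside the window
        return sum(tokens for _, tokens in self._events)


rate_limit = 1e3
tracker = TimeBoundedTracker()
tracker.add(400)
# Mirrors the spirit of the can_process property documented below:
can_process = tracker.total < rate_limit
```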

Methods

acquire_resources()

Use this method to allocate resources, if needed

call(*args, **kwargs)

Call the service

process([usage_tracker, usage_sub_label])

Process a call to OpenAI Chat GPT

process_using_futures(fut, *args, **kwargs)

Process a call to the service

release_resources()

Use this method to clean up resources, if needed

Attributes

MAX_CONCURRENT_JOBS

Max number of concurrent job submissions.

can_process

Check if usage is under the rate limit

name

Unique service name used to pull the correct queue

async process(usage_tracker=None, usage_sub_label=LLMUsageCategory.DEFAULT, **kwargs)[source]#

Process a call to OpenAI Chat GPT

Note that this method automatically retries queries (with backoff) if a rate limit error is thrown by the API.

Parameters:
  • model (str) – OpenAI GPT model to query.

  • usage_tracker (compass.services.usage.UsageTracker, optional) – UsageTracker instance. Providing this input will update your tracker with this call’s token usage info. By default, None.

  • usage_sub_label (str, optional) – Optional label to categorize usage under. This can be used to track usage related to certain categories. By default, "default".

  • **kwargs – Keyword arguments to be passed to client.chat.completions.create.

Returns:

str or None – Chat GPT response as a string, or None if the call failed.

MAX_CONCURRENT_JOBS = 10000#

Max number of concurrent job submissions.
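A common way to enforce this kind of concurrency cap is an asyncio.Semaphore. The sketch below shows the idea with a small cap so it runs quickly; it is an illustration of the concept, not the service's actual mechanism:

```python
import asyncio

MAX_CONCURRENT_JOBS = 5  # small cap for the sketch; the service uses 10000


async def submit(sem, active, peak):
    """Run one job while holding a semaphore slot."""
    async with sem:  # at most MAX_CONCURRENT_JOBS jobs run at once
        active[0] += 1
        peak[0] = max(peak[0], active[0])
        await asyncio.sleep(0)  # stand-in for real work
        active[0] -= 1


async def main():
    sem = asyncio.Semaphore(MAX_CONCURRENT_JOBS)
    active, peak = [0], [0]
    await asyncio.gather(*(submit(sem, active, peak) for _ in range(20)))
    return peak[0]


peak = asyncio.run(main())  # peak concurrency never exceeds the cap
```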

acquire_resources()#

Use this method to allocate resources, if needed

async call(*args, **kwargs)#

Call the service

Parameters:

*args, **kwargs – Positional and keyword arguments to be passed to the underlying service processing function.

Returns:

obj – A response object from the underlying service.

property can_process#

Check if usage is under the rate limit

Type:

bool

property name#

Unique service name used to pull the correct queue

Type:

str

async process_using_futures(fut, *args, **kwargs)#

Process a call to the service

Parameters:
  • fut (asyncio.Future) – A future object that should get the result of the processing operation. If the processing function returns answer, this method should call fut.set_result(answer).

  • *args, **kwargs – Positional and keyword arguments to be passed to the underlying processing function.
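The future-based flow described above can be sketched in plain asyncio. The processing function here is a placeholder for the underlying service call:

```python
import asyncio


async def _process(*args, **kwargs):
    """Placeholder for the underlying processing function."""
    return "answer"


async def process_using_futures(fut, *args, **kwargs):
    """Sketch: run the processing function and deliver its result via fut."""
    answer = await _process(*args, **kwargs)
    fut.set_result(answer)  # the caller awaiting fut now receives answer


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    await process_using_futures(fut)
    return await fut  # resolves immediately; result was already set


result = asyncio.run(main())
```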

release_resources()#

Use this method to clean up resources, if needed