elm.ords.services.openai.OpenAIService

class OpenAIService(client, rate_limit=1000.0, rate_tracker=None)[source]

Bases: RateLimitedService

OpenAI ChatGPT query service

Purpose:

Orchestrate OpenAI API calls.

Responsibilities:
  1. Monitor OpenAI call queue.

  2. Submit calls to OpenAI API if rate limit has not been exceeded.

  3. Track token usage, both instantaneous (rate) and total (if the user requests it).

  4. Parse responses into str and pass them back to the calling function.

Key Relationships:

Must be activated within a RunningAsyncServices context; see the usage sketch below.

Parameters:
  • client (openai.AsyncOpenAI | openai.AsyncAzureOpenAI) – Async OpenAI client instance. Must have an async client.chat.completions.create method.

  • rate_limit (int | float, optional) – Token rate limit (typically per minute, but the time interval is ultimately controlled by the rate_tracker instance). By default, 1e3.

  • rate_tracker (TimeBoundedUsageTracker, optional) – A TimeBoundedUsageTracker instance. This will be used to track usage per time interval and compare to rate_limit. If None, a TimeBoundedUsageTracker instance is created with default parameters. By default, None.
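Example:

A minimal usage sketch. The import path for RunningAsyncServices and the model name are assumptions not confirmed by this page; adjust them to your installation:

    import asyncio

    import openai

    from elm.ords.services.openai import OpenAIService
    from elm.ords.services.provider import RunningAsyncServices  # assumed path

    async def main():
        client = openai.AsyncOpenAI(api_key="<your-api-key>")
        service = OpenAIService(client, rate_limit=1e4)

        # Calls may only be submitted while the service context is active
        async with RunningAsyncServices([service]):
            response = await OpenAIService.call(
                model="gpt-4o",  # assumed model name
                messages=[{"role": "user", "content": "Hello!"}],
            )
        print(response)

    asyncio.run(main())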

Methods

acquire_resources()

Use this method to allocate resources, if needed.

call(*args, **kwargs)

Call the service.

process([usage_tracker, usage_sub_label])

Process a call to OpenAI ChatGPT.

process_using_futures(fut, *args, **kwargs)

Process a call to the service.

release_resources()

Use this method to clean up resources, if needed.

Attributes

MAX_CONCURRENT_JOBS

Max number of concurrent job submissions.

can_process

Check if usage is under the rate limit.

name

Service name used to pull the correct queue object.

async process(usage_tracker=None, usage_sub_label='default', *, model, **kwargs)[source]

Process a call to OpenAI ChatGPT.

Note that this method automatically retries queries (with backoff) if a rate limit error is thrown by the API.

Parameters:
  • model (str) – OpenAI GPT model to query.

  • usage_tracker (elm.ords.services.usage.UsageTracker, optional) – UsageTracker instance. Providing this input will update your tracker with this call’s token usage info. By default, None.

  • usage_sub_label (str, optional) – Optional label to categorize usage under. This can be used to track usage related to certain categories. By default, "default".

  • **kwargs – Keyword arguments to be passed to client.chat.completions.create.

Returns:

str | None – ChatGPT response as a string, or None if the call failed.
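Example:

In normal use, process() is invoked for you by the service queue when you submit work through call(), which is what enforces the rate limit. The sketch below calls it directly only to show how the arguments flow; the usage_sub_label value and model name are illustrative assumptions:

    # `service` as constructed earlier; run inside an async function
    response = await service.process(
        usage_tracker=None,                  # or an elm.ords.services.usage.UsageTracker
        usage_sub_label="document_parsing",  # hypothetical category label
        model="gpt-4o",                      # assumed model name
        messages=[{"role": "user", "content": "Summarize this text."}],
        temperature=0,                       # forwarded to client.chat.completions.create
    )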

MAX_CONCURRENT_JOBS = 10000

Max number of concurrent job submissions.
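Example:

If 10,000 concurrent submissions is more than your deployment should allow, the cap can be lowered in a subclass; a hypothetical sketch:

    from elm.ords.services.openai import OpenAIService

    class ThrottledOpenAIService(OpenAIService):
        """OpenAIService with a smaller submission cap (illustrative only)."""

        MAX_CONCURRENT_JOBS = 50  # hypothetical lower limit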

acquire_resources()

Use this method to allocate resources, if needed.

async classmethod call(*args, **kwargs)

Call the service.

Parameters:

*args, **kwargs – Positional and keyword arguments to be passed to the underlying service processing function.

Returns:

obj – A response object from the underlying service.
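Example:

Because call() queues work for the running service rather than executing it immediately, many queries can be fanned out concurrently and the service paces them against the rate limit. A sketch, assuming an active RunningAsyncServices context and a hypothetical model name:

    import asyncio

    async def ask_all(questions):
        return await asyncio.gather(*(
            OpenAIService.call(
                model="gpt-4o",  # assumed model name
                messages=[{"role": "user", "content": q}],
            )
            for q in questions
        ))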

property can_process

Check if usage is under the rate limit.

property name

Service name used to pull the correct queue object.

Type:

str

async process_using_futures(fut, *args, **kwargs)

Process a call to the service.

Parameters:
  • fut (asyncio.Future) – A future object that should get the result of the processing operation. If the processing function returns answer, this method should call fut.set_result(answer).

  • *args, **kwargs – Positional and keyword arguments to be passed to the underlying processing function.
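Example:

An illustrative sketch of the fut contract described above; this is not ELM's actual implementation:

    import asyncio

    async def process_using_futures(self, fut: asyncio.Future, *args, **kwargs):
        try:
            answer = await self.process(*args, **kwargs)
        except Exception as e:
            fut.set_exception(e)    # propagate failures to the awaiting caller
        else:
            fut.set_result(answer)  # deliver the answer to the awaiting caller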

release_resources()

Use this method to clean up resources, if needed.