elm.ords.services.openai.OpenAIService
- class OpenAIService(client, rate_limit=1000.0, rate_tracker=None)[source]
Bases: RateLimitedService
OpenAI Chat GPT query service
- Parameters:
client (openai.AsyncOpenAI | openai.AsyncAzureOpenAI) – Async OpenAI client instance. Must have an async client.chat.completions.create method.
rate_limit (int | float, optional) – Token rate limit (typically per minute, but the time interval is ultimately controlled by the rate_tracker instance). By default, 1e3.
rate_tracker (TimeBoundedUsageTracker, optional) – A TimeBoundedUsageTracker instance. This will be used to track usage per time interval and compare to rate_limit. If None, a TimeBoundedUsageTracker instance is created with default parameters. By default, None.
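The interaction between rate_limit and rate_tracker can be illustrated with a minimal sketch. The class below is a hypothetical stand-in, not the actual TimeBoundedUsageTracker; its name, constructor arguments, and the strict "below the limit" comparison are assumptions made for illustration only.

```python
import time
from collections import deque


class SimpleTimeBoundedTracker:
    """Hypothetical stand-in for a time-bounded usage tracker."""

    def __init__(self, max_seconds=60.0, clock=time.monotonic):
        self.max_seconds = max_seconds
        self._clock = clock  # injectable so the window can be tested
        self._events = deque()  # (timestamp, tokens) pairs, oldest first

    def add(self, tokens):
        """Record token usage at the current time."""
        self._events.append((self._clock(), tokens))

    @property
    def total(self):
        """Sum of tokens recorded within the last ``max_seconds``."""
        cutoff = self._clock() - self.max_seconds
        # Drop entries that have aged out of the time window.
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()
        return sum(tokens for _, tokens in self._events)


def can_process(tracker, rate_limit):
    """Return True while tracked usage is below the rate limit (assumed rule)."""
    return tracker.total < rate_limit
```

Under these assumptions, the length of the tracker's window determines what "rate" means: a 60-second window turns rate_limit into a tokens-per-minute budget.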
Methods
acquire_resources() – Use this method to allocate resources, if needed.
call(*args, **kwargs) – Call the service.
process([usage_tracker, usage_sub_label]) – Process a call to OpenAI Chat GPT.
process_using_futures(fut, *args, **kwargs) – Process a call to the service.
release_resources() – Use this method to clean up resources, if needed.
Attributes
MAX_CONCURRENT_JOBS – Max number of concurrent job submissions.
can_process – Check if usage is under the rate limit.
Service name used to pull the correct queue object.
- async process(usage_tracker=None, usage_sub_label='default', *, model, **kwargs)[source]
Process a call to OpenAI Chat GPT.
Note that this method automatically retries queries (with backoff) if a rate limit error is thrown by the API.
- Parameters:
model (str) – OpenAI GPT model to query.
usage_tracker (elm.ords.services.usage.UsageTracker, optional) – UsageTracker instance. Providing this input will update your tracker with this call’s token usage info. By default, None.
usage_sub_label (str, optional) – Optional label to categorize usage under. This can be used to track usage related to certain categories. By default, "default".
**kwargs – Keyword arguments to be passed to client.chat.completions.create.
- Returns:
str | None – Chat GPT response as a string, or None if the call failed.
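The retry-with-backoff behavior mentioned above can be sketched in general terms. This is not the library's implementation: FakeRateLimitError, retry_with_backoff, and the max_retries, base_delay, and sleep parameters are all hypothetical names chosen for illustration, and the exponential-plus-jitter schedule is one common choice rather than the confirmed one.

```python
import asyncio
import random


class FakeRateLimitError(Exception):
    """Hypothetical stand-in for the API's rate limit error."""


async def retry_with_backoff(coro_func, *args, max_retries=5, base_delay=1.0,
                             sleep=asyncio.sleep, **kwargs):
    """Await ``coro_func``, retrying with growing delays on rate limit errors."""
    for attempt in range(max_retries + 1):
        try:
            return await coro_func(*args, **kwargs)
        except FakeRateLimitError:
            if attempt == max_retries:
                raise  # out of retries: surface the error to the caller
            # Exponential delay plus random jitter to avoid synchronized retries.
            delay = base_delay * 2 ** attempt + random.uniform(0, base_delay)
            await sleep(delay)
```

The sleep parameter is injectable only so the sketch can be exercised without real waiting; errors other than the rate limit error propagate immediately.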
- MAX_CONCURRENT_JOBS = 10000
Max number of concurrent job submissions.
- acquire_resources()
Use this method to allocate resources, if needed
- async classmethod call(*args, **kwargs)
Call the service.
- Parameters:
*args, **kwargs – Positional and keyword arguments to be passed to the underlying service processing function.
- Returns:
obj – A response object from the underlying service.
- property can_process
Check if usage is under the rate limit.
- async process_using_futures(fut, *args, **kwargs)
Process a call to the service.
- Parameters:
fut (asyncio.Future) – A future object that should get the result of the processing operation. If the processing function returns answer, this method should call fut.set_result(answer).
**kwargs – Keyword arguments to be passed to the underlying processing function.
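The future contract described above can be sketched with plain asyncio. EchoService and submit are hypothetical names, not part of the library, and forwarding exceptions through the future is an assumption about reasonable behavior rather than documented behavior.

```python
import asyncio


class EchoService:
    """Hypothetical service, used only to illustrate the future contract."""

    async def process(self, *args, **kwargs):
        # Stand-in for the real work (e.g. an API request).
        await asyncio.sleep(0)
        return {"args": args, "kwargs": kwargs}

    async def process_using_futures(self, fut, *args, **kwargs):
        try:
            answer = await self.process(*args, **kwargs)
        except Exception as error:
            # Assumption: failures are reported through the future as well.
            fut.set_exception(error)
            return
        # The documented contract: hand the answer to the waiting caller.
        fut.set_result(answer)


async def submit(service, *args, **kwargs):
    """Create a future, let the service fill it, and await the result."""
    fut = asyncio.get_running_loop().create_future()
    task = asyncio.create_task(
        service.process_using_futures(fut, *args, **kwargs)
    )
    result = await fut
    await task  # make sure the worker coroutine has finished
    return result
```

In this sketch the caller never awaits process directly; it only awaits the future, which decouples submitting a job from the moment the job actually runs.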
- release_resources()
Use this method to clean up resources, if needed