Source code for elm.base

# -*- coding: utf-8 -*-
"""
ELM abstract class for API calls
"""
from abc import ABC
import asyncio
import logging
import os
import time

import aiohttp
import numpy as np
import openai
import requests
import tiktoken


logger = logging.getLogger(__name__)


class ApiBase(ABC):
    """ELM abstract base class to handle chat and embedding API calls."""

    DEFAULT_MODEL = 'gpt-3.5-turbo'
    """Default model to do pdf text cleaning."""

    EMBEDDING_MODEL = 'text-embedding-ada-002'
    """Default model to do text embeddings."""

    EMBEDDING_URL = 'https://api.openai.com/v1/embeddings'
    """OpenAI embedding API URL"""

    URL = 'https://api.openai.com/v1/chat/completions'
    """OpenAI API URL to be used with environment variable OPENAI_API_KEY.
    Use an Azure API endpoint to trigger Azure usage along with environment
    variables AZURE_OPENAI_KEY, AZURE_OPENAI_VERSION, and
    AZURE_OPENAI_ENDPOINT"""

    HEADERS = {"Content-Type": "application/json",
               "Authorization": f"Bearer {openai.api_key}",
               "api-key": f"{openai.api_key}",
               }
    """OpenAI API Headers"""

    MODEL_ROLE = "You are a research assistant that answers questions."
    """High level model role"""

    TOKENIZER_ALIASES = {'gpt-35-turbo': 'gpt-3.5-turbo',
                         'gpt-4-32k': 'gpt-4-32k-0314',
                         'llmev-gpt-4-32k': 'gpt-4-32k-0314',
                         'wetosa-gpt-4': 'gpt-4',
                         'wetosa-gpt-4-standard': 'gpt-4',
                         'wetosa-gpt-4o': 'gpt-4o',
                         }
    """Optional mappings for unusual Azure names to tiktoken/openai names."""

    TOKENIZER_PATTERNS = ('gpt-4o', 'gpt-4-32k', 'gpt-4')
    """Order-prioritized list of model sub-strings to look for in model name
    to send to tokenizer. As an alternative to alias lookup, this will use
    the tokenizer pattern if found in the model string"""

    def __init__(self, model=None):
        """
        Parameters
        ----------
        model : None | str
            Optional specification of OpenAI model to use. Default is
            cls.DEFAULT_MODEL
        """
        self.model = model or self.DEFAULT_MODEL
        self.api_queue = None
        self.messages = []
        self.clear()

        if 'openai.azure.com' in self.URL.lower():
            key = os.environ.get("AZURE_OPENAI_KEY")
            version = os.environ.get("AZURE_OPENAI_VERSION")
            endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT")
            assert key is not None, "Must set AZURE_OPENAI_KEY!"
            assert version is not None, "Must set AZURE_OPENAI_VERSION!"
            assert endpoint is not None, "Must set AZURE_OPENAI_ENDPOINT!"
            self._client = openai.AzureOpenAI(api_key=key,
                                              api_version=version,
                                              azure_endpoint=endpoint)
        else:
            key = os.environ.get("OPENAI_API_KEY")
            assert key is not None, "Must set OPENAI_API_KEY!"
            self._client = openai.OpenAI(api_key=key)

    @property
    def all_messages_txt(self):
        """Get a string printout of the full conversation with the LLM

        Returns
        -------
        str
        """
        messages = [f"{msg['role'].upper()}: {msg['content']}"
                    for msg in self.messages]
        messages = '\n\n'.join(messages)
        return messages

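    # Example usage (an illustrative sketch added by the editor, not part of
    # the original source; assumes the OPENAI_API_KEY environment variable
    # is set so the non-Azure client branch is taken):
    #
    #     >>> api = ApiBase()          # defaults to DEFAULT_MODEL
    #     >>> api.messages[0]['role']  # history is seeded with MODEL_ROLE
    #     'system'
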
    def clear(self):
        """Clear chat history and reduce messages to just the initial model
        role message."""
        self.messages = [{"role": "system", "content": self.MODEL_ROLE}]

    @staticmethod
    async def call_api(url, headers, request_json):
        """Make an asynchronous OpenAI API call.

        Parameters
        ----------
        url : str
            OpenAI API url, typically either:
                https://api.openai.com/v1/embeddings
                https://api.openai.com/v1/chat/completions
        headers : dict
            OpenAI API headers, typically:
                {"Content-Type": "application/json",
                 "Authorization": f"Bearer {openai.api_key}"}
        request_json : dict
            API data input, typically looks like this for chat completion:
                {"model": "gpt-3.5-turbo",
                 "messages": [{"role": "system", "content": "You do this..."},
                              {"role": "user", "content": "Do this: {}"}],
                 "temperature": 0.0}

        Returns
        -------
        out : dict
            API response in json format
        """
        out = None
        kwargs = dict(url=url, headers=headers, json=request_json)
        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(**kwargs) as response:
                    out = await response.json()
        except Exception as e:
            logger.debug(f'Error in OpenAI API call from '
                         f'`aiohttp.ClientSession().post(**kwargs)` with '
                         f'kwargs: {kwargs}')
            logger.exception('Error in OpenAI API call! Turn on debug '
                             'logging to see full query that caused error.')
            out = {'error': str(e)}

        return out

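    # Example usage (an illustrative sketch added by the editor, not part of
    # the original source; the request payload is hypothetical and the
    # response content depends on the live API):
    #
    #     >>> request = {"model": "gpt-3.5-turbo",
    #     ...            "messages": [{"role": "user", "content": "Hi!"}],
    #     ...            "temperature": 0.0}
    #     >>> out = asyncio.run(ApiBase.call_api(ApiBase.URL,
    #     ...                                    ApiBase.HEADERS, request))
    #     >>> out['choices'][0]['message']['content']  # doctest: +SKIP
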
    async def call_api_async(self, url, headers, all_request_jsons,
                             ignore_error=None, rate_limit=40e3):
        """Make many calls to the OpenAI API in parallel with rate limiting.

        NOTE: you need to call this using the await command in ipython or
        jupyter, e.g.: `out = await ApiBase.call_api_async()`

        Parameters
        ----------
        url : str
            OpenAI API url, typically either:
                https://api.openai.com/v1/embeddings
                https://api.openai.com/v1/chat/completions
        headers : dict
            OpenAI API headers, typically:
                {"Content-Type": "application/json",
                 "Authorization": f"Bearer {openai.api_key}"}
        all_request_jsons : list
            List of API data input, one entry typically looks like this for
            chat completion:
                {"model": "gpt-3.5-turbo",
                 "messages": [{"role": "system", "content": "You do this..."},
                              {"role": "user", "content": "Do this: {}"}],
                 "temperature": 0.0}
        ignore_error : None | callable
            Optional callable to parse API error string. If the callable
            returns True, the error will be ignored, the API call will not
            be tried again, and the output will be an empty string.
        rate_limit : float
            OpenAI API rate limit (tokens / minute). Note that the
            gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
            factor of safety (~1/2) because we can only count the tokens on
            the input side and assume the output is about the same count.

        Returns
        -------
        out : list
            List of API outputs where each list entry is a GPT answer from
            the corresponding message in the all_request_jsons input.
        """
        self.api_queue = ApiQueue(url, headers, all_request_jsons,
                                  ignore_error=ignore_error,
                                  rate_limit=rate_limit)
        out = await self.api_queue.run()
        return out

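    # Example usage (an illustrative sketch added by the editor, not part of
    # the original source; must be awaited from a running event loop such as
    # Jupyter, and the payloads are hypothetical):
    #
    #     >>> api = ApiBase()
    #     >>> reqs = [{"model": api.model,
    #     ...          "messages": [{"role": "user", "content": q}],
    #     ...          "temperature": 0.0}
    #     ...         for q in ('Define wind shear.', 'Define albedo.')]
    #     >>> out = await api.call_api_async(api.URL, api.HEADERS, reqs)
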
    def chat(self, query, temperature=0):
        """Have a continuous chat with the LLM including context from
        previous chat() calls stored as attributes in this class.

        Parameters
        ----------
        query : str
            Question to ask ChatGPT
        temperature : float
            GPT model temperature, a measure of response entropy from 0 to
            1. 0 is more reliable and nearly deterministic; 1 will give the
            model more creative freedom and may return less factual results.

        Returns
        -------
        response : str
            Model response
        """
        self.messages.append({"role": "user", "content": query})

        kwargs = dict(model=self.model,
                      messages=self.messages,
                      temperature=temperature,
                      stream=False)
        response = self._client.chat.completions.create(**kwargs)
        response = response.choices[0].message.content
        self.messages.append({'role': 'assistant', 'content': response})

        return response

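    # Example usage (an illustrative sketch added by the editor, not part of
    # the original source; the questions are hypothetical):
    #
    #     >>> api = ApiBase()
    #     >>> answer = api.chat('What is an energy language model?')
    #     >>> followup = api.chat('Summarize that in one sentence.')
    #     >>> print(api.all_messages_txt)  # full conversation transcript
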
    def generic_query(self, query, model_role=None, temperature=0):
        """Ask a generic single query without conversation

        Parameters
        ----------
        query : str
            Question to ask ChatGPT
        model_role : str | None
            Role for the model to take, e.g.: "You are a research
            assistant". This defaults to self.MODEL_ROLE
        temperature : float
            GPT model temperature, a measure of response entropy from 0 to
            1. 0 is more reliable and nearly deterministic; 1 will give the
            model more creative freedom and may return less factual results.

        Returns
        -------
        response : str
            Model response
        """
        model_role = model_role or self.MODEL_ROLE
        messages = [{"role": "system", "content": model_role},
                    {"role": "user", "content": query}]
        kwargs = dict(model=self.model,
                      messages=messages,
                      temperature=temperature,
                      stream=False)

        response = self._client.chat.completions.create(**kwargs)
        response = response.choices[0].message.content

        return response

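    # Example usage (an illustrative sketch added by the editor, not part of
    # the original source; the query and role strings are hypothetical):
    #
    #     >>> api = ApiBase()
    #     >>> api.generic_query('Define levelized cost of energy.',
    #     ...                   model_role='You are an energy analyst.')
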
    async def generic_async_query(self, queries, model_role=None,
                                  temperature=0, ignore_error=None,
                                  rate_limit=40e3):
        """Run a number of generic single queries asynchronously
        (not conversational)

        NOTE: you need to call this using the await command in ipython or
        jupyter, e.g.: `out = await ApiBase.generic_async_query()`

        Parameters
        ----------
        queries : list
            Questions to ask ChatGPT (list of strings)
        model_role : str | None
            Role for the model to take, e.g.: "You are a research
            assistant". This defaults to self.MODEL_ROLE
        temperature : float
            GPT model temperature, a measure of response entropy from 0 to
            1. 0 is more reliable and nearly deterministic; 1 will give the
            model more creative freedom and may return less factual results.
        ignore_error : None | callable
            Optional callable to parse API error string. If the callable
            returns True, the error will be ignored, the API call will not
            be tried again, and the output will be an empty string.
        rate_limit : float
            OpenAI API rate limit (tokens / minute). Note that the
            gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
            factor of safety (~1/2) because we can only count the tokens on
            the input side and assume the output is about the same count.

        Returns
        -------
        response : list
            Model responses with same length as query input.
        """
        model_role = model_role or self.MODEL_ROLE
        all_request_jsons = []
        for msg in queries:
            msg = [{'role': 'system', 'content': model_role},
                   {'role': 'user', 'content': msg}]
            req = {"model": self.model, "messages": msg,
                   "temperature": temperature}
            all_request_jsons.append(req)

        self.api_queue = ApiQueue(self.URL, self.HEADERS, all_request_jsons,
                                  ignore_error=ignore_error,
                                  rate_limit=rate_limit)
        out = await self.api_queue.run()

        for i, response in enumerate(out):
            choice = response.get('choices', [{'message': {'content': ''}}])[0]
            message = choice.get('message', {'content': ''})
            content = message.get('content', '')
            if not content:
                logger.error(f'Received no output for query {i + 1}!')
            else:
                out[i] = content

        return out

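    # Example usage (an illustrative sketch added by the editor, not part of
    # the original source; must be awaited and the questions are
    # hypothetical):
    #
    #     >>> api = ApiBase()
    #     >>> queries = ['Define capacity factor.', 'Define curtailment.']
    #     >>> out = await api.generic_async_query(queries, temperature=0)
    #     >>> len(out) == len(queries)
    #     True
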
    @classmethod
    def get_embedding(cls, text):
        """Get the 1D array (list) embedding of a text string.

        Parameters
        ----------
        text : str
            Text to embed

        Returns
        -------
        embedding : list
            List of floats representing the numerical embedding of the text
        """
        kwargs = dict(url=cls.EMBEDDING_URL,
                      headers=cls.HEADERS,
                      json={'model': cls.EMBEDDING_MODEL,
                            'input': text})
        out = requests.post(**kwargs)
        embedding = out.json()

        try:
            embedding = embedding["data"][0]["embedding"]
        except Exception as exc:
            msg = ('Embedding request failed: {} {}'
                   .format(out.reason, embedding))
            logger.error(msg)
            raise RuntimeError(msg) from exc

        return embedding

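    # Example usage (an illustrative sketch added by the editor, not part of
    # the original source; the input text is hypothetical and
    # text-embedding-ada-002 embeddings are 1536-dimensional):
    #
    #     >>> vector = ApiBase.get_embedding('wind turbine blade design')
    #     >>> len(vector)
    #     1536
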
    @classmethod
    def count_tokens(cls, text, model, fallback_model='gpt-4'):
        """Return the number of tokens in a string.

        Parameters
        ----------
        text : str
            Text string to get number of tokens for
        model : str
            Specification of OpenAI model to use (e.g., "gpt-3.5-turbo")
        fallback_model : str, default='gpt-4'
            Model to be used for tokenizer if input model can't be found in
            :obj:`TOKENIZER_ALIASES` and doesn't have any easily noticeable
            patterns.

        Returns
        -------
        n : int
            Number of tokens in text
        """
        if model in cls.TOKENIZER_ALIASES:
            token_model = cls.TOKENIZER_ALIASES[model]
        else:
            token_model = fallback_model
            for pattern in cls.TOKENIZER_PATTERNS:
                if pattern in model:
                    token_model = pattern
                    break

        encoding = tiktoken.encoding_for_model(token_model)

        return len(encoding.encode(text))

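# Example usage of `ApiBase.count_tokens` (an illustrative sketch added by
# the editor, not part of the original source; exact counts depend on the
# tiktoken version and the deployment name is hypothetical):
#
#     >>> # 'gpt-35-turbo' resolves via TOKENIZER_ALIASES
#     >>> ApiBase.count_tokens('Hello world!', model='gpt-35-turbo')
#     >>> # an unrecognized name falls back to a TOKENIZER_PATTERNS match
#     >>> ApiBase.count_tokens('Hello world!', model='my-gpt-4o-deployment')
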
class ApiQueue:
    """Class to manage the parallel API queue and submission"""

    def __init__(self, url, headers, request_jsons, ignore_error=None,
                 rate_limit=40e3, max_retries=10):
        """
        Parameters
        ----------
        url : str
            OpenAI API url, typically either:
                https://api.openai.com/v1/embeddings
                https://api.openai.com/v1/chat/completions
        headers : dict
            OpenAI API headers, typically:
                {"Content-Type": "application/json",
                 "Authorization": f"Bearer {openai.api_key}"}
        request_jsons : list
            List of API data input, one entry typically looks like this for
            chat completion:
                {"model": "gpt-3.5-turbo",
                 "messages": [{"role": "system", "content": "You do this..."},
                              {"role": "user", "content": "Do this: {}"}],
                 "temperature": 0.0}
        ignore_error : None | callable
            Optional callable to parse API error string. If the callable
            returns True, the error will be ignored, the API call will not
            be tried again, and the output will be an empty string.
        rate_limit : float
            OpenAI API rate limit (tokens / minute). Note that the
            gpt-3.5-turbo limit is 90k as of 4/2023, but we're using a large
            factor of safety (~1/2) because we can only count the tokens on
            the input side and assume the output is about the same count.
        max_retries : int
            Number of times to retry an API call with an error response
            before raising an error.
        """
        self.url = url
        self.headers = headers
        self.request_jsons = request_jsons
        self.ignore_error = ignore_error
        self.rate_limit = rate_limit
        self.max_retries = max_retries
        self.api_jobs = None
        self.todo = None
        self.out = None
        self.errors = None
        self.tries = None
        self._retry = False
        self._tsub = 0
        self._reset()

        self.job_names = [f'job_{str(ijob).zfill(4)}'
                          for ijob in range(len(request_jsons))]

    def _reset(self):
        self.api_jobs = {}
        self.todo = [True] * len(self)
        self.out = [None] * len(self)
        self.errors = [None] * len(self)
        self.tries = np.zeros(len(self), dtype=int)
        self._retry = False
        self._tsub = 0

    def __len__(self):
        """Number of API calls to submit"""
        return len(self.request_jsons)

    @property
    def waiting_on(self):
        """Get a list of async jobs that are being waited on."""
        return [job for ijob, job in self.api_jobs.items()
                if self.todo[ijob]]

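    # Example construction (an illustrative sketch added by the editor, not
    # part of the original source; the payload is hypothetical):
    #
    #     >>> payload = {"model": "gpt-3.5-turbo",
    #     ...            "messages": [{"role": "user", "content": "Hi!"}],
    #     ...            "temperature": 0.0}
    #     >>> queue = ApiQueue(ApiBase.URL, ApiBase.HEADERS, [payload],
    #     ...                  rate_limit=40e3, max_retries=10)
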
    def submit_jobs(self):
        """Submit a subset of jobs asynchronously and hold jobs in the
        `api_jobs` attribute. Break when the `rate_limit` is exceeded."""

        token_count = 0
        t_elap = (time.time() - self._tsub) / 60
        avail_tokens = self.rate_limit * t_elap
        avail_tokens = min(self.rate_limit, avail_tokens)

        for ijob, itodo in enumerate(self.todo):
            if (ijob not in self.api_jobs
                    and itodo
                    and token_count < avail_tokens):
                request = self.request_jsons[ijob]
                model = request['model']
                tokens = ApiBase.count_tokens(str(request), model)

                if tokens > self.rate_limit:
                    msg = ('Job index #{} has {} tokens, which is greater '
                           'than the rate limit of {}!'
                           .format(ijob, tokens, self.rate_limit))
                    logger.error(msg)
                    raise RuntimeError(msg)

                elif tokens < avail_tokens:
                    token_count += tokens
                    task = asyncio.create_task(ApiBase.call_api(self.url,
                                                                self.headers,
                                                                request),
                                               name=self.job_names[ijob])
                    self.api_jobs[ijob] = task
                    self.tries[ijob] += 1
                    self._tsub = time.time()

                    logger.debug('Submitted "{}" ({} out of {}). '
                                 'Token count: {} '
                                 '(rate limit is {}). '
                                 'Attempts: {}'
                                 .format(self.job_names[ijob], ijob + 1,
                                         len(self), token_count,
                                         self.rate_limit, self.tries[ijob]))

            elif token_count >= avail_tokens:
                token_count = 0
                break

    async def collect_jobs(self):
        """Collect asynchronous API calls and API outputs. Store outputs in
        the `out` attribute."""

        if not any(self.waiting_on):
            return

        complete, _ = await asyncio.wait(self.waiting_on,
                                         return_when=asyncio.FIRST_COMPLETED)

        for job in complete:
            job_name = job.get_name()
            ijob = self.job_names.index(job_name)
            task_out = job.result()

            if 'error' in task_out:
                msg = ('Received API error for task #{0} '
                       '(see `ApiQueue.errors[{1}]` and '
                       '`ApiQueue.request_jsons[{1}]` for more details). '
                       'Error message: {2}'.format(ijob + 1, ijob, task_out))
                self.errors[ijob] = 'Error: {}'.format(task_out)

                if (self.ignore_error is not None
                        and self.ignore_error(str(task_out))):
                    msg += ' Ignoring error and moving on.'
                    dummy = {'choices': [{'message': {'content': ''}}]}
                    self.out[ijob] = dummy
                    self.todo[ijob] = False
                else:
                    del self.api_jobs[ijob]
                    msg += ' Retrying query.'
                    self._retry = True

                logger.error(msg)

            else:
                self.out[ijob] = task_out
                self.todo[ijob] = False

        n_complete = len(self) - sum(self.todo)
        logger.debug('Finished {} API calls, {} left'
                     .format(n_complete, sum(self.todo)))

    async def run(self):
        """Run all asynchronous API calls.

        Returns
        -------
        out : list
            List of API call outputs with same ordering as `request_jsons`
            input.
        """

        self._reset()
        logger.debug('Submitting async API calls...')

        i = 0
        while any(self.todo):
            i += 1
            self._retry = False
            self.submit_jobs()
            await self.collect_jobs()

            if any(self.tries > self.max_retries):
                msg = (f'Hit {self.max_retries} retries on API queries. '
                       'Stopping. See `ApiQueue.errors` for more '
                       'details on error response')
                logger.error(msg)
                raise RuntimeError(msg)
            elif self._retry:
                time.sleep(10)
            elif i > 1e4:
                raise RuntimeError('Hit 1e4 iterations. What are you doing?')
            elif any(self.todo):
                time.sleep(5)

        return self.out
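
# Example end-to-end usage of `ApiQueue` (an illustrative sketch added by
# the editor, not part of the original source; `queue` is the instance from
# the construction example above, and this must be awaited from a running
# event loop, e.g. in Jupyter):
#
#     >>> out = await queue.run()
#     >>> queue.errors  # per-job error strings; None entries succeeded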