Правильный способ осуществления вызовов к API ChatGPT

Правильный способ вызова API ChatGPT

Как делать надежные вызовы к API ChatGPT для создания надежных приложений

LLM теперь повсюду, особенно ChatGPT. На его основе создается огромное количество приложений, и если вы еще не пробовали, вам следует попробовать.

Создано с помощью Midjourney.

Построение приложений на основе ChatGPT, скорее всего, потребует от вас совершения нескольких параллельных вызовов. К сожалению, вы не единственный. С таким количеством приложений, выполняющих миллионы запросов в день (КУДОС их команде разработчиков, кстати), API часто возвращает ошибку “слишком много запросов”. Поэтому нам нужен хороший способ обработки таких ошибок при совершении нескольких параллельных вызовов.

В этом небольшом учебнике на языке Python мы рассмотрим две важные темы, связанные с эффективным выполнением вызовов к API ChatGPT:

  1. Выполнение нескольких вызовов параллельно
  2. Повтор вызовов в случае ошибок

1. Выполнение нескольких вызовов параллельно

Самый простой способ выполнить вызов – выполнить его синхронно, то есть отправить запрос и дождаться получения ответа, чтобы продолжить программу. Мы можем сделать это просто следующим образом:

import requestsheaders = {    "Content-Type": "application/json",    "Authorization": f"Bearer {OPENAI_API_KEY}"}response_json = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json={    "model": "gpt-3.5-turbo",    "messages": [{"role": "user", "content": "ping"}],    "temperature": 0}).json()print(response_json["choices"][0]["message"]["content"])

Pong!

Если мы работаем в простой системе, это нормально. Однако, если мы хотим выполнить несколько вызовов параллельно к API или другим ресурсам, таким как база данных, мы можем сделать это асинхронно для более быстрых ответов.

Выполнение задач асинхронно вызовет каждое действие и будет ожидать их завершения параллельно, что сократит время ожидания.

Простейший способ сделать это – создать разные потоки для обработки каждого запроса, однако есть более эффективный способ сделать это с использованием асинхронных вызовов.

Выполнение асинхронного вызова часто более эффективно, поскольку вы можете указать точные места, где ваше приложение должно ожидать, в то время как в традиционном многопоточном программировании система автоматически ставит потоки на ожидание, что может быть неоптимальным.

Ниже приведен пример, показывающий разницу между использованием синхронных и асинхронных вызовов.

# Синхронный вызовimport timedef delay_print(msg):    print(msg, end=" ")    time.sleep(1)def sync_print():    for i in range(10):        delay_print(i)start_time = time.time()sync_print()print("\n", time.time() - start_time, "секунды.")

0 1 2 3 4 5 6 7 8 9  10.019574642181396 секунды.

# Асинхронный вызовimport asyncioasync def delay_print_async(msg):    print(msg, end=" ")    await asyncio.sleep(1)async def async_print():    asyncio.gather(*[delay_print_async(i) for i in range(10)])start_time = time.time()await async_print()print("\n", time.time() - start_time, "секунды.")

0.0002448558807373047 секунды.0 1 2 3 4 5 6 7 8 9 

Метод asyncio.gather вызовет все асинхронные вызовы, переданные ему, и вернет их результаты, как только они будут готовы.

К сожалению, выполнение асинхронных вызовов с использованием библиотеки requests невозможно. Для этого вы можете использовать библиотеку aiohttp. Ниже приведен пример того, как выполнить асинхронный вызов с использованием aiohttp.

import aiohttpasync def get_completion(content):    async with aiohttp.ClientSession() as session:        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            return response_json["choices"][0]['message']["content"]await get_completion("Ping")

Pong!

Как уже было сказано ранее, для выполнения асинхронных запросов нам необходимо использовать метод asyncio.gather.

async def get_completion_list(content_list):    return await asyncio.gather(*[get_completion(content) for content in content_list])await get_completion_list(["ping", "pong"]*5)

['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']

Хотя это работает, выполнять вызовы таким образом не является идеальным, поскольку мы создаем объект сеанса для каждого вызова. Мы можем сэкономить ресурсы и время, повторно используя тот же объект сеанса, вот так:

async def get_completion(content, session):    async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={        "model": "gpt-3.5-turbo",        "messages": [{"role": "user", "content": content}],        "temperature": 0    }) as resp:        response_json = await resp.json()        return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list):    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session) for content in content_list])await get_completion_list(["ping", "pong"]*5)

Просто, верно? С помощью этого вы можете легко выполнять несколько вызовов. Однако одна проблема заключается в том, что часто не рекомендуется выполнять неограниченное количество вызовов таким образом, так как вы можете перегрузить систему и быть наказанным запретом на дополнительные запросы в течение некоторого времени (верьте мне, это произойдет). Поэтому хорошей идеей является ограничение количества вызовов, которые можно выполнять одновременно. Вы можете легко сделать это с помощью класса asyncio.Semaphore.

Класс Semaphore создает контекстный менеджер, который будет управлять количеством асинхронных вызовов, которые в настоящее время выполняются в его контексте. Если достигнуто максимальное количество, он будет блокировать выполнение до завершения некоторых вызовов.

async def get_completion(content, session, semaphore):    async with semaphore:        await asyncio.sleep(1)        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Время выполнения: ", time.perf_counter() - start_time, "секунд.")print(completion_list)

Время выполнения:  1.8094507199984946 секунд.['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']

Дополнительным моментом здесь является отчет о ходе выполнения вызовов. Вы можете сделать это, создав небольшой класс, который будет хранить прогресс и разделяться между всеми вызовами. Вы можете сделать это следующим образом:

class ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"Выполнено {self.done}/{self.total}."async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        await asyncio.sleep(1)        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Время выполнения: ", time.perf_counter() - start_time, "секунд.")print(completion_list)

Выполнено 1/10.Выполнено 2/10.Выполнено 3/10.Выполнено 4/10.Выполнено 5/10.Выполнено 6/10.Выполнено 7/10.Выполнено 8/10.Выполнено 9/10.Выполнено 10/10.Время выполнения:  1.755018908999773 секунд.['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']

Этот раздел посвящен тому, как выполнять несколько асинхронных запросов. С его помощью вы можете выполнять несколько асинхронных вызовов, ограничивать количество вызовов за раз и отображать прогресс. Однако все еще есть несколько проблем, которые нужно решить.

Сделанные запросы могут не выполниться по нескольким причинам, таким как перегрузка сервера, обрыв соединения, неправильные запросы и т. д. Это может вызывать исключения или возвращать непредсказуемые ответы, поэтому нам нужно обрабатывать эти случаи и автоматически повторять неудачные вызовы.

2. Повторить вызовы в случае неудачи

Для обработки неудачных вызовов мы будем использовать библиотеку tenacity. Tenacity предоставляет декораторы функций, которые автоматически повторяют вызов нашей функции в случае возникновения исключения.

from tenacity import (    retry,    stop_after_attempt,    wait_random_exponential,)

Чтобы добавить функциональность повтора к нашим вызовам, нам понадобится разместить декоратор @retry. Использование его без дополнительных параметров будет означать, что функция будет повторяться немедленно и бесконечно при каждой неудаче. Это не очень хорошо по нескольким причинам.

Одна из них заключается в том, что наш вызов функции может не выполниться из-за перегрузки сервера, поэтому разумно подождать некоторое время перед повторной попыткой. Для указания времени ожидания мы будем использовать подход экспоненциальной задержки с помощью параметра wait=wait_random_exponential(min=min_value, max=max_value). Это увеличит время ожидания с каждой неудачей функции.

Еще одна опциональная вещь – это запись сообщений при каждой повторной попытке. Мы можем сделать это, предоставив некоторую функцию параметру before_sleep. Здесь мы будем использовать функцию print, однако лучше использовать модуль logging и передать функцию logging.error или logging.debug в этот параметр.

Чтобы продемонстрировать это, мы сгенерируем случайные исключения.

import randomclass ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"Выполнено {self.done}/{self.total}."@retry(wait=wait_random_exponential(min=1, max=60), before_sleep=print)async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        #await asyncio.sleep(1)        if random.random() < 0.2:            raise Exception("Случайное исключение")        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession() as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])start_time = time.perf_counter()completion_list = await get_completion_list(["ping", "pong"]*5, 100)print("Затраченное время: ", time.perf_counter() - start_time, "секунд.")print(completion_list)

<RetryCallState 133364377433616: попытка №1; время ожидания 0.74; последний результат: неудача (Исключение: Случайное исключение)><RetryCallState 133364377424496: попытка №1; время ожидания 0.79; последний результат: неудача (Исключение: Случайное исключение)>Выполнено 1/10.Выполнено 2/10.Выполнено 3/10.Выполнено 4/10.Выполнено 5/10.Выполнено 6/10.Выполнено 7/10.Выполнено 8/10.Выполнено 9/10.Выполнено 10/10.Затраченное время:  1.1305301820011664 секунд.['Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!', 'Pong!', 'Ping!']

Таким образом, наша функция будет ждать некоторое время перед повторной попыткой. Однако причиной неудачи может быть систематическая ошибка из-за простоя сервера или неправильной нагрузки, например. В этом случае мы хотим ограничить количество повторных попыток. Мы можем сделать это с помощью параметра stop=stop_after_attempt(n).

import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"Выполнено {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        #await asyncio.sleep(1)
        if random.random() < 0.9:
            raise Exception("Случайное исключение")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Времени затрачено: ", time.perf_counter() - start_time, "секунд.")
print(completion_list)

<RetryCallState 133364608660048: попытка #1; спал 0.1; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377435680: попытка #1; спал 0.71; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377421472: попытка #1; спал 0.17; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377424256: попытка #1; спал 0.37; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377430928: попытка #1; спал 0.87; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377420752: попытка #1; спал 0.42; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377422576: попытка #1; спал 0.47; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377431312: попытка #1; спал 0.11; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377425840: попытка #1; спал 0.69; последний результат: неудача (Случайное исключение)>
<RetryCallState 133364377424592: попытка #1; спал 0.89; последний результат: неудача (Случайное исключение)>

---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/_asyncio.py in __call__(self, fn, *args, **kwargs)
     49                 try:
---> 50                     result = await fn(*args, **kwargs)
     51                 except BaseException:  # noqa: B9025 frames

Exception: Случайное исключение

The above exception was the direct cause of the following exception:

RetryError                                Traceback (most recent call last)
/usr/local/lib/python3.10/dist-packages/tenacity/__init__.py in iter(self, retry_state)
    324             if self.reraise:
    325                 raise retry_exc.reraise()
--> 326             raise retry_exc from fut.exception()
    327 
    328         if self.wait:

RetryError: RetryError[]

При заданном параметре RetryError будет возникать, когда количество попыток достигнет максимального значения. Однако, может быть так, что мы хотим продолжить выполнение без генерации исключения, сохраняя только значение None возвращаемого вызова для обработки его позже. Для этого мы можем использовать функцию обратного вызова retry_error_callback, чтобы вернуть значение None в случае возникновения ошибки RetryError:

import random

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"Завершено {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(2), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        #await asyncio.sleep(1)
        if random.random() < 0.7:
            raise Exception("Произошло случайное исключение")
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(1)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*5, 100)
print("Затраченное время: ", time.perf_counter() - start_time, "секунд.")
print(completion_list)

<RetryCallState 133364377805024: попытка #1; время ожидания 0.22; последний результат: ошибка (Exception Произошло случайное исключение)>
<RetryCallState 133364377799456: попытка #1; время ожидания 0.53; последний результат: ошибка (Exception Произошло случайное исключение)>
<RetryCallState 133364377801328: попытка #1; время ожидания 0.24; последний результат: ошибка (Exception Произошло случайное исключение)>
<RetryCallState 133364377810208: попытка #1; время ожидания 0.38; последний результат: ошибка (Exception Произошло случайное исключение)>
<RetryCallState 133364377801616: попытка #1; время ожидания 0.54; последний результат: ошибка (Exception Произошло случайное исключение)>
<RetryCallState 133364377422096: попытка #1; время ожидания 0.59; последний результат: ошибка (Exception Произошло случайное исключение)>
<RetryCallState 133364377430592: попытка #1; время ожидания 0.07; последний результат: ошибка (Exception Произошло случайное исключение)>
<RetryCallState 133364377425648: попытка #1; время ожидания 0.05; последний результат: ошибка (Exception Произошло случайное исключение)>
Завершено 1/10.
Завершено 2/10.
Завершено 3/10.
Затраченное время:  2.6409040250000544 секунд.
['Pong!', 'Ping!', None, None, None, None, None, 'Ping!', None, None]

С этим кодом вместо возникновения ошибок будут возвращаться значения None.

Один из еще не решенных проблем – это проблема затруднения соединения. Она возникает, когда мы выполняем запрос и по какой-то причине хост удерживает соединение, но ни отказывается, ни возвращает какой-либо результат. Для обработки таких случаев нам нужно установить время ожидания для возврата, если вызов не возвращает значение в течение определенного периода. Для этого мы можем использовать параметр timeout из библиотеки aiohttp вместе с классом aiohttp.ClientTimeout. В случае возникновения тайм-аута здесь будет вызвано исключение TimeoutError, которое затем будет обработано декоратором retry из tenacity и автоматически выполнит функцию снова.

class ProgressLog:
    def __init__(self, total):
        self.total = total
        self.done = 0
    
    def increment(self):
        self.done = self.done + 1
    
    def __repr__(self):
        return f"Завершено {self.done}/{self.total}."

@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)
async def get_completion(content, session, semaphore, progress_log):
    async with semaphore:
        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={
            "model": "gpt-3.5-turbo",
            "messages": [{"role": "user", "content": content}],
            "temperature": 0
        }) as resp:
            response_json = await resp.json()
            progress_log.increment()
            print(progress_log)
            return response_json["choices"][0]['message']["content"]

async def get_completion_list(content_list, max_parallel_calls):
    semaphore = asyncio.Semaphore(value=max_parallel_calls)
    progress_log = ProgressLog(len(content_list))
    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(10)) as session:
        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

start_time = time.perf_counter()
completion_list = await get_completion_list(["ping", "pong"]*100, 100)
print("Затраченное время: ", time.perf_counter() - start_time, "секунд.")

<RetryCallState 133364375201936: попытка #1; время ожидания 0.57; последний результат: ошибка (TimeoutError )>
Затраченное время:  12.705538211999738 секунд.

Отлично! Теперь у нас есть надежный способ выполнения нескольких параллельных запросов, который автоматически повторяется в случае возникновения сбоев и возвращает значения None в случае систематической ошибки. Итак, окончательный код будет выглядеть следующим образом:

import asyncioimport aiohttpfrom tenacity import (    retry,    stop_after_attempt,    wait_random_exponential,)headers = {    "Content-Type": "application/json",    "Authorization": f"Bearer {OPENAI_API_KEY}"}class ProgressLog:    def __init__(self, total):        self.total = total        self.done = 0    def increment(self):        self.done = self.done + 1    def __repr__(self):        return f"Завершено {self.done}/{self.total}."@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(20), before_sleep=print, retry_error_callback=lambda _: None)async def get_completion(content, session, semaphore, progress_log):    async with semaphore:        async with session.post("https://api.openai.com/v1/chat/completions", headers=headers, json={            "model": "gpt-3.5-turbo",            "messages": [{"role": "user", "content": content}],            "temperature": 0        }) as resp:            response_json = await resp.json()            progress_log.increment()            print(progress_log)            return response_json["choices"][0]['message']["content"]async def get_completion_list(content_list, max_parallel_calls, timeout):    semaphore = asyncio.Semaphore(value=max_parallel_calls)    progress_log = ProgressLog(len(content_list))    async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(timeout)) as session:        return await asyncio.gather(*[get_completion(content, session, semaphore, progress_log) for content in content_list])

В итоге, мы реализовали следующие функции:

  1. Асинхронные вызовы для сокращения времени ожидания.
  2. Ведение журнала прогресса асинхронных вызовов.
  3. Автоматическое повторение вызовов при сбое.
  4. Возвращение значений None в случае систематических ошибок.
  5. Повторный вызов, если истекло время ожидания и ничего не вернулось.

Если у вас есть вопросы, обнаружены ошибки или есть идеи по улучшению, оставьте комментарий ниже!