I am trying to send requests to the OpenAI API to translate a list of texts via a Google Cloud Function. Since the API takes a little while to answer and I am trying to translate a few hundred texts per day, I tried speeding up the process by sending the requests concurrently from the GCF to the OpenAI API.
So I prototyped the function in Python in a Jupyter notebook and got a concurrent version running, aware that there are some differences in event loop management (roughly the difference sketched below); this cut the run time from about 30 minutes down to 4-5. I then worked out the remaining differences between running the code in Jupyter and in the Google Cloud Function and got it running there as well.
However, the GCF just doesn't speed up: it still runs for 30 minutes or more, causing TimeoutErrors in other parts of the platform.
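For context, this is the event loop difference I mean, as a minimal sketch (translate_concurrently is defined in the full code further down):

import asyncio

# In Jupyter an event loop is already running, so inside a cell you can simply write:
#     await translate_concurrently(df)

# In a plain script (or a Cloud Function) there is no running loop,
# so you have to create and drive one yourself, e.g.:
def run_translation(df):
    return asyncio.run(translate_concurrently(df))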
I have checked:
Here is my code for the cloud function:
import pandas as pd
import os
import asyncio
from langchain.chat_models import ChatOpenAI
from langchain.prompts.chat import HumanMessagePromptTemplate, ChatPromptTemplate
from langchain.output_parsers import PydanticOutputParser
from langchain.schema.output_parser import OutputParserException
from langchain.schema.messages import get_buffer_string
from langchain.chains import LLMChain
from functools import wraps
import platform

if platform.system() == "Windows":
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

OPEN_AI_KEY = os.environ.get("OPENAI_TOKEN")
OPENAI_MODEL = "gpt-3.5-turbo-1106"


def request_concurrency_limit_decorator(limit=5):
    # Bind the default event loop
    sem = asyncio.Semaphore(limit)

    def executor(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with sem:
                return await func(*args, **kwargs)

        return wrapper

    return executor


async def translate_concurrently(
    df: pd.DataFrame, openai_api_key: str = OPEN_AI_KEY, model_name: str = OPENAI_MODEL
):
    # Creating the prompt
    PROMPT_TRANSLATE_REVIEW = """Translate this into English: '{input}'"""
    llm = ChatOpenAI(openai_api_key=openai_api_key, model_name=model_name)
    message = HumanMessagePromptTemplate.from_template(
        template=PROMPT_TRANSLATE_REVIEW,
    )
    chat_prompt = ChatPromptTemplate.from_messages([message])
    chain = LLMChain(llm=llm, prompt=chat_prompt)

    # start async translation requests
    @request_concurrency_limit_decorator(limit=5)
    async def async_translate(chain: LLMChain, input: str):
        resp = await chain.arun({"input": input})
        return resp

    tasks = [async_translate(chain, input=review) for review in df["original_text"]]
    # Row order is guaranteed by asyncio.gather
    df["text_translation"] = await asyncio.gather(*tasks)


def main(ctx):
    # preparing the translation_df with a column "original_text"
    translation_df = pd.DataFrame(...)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.run_until_complete(translate_concurrently(translation_df))
    loop.close()
    return "OK", 200
I would appreciate any hints as to where I went wrong and why the GCF doesn't speed up as much as my prototype in the Jupyter notebook.
As far as I can tell now, the OpenAI API sometimes needs an extremely long time to answer a request, even upwards of 10 minutes. These calls don't fail outright, so they can't be caught and handled gracefully.
The vast majority of requests are answered within seconds, but roughly one in every 20 prompts takes forever, and since asyncio.gather waits for every task, a single straggler drags down the run time of the whole Cloud Function.
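To make the stragglers visible, a timing wrapper along these lines can be put around each request (a sketch; the decorator is illustrative and not part of my deployed code):

import time
from functools import wraps

def timed(func):
    # Prints the wall-clock duration of each awaited call,
    # so the occasional multi-minute outlier stands out in the logs
    @wraps(func)
    async def wrapper(*args, **kwargs):
        start = time.monotonic()
        try:
            return await func(*args, **kwargs)
        finally:
            print(f"{func.__name__} took {time.monotonic() - start:.1f}s")
    return wrapper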
I couldn't find a way to reliably force a timeout from within the langchain library.
In the end, I rewrote the error handling and retry logic without langchain. Some requests still hang indefinitely, but with asyncio I can force a timeout and issue a new request, which usually completes much faster.
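The core of the replacement looks roughly like this, as a sketch using the async OpenAI client and asyncio.wait_for (the helper name, the 60-second limit, and the retry count are illustrative, not my exact production values):

import asyncio
from openai import AsyncOpenAI  # openai >= 1.0

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

async def translate_with_timeout(text: str, timeout: float = 60.0, retries: int = 3) -> str:
    for attempt in range(retries):
        try:
            # wait_for cancels the hanging request once the timeout expires
            resp = await asyncio.wait_for(
                client.chat.completions.create(
                    model="gpt-3.5-turbo-1106",
                    messages=[
                        {"role": "user", "content": f"Translate this into English: '{text}'"}
                    ],
                ),
                timeout=timeout,
            )
            return resp.choices[0].message.content
        except asyncio.TimeoutError:
            continue  # the retried request usually comes back within seconds
    raise TimeoutError(f"Translation failed after {retries} attempts")

The same semaphore decorator as in the question still works on top of this to cap overall concurrency.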