I am trying to use RunnableEach to make parallel requests to OpenAI. The responses are supposed to be "yes/no" plus a motivation. I want the response to be in JSON format, my code is like the following:
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables.base import RunnableEach
from langchain_openai import ChatOpenAI
from pydantic import BaseModel, Field


class Classification(BaseModel):
    flag: bool = Field(description="my descr", enum=[True, False])
    answer: str = Field(description="my descr")


llm = ChatOpenAI(temperature=0, model="gpt-4o-mini", api_key=model_key).with_structured_output(
    Classification
)
prompt = my_prompt
tagging_chain = prompt | llm
runnable_each = RunnableEach(bound=tagging_chain)
input_list = [{"input": val} for val in mydata]
res = runnable_each.invoke(input_list)
I have around 25k elements in the input list, i.e. 25k requests to process. This worked fine until today, when it started failing because of an error in a single request:
Function Classification arguments:
{"flag":false,"motivation":"The passage does not contain any information relevant to products
are not valid JSON. Received JSONDecodeError Unterminated string starting at: line 1 column 34 (char 33)
I understand that the call to the LLM returned a malformed string which, when parsed as JSON (since I have constrained output), raises an error. Is there a way to handle this kind of error so that only that one request fails instead of the entire process? Thanks in advance for your help.
Update: I saw in the LangChain documentation that retry logic is implemented in the library: https://python.langchain.com/api_reference/core/runnables/langchain_core.runnables.retry.RunnableRetry.html By defining
tagging_chain = prompt | llm.with_retry()
runnable_each = RunnableEach(bound=tagging_chain)
Would it retry the single input that is failing or the entire input sequence?
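If with_retry behaves as a per-invocation wrapper (which is what the RunnableRetry docs describe), only the element that raised is re-run, not the whole sequence. A plain-Python sketch of that semantics (illustrative names only, not the actual LangChain API):

```python
# Sketch: a per-input retry wrapper applied inside the batch loop.
# Only the failing element ("bad") is re-run; "a" and "c" run once each.
def with_retry(fn, attempts=3):
    def wrapped(x):
        for i in range(attempts):
            try:
                return fn(x)
            except Exception:
                if i == attempts - 1:
                    raise  # give up after the last attempt
    return wrapped


calls = {"n": 0}


def flaky(x):
    """Stand-in for an LLM call that fails twice on one input."""
    calls["n"] += 1
    if x == "bad" and calls["n"] < 3:
        raise ValueError("malformed JSON")
    return f"parsed:{x}"


batch = [with_retry(flaky)(x) for x in ["a", "bad", "c"]]
# batch == ["parsed:a", "parsed:bad", "parsed:c"]; "bad" took two attempts
```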
I found the following in the documentation:

return_exceptions (bool) – Whether to return exceptions instead of raising them. Defaults to False.

So basically you just have to pass return_exceptions=True to the RunnableEach, and for a failing input it will return the exception in the result list instead of breaking the whole run.
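With that flag, the result list mixes successful outputs and exception objects, so you can split them afterwards. A minimal plain-Python sketch of that post-processing (the results list here is stand-in data, not a real LLM response):

```python
# Stand-in for what a batch run with return_exceptions=True hands back:
# successful outputs interleaved with the exceptions that were caught.
results = ["ok-1", ValueError("Unterminated string"), "ok-3"]

# Keep the good outputs, and record (index, error) pairs for the failures
# so the corresponding inputs can be inspected or re-submitted later.
successes = [r for r in results if not isinstance(r, Exception)]
failures = [(i, r) for i, r in enumerate(results) if isinstance(r, Exception)]
# successes == ["ok-1", "ok-3"]; failures holds one entry, at index 1
```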