Search code examples
pythonlangchainlanggraph

how to custom stream events in langgraph


I'm using langgraph astream_events. I defined a graph, just like:

call_llm = get_history_messages | llm.bind(tools)

def call_tool(messages: List[BaseMessage]):
    response = ...
    return ToolMessage(tool_call_id=uuid4(), content=json.dumps(response))

graph = StateGraph()
graph.add_node(Node.LLM, call_llm)
graph.add_node(Node.Tool, call_tool)
graph.add_edge(Node.LLM, Node.Tool)
graph.add_edge(Node.Tool, Node.End)
graph.compile()

human_message = HumanMessage(content=...)
async for event in graph.astream_events(human_message, version="v2", stream_mode="values"):
    print(event)

When I call this method, I receive a large amount of event data, most of which I don't need. I wanna customize the output of LLM node and Tool node. How should I do this?


Solution

  • Sounds like you're only interested in some event types, perhaps the one the contains the prompt and all the events for response stream chunks?

    event_types_to_keep = {"on_chat_model_start", "on_chat_model_stream"} # customize as necessary
    
    async def filter_events(event_stream):
        async for event in event_stream:
            if not event_types_to_keep or event.get("event") in event_types_to_keep:
                if "input" in event["data"]:
                    filtered_data = event["data"]["input"]["messages"]
                if "chunk" in event["data"]:
                    filtered_data = event["data"]["chunk"]
                
                # Yield a new event structure with the filtered data
                yield {
                    "event": event["event"],
                    "data": filtered_data,
                }
    
    async for event in filter_events(
        chain.astream_events({"topic": "parrot"}, version="v2")
    ):
        print(event)
    

    which prints:

    {'event': 'on_chat_model_start', 'data': [[HumanMessage(content='tell me a joke about parrot', additional_kwargs={}, response_metadata={})]]}
    {'event': 'on_chat_model_stream', 'data': AIMessageChunk(content='', additional_kwargs={}, response_metadata={}, id='run-ffbf3d59-13f2-49ca-8988-b90c81cf3c9b')}
    {'event': 'on_chat_model_stream', 'data': AIMessageChunk(content="Here's a par", additional_kwargs={}, response_metadata={}, id='run-ffbf3d59-13f2-49ca-8988-b90c81cf3c9b')}
    {'event': 'on_chat_model_stream', 'data': AIMessageChunk(content='rot joke for', additional_kwargs={}, response_metadata={}, id='run-ffbf3d59-13f2-49ca-8988-b90c81cf3c9b')}