
Streaming a local LLM with FastAPI, llama.cpp and LangChain


I have set up FastAPI with Llama.cpp and LangChain. Now I want to enable streaming in the FastAPI responses. Streaming works with Llama.cpp in my terminal, but I wasn't able to implement it with a FastAPI response.

Most tutorials focus on enabling streaming with an OpenAI model, but I am using a local LLM (quantized Mistral) with llama.cpp. I think I have to modify the callback handler, but none of the tutorials I tried worked. Here is my code:

import copy
from functools import lru_cache  # needed for @lru_cache below

from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware  # needed for app.add_middleware below
from langchain_community.llms import LlamaCpp
from langchain.callbacks.manager import CallbackManager
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain.chains import LLMChain
from langchain.prompts import PromptTemplate

model_path = "../modelle/mixtral-8x7b-instruct-v0.1.Q5_K_M.gguf"

prompt= """
<s> [INST] Im folgenden bekommst du eine Aufgabe. Erledige diese anhand des User Inputs.

### Hier die Aufgabe: ###
{typescript_string}

### Hier der User Input: ###
{input}

Antwort: [/INST]
"""

def model_response_prompt():
    return PromptTemplate(template=prompt, input_variables=['input', 'typescript_string'])

def build_llm(model_path, callback=None):
        callback_manager = CallbackManager([StreamingStdOutCallbackHandler()])
        #callback_manager = CallbackManager(callback)
        
        n_gpu_layers = 1  # With Metal, 1 is enough (also tried higher values).
        n_batch = 512  # Was 1024. Should be between 1 and n_ctx; consider the amount of RAM of your Apple Silicon chip.
   
        llm = LlamaCpp(
                max_tokens =1000,
                n_threads = 6,
                model_path=model_path,
                temperature= 0.8,
                f16_kv=True,
                n_ctx=28000, 
                n_gpu_layers=n_gpu_layers,
                n_batch=n_batch,
                callback_manager=callback_manager, 
                verbose=True,
                top_p=0.75,
                top_k=40,
                repeat_penalty = 1.1,
                streaming=True,
                model_kwargs={
                        'mirostat': 2,
                },
        )
        
        return llm

# caching LLM
@lru_cache(maxsize=100)
def get_cached_llm():
        chat = build_llm(model_path)
        return chat

chat = get_cached_llm()

app = FastAPI(
    title="Inference API for Mistral and Mixtral",
    description="A simple API that uses Mistral or Mixtral",
    version="1.0",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

def bullet_point_model():          
    llm = build_llm(model_path=model_path)
    llm_chain = LLMChain(
        llm=llm,
        prompt=model_response_prompt(),
        verbose=True,
    )
    return llm_chain

@app.get('/model_response')
async def model(question : str, prompt: str):
    model = bullet_point_model()
    res = model({"typescript_string": prompt, "input": question})
    result = copy.deepcopy(res)
    return result

In an example notebook, I am calling FastAPI like this:

import subprocess
import urllib.parse
import shlex
query = input("Insert your bullet points here: ")
task = input("Insert the task here: ")
# URL-encode the query strings
encodedquery = urllib.parse.quote(query)
encodedtask = urllib.parse.quote(task)
# Build the curl command text
command = f"curl -X 'GET' 'http://127.0.0.1:8000/model_response?question={encodedquery}&prompt={encodedtask}' -H 'accept: application/json'"
print(command)
args = shlex.split(command)
process = subprocess.Popen(args, shell=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = process.communicate()
print(stdout)

So with this code, getting responses from the API works. But I only see streaming in my terminal (I think this is because of the StreamingStdOutCallbackHandler). Only after the streaming in the terminal is complete do I get my FastAPI response.

What do I have to change so that I can stream token by token with FastAPI and a local llama.cpp model?
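
One way to read the situation described above: StreamingStdOutCallbackHandler writes each token to stdout, and the endpoint returns only after the chain call has finished. A common pattern for turning such callbacks into something a response can iterate over is a queue fed from a worker thread. The sketch below uses only the standard library so it runs on its own; `QueueTokenHandler`, `fake_generate` and `stream_tokens` are hypothetical names, and in a real setup the handler would presumably subclass LangChain's `BaseCallbackHandler`, whose `on_llm_new_token` hook it mirrors.

```python
import queue
import threading
from typing import Callable, Iterator

_DONE = object()  # sentinel marking the end of generation


class QueueTokenHandler:
    """Collects tokens in a queue instead of printing them to stdout."""

    def __init__(self) -> None:
        self.tokens: queue.Queue = queue.Queue()

    def on_llm_new_token(self, token: str, **kwargs) -> None:
        self.tokens.put(token)


def fake_generate(prompt: str, handler: QueueTokenHandler) -> None:
    """Hypothetical stand-in for a blocking LLM call that fires callbacks."""
    for token in prompt.split():
        handler.on_llm_new_token(token + " ")


def stream_tokens(
    generate: Callable[[str, QueueTokenHandler], None], prompt: str
) -> Iterator[str]:
    """Run `generate` in a worker thread and yield tokens as they arrive."""
    handler = QueueTokenHandler()

    def worker() -> None:
        try:
            generate(prompt, handler)
        finally:
            handler.tokens.put(_DONE)  # always unblock the consumer

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = handler.tokens.get()
        if item is _DONE:
            break
        yield item


tokens = list(stream_tokens(fake_generate, "streamed token by token"))
print(tokens)
```

The generator returned by `stream_tokens` is the kind of object a streaming HTTP response could consume, while the blocking model call keeps running in its own thread.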


Solution

  • I was doing the same and hit a similar issue: FastAPI was not streaming the response even though I was using the StreamingResponse API. Eventually I got the following code to work. There are three important parts:

    1. Make sure to use StreamingResponse to wrap an Iterator.

    2. Make sure the Iterator sends a newline character \n at the end of each streamed chunk.

    3. Make sure to use streaming APIs to connect to your LLMs. For example, the _client.chat function in my example uses httpx to connect to the REST APIs for LLMs. If you use the requests package with its default settings, it won't work, because the whole response body is read before it is returned (requests only streams when called with stream=True).

      import json
      from collections.abc import Iterator

      from fastapi import Request
      from fastapi.responses import StreamingResponse

      # Method of the API class; self._client and self.logger are set up elsewhere.
      async def chat(self, request: Request):
          """
          Generate a chat response using the requested model.
          """

          # Pass the request body JSON as keyword arguments to the client's chat function.
          # The request body follows the ollama API's chat request format for now.
          params = await request.json()
          self.logger.debug("Request data: %s", params)

          chat_response = self._client.chat(**params)

          # Return a streaming response whenever the client returns an iterator
          if isinstance(chat_response, Iterator):
              def generate_response():
                  for response in chat_response:
                      # One JSON document per line (NDJSON)
                      yield json.dumps(response) + "\n"
              return StreamingResponse(generate_response(), media_type="application/x-ndjson")
          elif chat_response is not None:
              return json.dumps(chat_response)
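
Applied to the setup in the question, points 1 and 2 could look roughly like the sketch below. It uses only the standard library so it runs on its own: `fake_llm_stream` is a hypothetical stand-in for an iterator of text chunks (LangChain LLM objects expose a `stream()` method that yields such chunks), `ndjson_stream` is an illustrative helper name, and the `/model_response_stream` route in the trailing comment is an assumption, not something from the original code.

```python
import json
from typing import Iterable, Iterator


def fake_llm_stream(prompt: str) -> Iterator[str]:
    """Hypothetical stand-in for a token iterator such as llm.stream(prompt)."""
    for token in ["Hello", ",", " world", "!"]:
        yield token


def ndjson_stream(tokens: Iterable[str]) -> Iterator[str]:
    """Wrap each token in a JSON object and end every chunk with a newline."""
    for token in tokens:
        yield json.dumps({"token": token}) + "\n"


chunks = list(ndjson_stream(fake_llm_stream("hi")))
print(chunks[0], end="")  # prints {"token": "Hello"}

# Assumed FastAPI wiring (not run here), reusing names from the question:
#
#   from fastapi.responses import StreamingResponse
#
#   @app.get("/model_response_stream")  # hypothetical route
#   def model_stream(question: str, prompt: str):
#       text = model_response_prompt().format(
#           input=question, typescript_string=prompt)
#       return StreamingResponse(ndjson_stream(chat.stream(text)),
#                                media_type="application/x-ndjson")
```

The endpoint in the comment is a plain `def` with a synchronous generator because llama.cpp generation is blocking; as far as I know, StreamingResponse iterates such generators in a worker thread, so the event loop is not held up.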
      

    You can find a detailed explanation here:

    https://kontext.tech/article/1377/fastapi-streaming-response-error-did-not-receive-done-or-success-response-in-stream
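
The client has to read the response incrementally as well. For instance, `process.communicate()` in the notebook from the question only returns after curl has exited, so it would print everything at once even if the server streams. A minimal sketch of line-by-line consumption follows; `iter_tokens` is a hypothetical helper, the `{"token": ...}` payload shape is an assumption, and an in-memory byte stream stands in for the HTTP response so the block runs without a server.

```python
import io
import json
from typing import BinaryIO, Iterator


def iter_tokens(body: BinaryIO) -> Iterator[str]:
    """Yield the "token" field of each NDJSON line as soon as the line is read."""
    for raw_line in body:  # binary file-like objects yield one line per iteration
        line = raw_line.strip()
        if line:  # skip blank lines
            yield json.loads(line)["token"]


# Stand-in for a streamed HTTP body (assumed payload shape).
fake_body = io.BytesIO(b'{"token": "Hello"}\n{"token": " world"}\n')
received = list(iter_tokens(fake_body))
print("".join(received))  # prints Hello world

# Against a real server, the object returned by urllib.request.urlopen(url)
# is file-like and could be passed to iter_tokens() directly:
#
#   import urllib.request
#   with urllib.request.urlopen(url) as resp:
#       for token in iter_tokens(resp):
#           print(token, end="", flush=True)
```

When testing with curl directly in a terminal, the `-N` (`--no-buffer`) option turns off curl's output buffering, which makes the individual chunks visible as they arrive.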