Tags: python-3.x, streaming, fastapi, server-sent-events, openai-api

"AttributeError: encode" when returning StreamingResponse in FastAPI


I am using Python 3.10 and FastAPI 0.92.0 to write a Server-Sent Events (SSE) streaming API. This is what the Python code looks like:

from fastapi import APIRouter, FastAPI, Header

from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/v1/completions",response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
    auth_mode, auth_token = authorization.split(' ')
    if auth_token is None:
        return "Authorization token is missing"
    answer = chat_stream(q, auth_token)
    return StreamingResponse(answer, media_type="text/event-stream")

and this is the chat_stream function:

import openai

def chat_stream(question: str, key: str):
    openai.api_key = key
    # create a completion
    completion = openai.Completion.create(model="text-davinci-003",
                                          prompt=question,
                                          stream=True)
    return completion

When I invoke the API with this command:

curl -N -H "Authorization: Bearer sk-the openai key" https://chat.poemhub.top/v1/completions?q=hello

the server side shows the following error:

INFO:     123.146.17.54:0 - "GET /v1/completions?q=hello HTTP/1.0" 200 OK
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
  File "/usr/local/lib/python3.10/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
    return await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/fastapi/applications.py", line 276, in __call__
    await super().__call__(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/applications.py", line 122, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 184, in __call__
    raise exc
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 162, in __call__
    await self.app(scope, receive, _send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
    raise exc
  File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
    await self.app(scope, receive, sender)
  File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
    raise e
  File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 718, in __call__
    await route.handle(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 276, in handle
    await self.app(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 69, in app
    await response(scope, receive, send)
  File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 270, in __call__
    async with anyio.create_task_group() as task_group:
  File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 273, in wrap
    await func()
  File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 264, in stream_response
    chunk = chunk.encode(self.charset)
  File "/usr/local/lib/python3.10/dist-packages/openai/openai_object.py", line 61, in __getattr__
    raise AttributeError(*err.args)
AttributeError: encode

Why did this error happen? What should I do to fix it?


Solution

  • As described in FastAPI's documentation, StreamingResponse takes an asynchronous (async def) generator or a normal (def) generator/iterator and streams the response body. As explained in this answer, FastAPI will still work asynchronously in either case: if the generator passed to StreamingResponse isn't asynchronous, FastAPI/Starlette will run it in a separate thread (see the relevant implementation here) using iterate_in_threadpool(), which is then awaited (see the iterate_in_threadpool() implementation). For more details on def vs async def in FastAPI, as well as solutions for running blocking operations inside async def endpoints, please have a look at this detailed answer.

    StreamingResponse, a subclass of Response, streams the response body in bytes. Hence, if the content passed through the generator is not in bytes format, FastAPI/Starlette will attempt to encode/convert it into bytes (using the default utf-8 encoding scheme). Below is a code snippet from the relevant implementation:

    async for chunk in self.body_iterator:
        if not isinstance(chunk, bytes):
            chunk = chunk.encode(self.charset)
        await send({"type": "http.response.body", "body": chunk, "more_body": True})
    

    However, if a chunk yielded by the iterator/generator is neither bytes nor str, it has no encode() method to call, and an AttributeError is raised (e.g., AttributeError: ... has no attribute 'encode'), similar to what is described in this answer (see Option 2, Note 3). Also, if a chunk contains characters that cannot be encoded with the response charset, you might be faced with UnicodeEncodeError: ... codec can't encode character instead. In your traceback, the AttributeError: encode is raised from openai/openai_object.py while Starlette calls chunk.encode(self.charset), so it is very likely that the stream returned by openai.Completion.create(..., stream=True) yields OpenAI objects rather than str. Converting each chunk to str (or bytes) before it reaches StreamingResponse should resolve the error.
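
To see the failure in isolation, here is a minimal sketch that mirrors the conversion step quoted above, outside of FastAPI. The names to_bytes and FakeChunk are hypothetical and exist only for illustration; FakeChunk stands in for any dict-like, non-str object that a generator might yield.

```python
import json


def to_bytes(chunk, charset="utf-8"):
    """Mimic the conversion step: bytes pass through, anything else must offer .encode()."""
    if not isinstance(chunk, bytes):
        chunk = chunk.encode(charset)
    return chunk


class FakeChunk(dict):
    """Hypothetical stand-in for a dict-like chunk that is not a str."""


print(to_bytes("hello"))   # a str is encoded to bytes
print(to_bytes(b"hello"))  # bytes are passed through unchanged

chunk = FakeChunk(text="hello")
try:
    to_bytes(chunk)
except AttributeError as exc:
    # A dict (or dict subclass) has no .encode() method
    print("AttributeError:", exc)

# Serialising the object to str first avoids the error
print(to_bytes(json.dumps(chunk)))
```

The last line is the essence of the fix: turn each chunk into a str before it is handed to the response.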

    The example below demonstrates an asynchronous generator (i.e., async def gen()) streaming JSON data. In this case, dict objects first need to be converted into str (using json.dumps(), or any other JSON encoder, such as orjson, see here) and then, optionally, into the corresponding bytes value (using .encode('utf-8')); as explained earlier, if that step is omitted, FastAPI/Starlette will take care of it. In addition, the example uses the text/event-stream media type (also known as MIME type), which is usually used when sending events from the server (see the event stream format as well). If you are confident that the data sent from the server are always in JSON format, you could use the application/json media type as well. Note that, as explained in this answer, using the text/plain media type instead might result in the streaming data not being displayed in the browser immediately, as browsers tend to buffer text/plain responses for what is called "MIME Sniffing" (see the linked answer above on how to disable it, if you would like to use text/plain).

    Working Example

    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    import asyncio
    import json
    
    app = FastAPI()
    
    
    @app.get('/')
    async def main():
        async def gen():
            while True:
                #yield (json.dumps({'msg': 'Hello World!'}) + '\n\n').encode('utf-8')
                # or, simply use the below, and FastAPI/Starlette will take care of the encoding
                yield json.dumps({'msg': 'Hello World!'}) + '\n\n'
                await asyncio.sleep(0.5)
    
        return StreamingResponse(gen(), media_type='text/event-stream')
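
The example above yields bare JSON strings, which is fine for a client such as curl -N. Clients that parse the stream as Server-Sent Events (for example, a browser's EventSource) expect each message to be made of field lines such as data: ..., terminated by a blank line. A minimal sketch of such framing follows; format_sse is a hypothetical helper name, not part of FastAPI or Starlette.

```python
import json


def format_sse(data, event=None):
    """Serialise `data` as JSON and wrap it in a single SSE message."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # json.dumps() emits no raw newlines by default, so one data line is enough
    lines.append(f"data: {json.dumps(data)}")
    # A blank line terminates the message
    return "\n".join(lines) + "\n\n"


print(format_sse({"msg": "Hello World!"}), end="")
```

Inside gen(), this could be used as yield format_sse({'msg': 'Hello World!'}).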
    


    If you are confident that the data sent from the server are in JSON format, you could instead use:

    # ...
    
    @app.get('/')
    async def main():
        async def gen():
            while True:
                yield json.dumps({'msg': 'Hello World!'})
                await asyncio.sleep(0.5)
    
        return StreamingResponse(gen(), media_type='application/json')
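
Applied to the question, one option is to wrap the OpenAI stream in a generator that serialises each chunk before it reaches StreamingResponse. The sketch below is only an illustration under stated assumptions: stringify_chunks is a hypothetical helper, and it assumes that each chunk is dict-like so that json.dumps() can serialise it (please verify this for your version of the openai package). A plain list of dicts stands in for the real stream, so the snippet runs without an API key.

```python
import json
from typing import Any, Iterable, Iterator


def stringify_chunks(chunks: Iterable[Any]) -> Iterator[str]:
    """Yield one str per chunk (assumes every chunk is JSON-serialisable)."""
    for chunk in chunks:
        # Same convention as the working example: JSON followed by a blank line
        yield json.dumps(chunk) + "\n\n"


# Stand-in for the object returned by chat_stream(q, auth_token)
fake_stream = [{"choices": [{"text": "Hel"}]}, {"choices": [{"text": "lo"}]}]

for message in stringify_chunks(fake_stream):
    print(message, end="")
```

In the endpoint from the question, the return statement would then become return StreamingResponse(stringify_chunks(answer), media_type="text/event-stream"). Since stringify_chunks is a normal (def) generator, FastAPI/Starlette would iterate it in a separate thread, as described at the beginning of this answer.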