I am using Python 3.10 and FastAPI 0.92.0 to write a Server-Sent Events (SSE) streaming API. This is what the Python code looks like:
from fastapi import APIRouter, FastAPI, Header
from src.chat.completions import chat_stream
from fastapi.responses import StreamingResponse

router = APIRouter()

@router.get("/v1/completions", response_class=StreamingResponse)
def stream_chat(q: str, authorization: str = Header(None)):
    auth_mode, auth_token = authorization.split(' ')
    if auth_token is None:
        return "Authorization token is missing"
    answer = chat_stream(q, auth_token)
    return StreamingResponse(answer, media_type="text/event-stream")
And this is the chat_stream function:
import openai

def chat_stream(question: str, key: str):
    openai.api_key = key
    # create a completion
    completion = openai.Completion.create(model="text-davinci-003",
                                          prompt=question,
                                          stream=True)
    return completion
When I invoke the API with this command:
curl -N -H "Authorization: Bearer sk-the openai key" https://chat.poemhub.top/v1/completions?q=hello
the server side shows the following error:
INFO: 123.146.17.54:0 - "GET /v1/completions?q=hello HTTP/1.0" 200 OK
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/usr/local/lib/python3.10/dist-packages/uvicorn/protocols/http/h11_impl.py", line 429, in run_asgi
result = await app( # type: ignore[func-returns-value]
File "/usr/local/lib/python3.10/dist-packages/uvicorn/middleware/proxy_headers.py", line 78, in __call__
return await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/fastapi/applications.py", line 276, in __call__
await super().__call__(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/applications.py", line 122, in __call__
await self.middleware_stack(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 184, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/errors.py", line 162, in __call__
await self.app(scope, receive, _send)
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 79, in __call__
raise exc
File "/usr/local/lib/python3.10/dist-packages/starlette/middleware/exceptions.py", line 68, in __call__
await self.app(scope, receive, sender)
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 21, in __call__
raise e
File "/usr/local/lib/python3.10/dist-packages/fastapi/middleware/asyncexitstack.py", line 18, in __call__
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 718, in __call__
await route.handle(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 276, in handle
await self.app(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/routing.py", line 69, in app
await response(scope, receive, send)
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 270, in __call__
async with anyio.create_task_group() as task_group:
File "/usr/local/lib/python3.10/dist-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
raise exceptions[0]
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 273, in wrap
await func()
File "/usr/local/lib/python3.10/dist-packages/starlette/responses.py", line 264, in stream_response
chunk = chunk.encode(self.charset)
File "/usr/local/lib/python3.10/dist-packages/openai/openai_object.py", line 61, in __getattr__
raise AttributeError(*err.args)
AttributeError: encode
Why did this error happen? What should I do to fix it?
As described in FastAPI's documentation, StreamingResponse takes an asynchronous (async def) generator or a normal (def) generator/iterator and streams the response body. As explained in this answer, in either case, FastAPI will still work asynchronously: if the generator that is passed to StreamingResponse isn't asynchronous, FastAPI/Starlette will run the generator in a separate thread (see the relevant implementation here), using iterate_in_threadpool(), which will then be awaited (see iterate_in_threadpool() implementation). For more details on def vs async def in FastAPI, as well as solutions for running blocking operations inside async def endpoints, please have a look at this detailed answer.
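To make the threadpool idea above more concrete, here is a minimal, stdlib-only sketch of how a blocking (def) generator can be consumed from async code without blocking the event loop. The helper name iterate_in_thread is hypothetical and this is not Starlette's actual code (its iterate_in_threadpool() is built on anyio and differs in detail); it only illustrates the general technique.

```python
import asyncio
import time
from typing import AsyncIterator, Iterator, List, TypeVar

T = TypeVar("T")

_DONE = object()  # sentinel marking the end of the sync iterator


async def iterate_in_thread(iterator: Iterator[T]) -> AsyncIterator[T]:
    """Hypothetical helper: pull each item from a blocking iterator in a worker thread."""
    while True:
        # next(iterator, _DONE) runs in a worker thread, so a slow generator
        # does not block the event loop. The sentinel avoids raising
        # StopIteration across the thread boundary.
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            break
        yield item


def blocking_gen() -> Iterator[str]:
    """A normal (def) generator that blocks between chunks."""
    for i in range(3):
        time.sleep(0.01)  # stands in for blocking I/O
        yield f"chunk {i}\n"


async def collect() -> List[str]:
    # Consume the blocking generator with `async for`, as an ASGI server would.
    return [chunk async for chunk in iterate_in_thread(blocking_gen())]


chunks = asyncio.run(collect())
```

Note that asyncio.to_thread() requires Python 3.9 or later, which is satisfied by the Python 3.10 setup mentioned in the question.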
StreamingResponse, a subclass of Response, streams the response body in bytes. Hence, if the content passed through the generator is not in bytes format, FastAPI/Starlette will attempt to encode/convert it into bytes (using the default utf-8 encoding scheme). Below is a code snippet from the relevant implementation:
async for chunk in self.body_iterator:
    if not isinstance(chunk, bytes):
        chunk = chunk.encode(self.charset)
    await send({"type": "http.response.body", "body": chunk, "more_body": True})
However, if a chunk in the iterator/generator is not a str that can be encoded, an AttributeError will be raised (e.g., AttributeError: ... has no attribute 'encode'), similar to what is described in this answer (see Option 2, Note 3). Also, if a chunk includes characters that cannot be encoded in utf-8, you might instead face UnicodeEncodeError: ... codec can't encode character. Thus, given the AttributeError: encode in the error traceback you provided, which is raised from openai/openai_object.py, it is very likely that you are passing an object other than str: the chunks yielded by the completion appear to be OpenAI objects rather than strings.
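One possible way to address this is to wrap the generator so that every chunk is converted into a str before it reaches StreamingResponse. The sketch below is only an illustration: to_text_chunks is a hypothetical helper, the fake_stream data merely stands in for the real chunks, and it assumes the objects yielded by the completion are dict-like and JSON-serializable.

```python
import json
from typing import Any, Iterable, Iterator, Union


def to_text_chunks(chunks: Iterable[Any]) -> Iterator[Union[str, bytes]]:
    """Hypothetical helper: make every chunk something StreamingResponse can send."""
    for chunk in chunks:
        if isinstance(chunk, (str, bytes)):
            # Already sendable: bytes are sent as-is, str has .encode()
            yield chunk
        else:
            # Assumption: the chunk is dict-like and JSON-serializable
            yield json.dumps(chunk) + "\n\n"


# Stand-in data; the real chunks would come from the streaming completion.
fake_stream = [{"choices": [{"text": "Hel"}]}, {"choices": [{"text": "lo"}]}]

# A plain dict has no .encode(), which is the method StreamingResponse calls
# on every chunk that is not already bytes.
has_encode = hasattr(fake_stream[0], "encode")

safe_chunks = list(to_text_chunks(fake_stream))
```

In the endpoint from the question, this would presumably be used as StreamingResponse(to_text_chunks(chat_stream(q, auth_token)), media_type="text/event-stream"), provided the assumption about the chunk type holds.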
The example below demonstrates an asynchronous generator (i.e., async def gen()), streaming JSON data, which, in this case, requires dict objects to be converted first into str (using json.dumps(), or any other JSON encoder, such as orjson, see here) and then, optionally, into the corresponding byte value (using .encode('utf-8')); if that step is omitted, FastAPI/Starlette will take care of it, as explained earlier. In addition, the example below uses the text/event-stream media type (also known as MIME type), which is usually used when sending events from the server (see event stream format as well). If you are confident that the data sent from the server are always in JSON format, you could use the application/json media type as well. Note that, as explained in this answer, using the text/plain media type instead might result in the streaming data not being displayed in the browser immediately, as browsers tend to buffer text/plain responses for what is called "MIME Sniffing" (see the linked answer above on how to disable it, if you would like to use text/plain).
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

app = FastAPI()

@app.get('/')
async def main():
    async def gen():
        while True:
            #yield (json.dumps({'msg': 'Hello World!'}) + '\n\n').encode('utf-8')
            # or, simply use the below, and FastAPI/Starlette will take care of the encoding
            yield json.dumps({'msg': 'Hello World!'}) + '\n\n'
            await asyncio.sleep(0.5)

    return StreamingResponse(gen(), media_type='text/event-stream')
If you are confident that the data sent from the server are in JSON format, you could instead use:
# ...
@app.get('/')
async def main():
    async def gen():
        while True:
            yield json.dumps({'msg': 'Hello World!'})
            await asyncio.sleep(0.5)

    return StreamingResponse(gen(), media_type='application/json')
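If the text/event-stream variant is meant to be consumed by the browser's EventSource API, each message is expected to be framed as a data: field followed by a blank line, as laid out in the event stream format. A minimal sketch of such framing is given below; sse_event is a hypothetical helper, not part of FastAPI or Starlette, and it assumes the payload is JSON-serializable.

```python
import json
from typing import Any


def sse_event(payload: Any) -> str:
    """Hypothetical helper: frame one JSON-serializable payload as an SSE message."""
    # With default settings, json.dumps() escapes newlines inside strings,
    # so the serialized payload always fits on a single "data:" line.
    data = json.dumps(payload)
    # A blank line (the second "\n") terminates the message.
    return f"data: {data}\n\n"


message = sse_event({'msg': 'Hello World!'})
```

Inside the generator, one could then yield sse_event({'msg': 'Hello World!'}) instead of concatenating '\n\n' by hand.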