Tags: python, asynchronous, async-await, python-asyncio, fastapi

How to read async stream from non-async method?


I use FastAPI to build my app. One of its features is uploading files with a PUT request. FastAPI supports file uploads via POST requests (multipart form), but I need a PUT request where the file is simply the body of the request.

I have found that I can use a Request object, which has a stream() method for accessing the request body.

However, I have run into a problem with async code. The rest of my code is not async; it was originally a WSGI application.

@f_app.put("/share/{share_name}/file/upload")
def upload_file(request: Request, share_name:str):
    path = request.headers.get("x-file-path","")

    request.state.api_app.router.input.with_stream(request.stream())
    
    return UploadAPIEndpoint(request.state.api_app).action("put",
                            share_name = share_name, path = path)

This code fails with the error:

'async_generator' object has no attribute 'read'

Previously this code ran under WSGI, where the call was:

request.state.api_app.router.input.with_stream(environment['wsgi.input'])

Internally, that method calls read() on the object passed to with_stream().

In FastAPI, request.stream() is "async": it returns an async generator, not a file-like object with a read() method.
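To illustrate where the error comes from, here is a minimal sketch that does not need FastAPI at all. body_chunks is a hypothetical stand-in for request.stream(), and collect is a hypothetical helper; the point is only that an async generator can be consumed with "async for" but has no read() method.

```python
import asyncio


async def body_chunks():
    # Hypothetical stand-in for request.stream(): yields the body piece by piece.
    yield b"first"
    yield b"second"


async def collect(chunks):
    # The only way to consume an async generator is from async code.
    return b"".join([chunk async for chunk in chunks])


if __name__ == "__main__":
    stream = body_chunks()
    print(type(stream).__name__)    # async_generator
    print(hasattr(stream, "read"))  # False
    print(asyncio.run(collect(stream)))
```

Any sync code that calls read() on such an object will therefore fail with the AttributeError shown above.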

How can I solve this without making my code async in many places?

Maybe it is possible to have an extra class that works as a wrapper over the async stream? Or maybe some trick like "channels" or "queues" could be used to run two coroutines in parallel: one would read from the async stream and put the data on the queue/channel, and my main code would read from that shared queue/channel?

Do you have any examples where this has been solved?


Solution

  • I didn't find a general solution for this. However, I found a way to solve this particular problem with the FastAPI async stream and my existing sync code.

    In the end, the code looks like this:

    @f_app.put("/share/{share_name}/file/upload")
    async def upload_file(request: Request, share_name:str):
        path = request.headers.get("x-file-path","")
    
        upload_obj = UploadAPIEndpoint(request.state.api_app)    
    
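        # Get a "stream receiver" object that accepts chunks pushed into it
        data_receiver = upload_obj.get_stream_receiver(share_name, path)
    
        # Read the body in the async handler and hand each chunk to the sync code
        async for chunk in request.stream():
            data_receiver.write_to_stream(chunk)
        
        # Finalize the upload once the whole body has been received
        return upload_obj.finish_upload()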
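The wrapper idea raised in the question can also be sketched directly. This is not the approach used in the answer above, only a possible alternative under stated assumptions: the sync code is allowed to run in a worker thread (asyncio.to_thread, Python 3.9+), and each read() pulls the next chunk from the event loop with asyncio.run_coroutine_threadsafe. SyncStreamReader, fake_body and legacy_consumer are hypothetical names invented for this example.

```python
import asyncio
from typing import AsyncIterator, Optional


class SyncStreamReader:
    """Hypothetical blocking, file-like view of an async byte iterator.

    Call read() only from a worker thread, never from the event loop thread.
    """

    def __init__(self, chunks: AsyncIterator[bytes],
                 loop: asyncio.AbstractEventLoop) -> None:
        self._chunks = chunks
        self._loop = loop
        self._buffer = b""
        self._eof = False

    async def _next_or_none(self) -> Optional[bytes]:
        # Runs on the event loop; turns end of iteration into None.
        try:
            return await self._chunks.__anext__()
        except StopAsyncIteration:
            return None

    def read(self, size: int = -1) -> bytes:
        # Fetch chunks until enough bytes are buffered or the stream ends.
        while not self._eof and (size < 0 or len(self._buffer) < size):
            future = asyncio.run_coroutine_threadsafe(
                self._next_or_none(), self._loop)
            chunk = future.result()  # blocks this worker thread only
            if chunk is None:
                self._eof = True
            else:
                self._buffer += chunk
        if size < 0:
            data, self._buffer = self._buffer, b""
        else:
            data, self._buffer = self._buffer[:size], self._buffer[size:]
        return data


async def fake_body():
    # Stand-in for request.stream() so the sketch runs without FastAPI.
    for part in (b"hello ", b"async ", b"world"):
        yield part


def legacy_consumer(stream) -> bytes:
    # Stand-in for existing sync code that only knows stream.read().
    out = b""
    while True:
        block = stream.read(4)
        if not block:
            return out
        out += block


async def main() -> bytes:
    reader = SyncStreamReader(fake_body(), asyncio.get_running_loop())
    # The sync code runs in a worker thread so the event loop stays free.
    return await asyncio.to_thread(legacy_consumer, reader)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In a FastAPI handler the same pattern would presumably mean declaring the handler with async def, passing request.stream() and asyncio.get_running_loop() to the reader, and awaiting asyncio.to_thread(...) around the existing sync call. Calling read() on the event loop thread itself would deadlock, because the loop could never run the coroutine that read() is waiting for.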