Search code examples
haskellbytestringhaskell-waihaskell-warphaskell-streaming

Streaming bytestring as WAI HTTP server response body


I have a value body :: BS.ByteString (ResourceT IO) (), from a function based on BS.readFile. I want to stream that value as the response body from a Wai Application. There's a helper, streamingResponse that takes a value of the type Stream (Of ByteString) IO r. I'm able to convert my BS.ByteString (ResourceT IO) () to Stream (Of ByteString) (ResourceT IO) () through the use of BS.toChunks, but it contains an extra ResourceT monad layer. Passing the body to streamingResponse gives me:

Couldn't match type ‘ResourceT IO’ with ‘IO’
  Expected type: Stream (Of ByteString) IO ()
    Actual type: Stream (Of ByteString) (ResourceT IO) ()

I've tried various things like wrapping things in runResourceT, binding and hoisting values etc. but really have no idea how to proceed. Here's the line in the full project if extra context is required.

Update0

hoist runResourceT body seems to type check. Someone also referred me to a Haskell Pipes thread, which may be a very related problem, and possible hint toward a solution.


Solution

  • If we want to allow Streams that live in ResourceT, we can do without the functions from streaming-wai (that only work for Streams based on IO) and instead build on top of functions like responseStream from network-wai:

    import           Control.Monad.Trans.Resource
    import           Network.Wai                     
    import           Streaming                   
    import qualified Streaming.Prelude               as S
    import           Data.ByteString.Builder (byteString, Builder)
    
    streamingResponseR :: Stream (Of ByteString) (ResourceT IO) r
                       -> Status
                       -> ResponseHeaders
                       -> Response
    streamingResponseR stream status headers =
        responseStream status headers streamingBody
        where
        streamingBody writeBuilder flush =
            let writer a =
                    do liftIO (writeBuilder (byteString a))
                        -- flushes for every produced bytestring, perhaps not optimal
                       liftIO flush
             in runResourceT $ void $ S.effects $ S.for stream writer
    

    streamingBody has type StreamingBody, which is actually a type synonym for a function (Builder -> IO ()) -> IO () -> IO () that takes a write callback and a flush callback as parameters, and uses them to write the response using some source of data that is in scope. (Note that these callbacks are provided by WAI, not by the user.)

    In our case, the source of data is a Stream that lives in ResourceT. We need to lift the write and flush callbacks (that live in IO) using liftIO, an also remember to invoke runResourceT to return a plain IO action at the end.


    What if we wanted to flush the response only after the accumulated length of the emitted bytestrings reached some limit?

    We would need a function (not implemented here) to create a division each time the limit is reached:

    breaks' :: Monad m 
            => Int 
            -> Stream (Of ByteString) m r 
            -> Stream (Stream (Of ByteString) m) m r
    breaks' breakSize = undefined
    

    And then we could intercalate the flushing action between each group using intercalates, before writing the stream:

    streamingBodyFrom :: Stream (Of ByteString) (ResourceT IO) () 
                      -> Int 
                      -> StreamingBody
    streamingBodyFrom stream breakSize writeBuilder flush =
        let writer a = liftIO (writeBuilder (byteString a))
            flusher = liftIO flush
            broken = breaks' breakSize stream
         in runResourceT . S.mapM_ writer . S.intercalates flusher $ broken