
How to track progress through a streaming ByteString?


I'm using streaming-utils to stream an HTTP response body. I want to track the progress, similar to what bytestring-progress allows with lazy ByteStrings. I suspect something like toChunks would be necessary, then summing the bytes read so far and returning the original stream unmodified. But I cannot figure it out, and the streaming documentation is very unhelpful, mostly full of grandiose comparisons to alternative libraries.

Here's my best effort so far. It doesn't do the counting yet; it only tries to print the size of each chunk as it streams past, and it doesn't compile.

download :: ByteString -> FilePath -> IO ()
download i file = do
  req <- parseRequest . C.unpack $ i
  m <- newHttpClientManager
  runResourceT $ do
    resp <- http req m
    lift . traceIO $ "downloading " <> file
    let body = SBS.fromChunks $ mapsM step $ SBS.toChunks $ responseBody resp
    SBS.writeFile file body

step bs = do
  traceIO $ "got " <> show (C.length bs) <> " bytes"
  return bs
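A likely reason the attempt above does not compile: mapsM (from Streaming) transforms the functor layer, so it expects a function of type `forall x. Of ByteString x -> m (g x)`, not `ByteString -> IO ByteString`; in addition, step runs in IO while the stream's base monad is ResourceT IO. For the narrower goal of printing each chunk's size, Streaming.Prelude's S.chain runs an effect per element and passes the element through unchanged. The sketch below uses a hypothetical helper name, logChunkSizes, and an in-memory stream instead of an HTTP body; liftIO lets it work over any MonadIO base monad such as ResourceT IO.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BC
import Streaming
import qualified Streaming.Prelude as S

-- Hypothetical helper: print each chunk's size and pass the chunk on unchanged.
-- S.chain runs the effect for every element; liftIO lifts the IO action into
-- whatever MonadIO base monad the stream uses (e.g. ResourceT IO).
logChunkSizes :: MonadIO m => Stream (Of B.ByteString) m r -> Stream (Of B.ByteString) m r
logChunkSizes = S.chain (\bs -> liftIO (putStrLn ("got " ++ show (B.length bs) ++ " bytes")))

main :: IO ()
main = S.mapM_ BC.putStrLn (logChunkSizes (S.each (map BC.pack ["aa", "bb", "c"])))
```

This handles per-chunk logging only; a running total needs state carried across chunks, which is what the answer below addresses.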

Solution

  What we want is to traverse the Stream (Of ByteString) IO () in two ways:

    • One that accumulates the incoming lengths of the ByteStrings and prints updates to the console.
    • One that writes the stream to a file.

    We can do that with the help of the copy function, which has type:

    copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
    

    copy takes a stream and duplicates it across two monadic layers: each element of the original stream is emitted both by the outer stream and by the inner stream that now serves as its base monad.

    (Notice that we are changing the base monad, not the functor. Changing the functor to another Stream, as in Stream (Stream (Of a) m) m r, delimits groups within a single stream, which is not what we want here.)
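    To see that both layers really receive every element, a small check (not part of the original answer) can drain the two layers one after the other with S.toList over an Identity base monad. The name bothLayers is illustrative only.

```haskell
import Data.Functor.Identity (runIdentity)
import Streaming
import qualified Streaming.Prelude as S

-- The outer S.toList drains the outer layer of the copied stream; its result
-- is returned inside the inner layer, which the second S.toList then drains.
-- The final value pairs the base layer's list with the outer layer's list.
bothLayers :: Of [Int] (Of [Int] ())
bothLayers = runIdentity (S.toList (S.toList (S.copy (S.each [1, 2, 3]))))

main :: IO ()
main = print bothLayers
```

    Both lists contain every element, which is what lets one layer be counted while the other is kept intact.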

    The following function takes a stream, copies it, accumulates the lengths of the incoming chunks with S.scan, prints the running totals, and returns a stream of the original chunks that you can still work with, for example by writing it to a file:

    {-# LANGUAGE OverloadedStrings #-}
    import Streaming
    import qualified Streaming.Prelude as S
    import qualified Data.ByteString as B
    
    track :: Stream (Of B.ByteString) IO r -> Stream (Of B.ByteString) IO r
    track stream =
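          S.mapM_ (liftIO . print) -- consumes the totals; brings us back to the base monad, here another stream
        . S.scan (\s b -> s + B.length b) (0::Int) id -- running byte total; scan emits its initial 0 first
        $ S.copy stream -- every chunk goes to both the outer layer (counted) and the inner layer (kept)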
    

    Running track on a small test stream prints each ByteString interleaved with the running totals, beginning with the scan's initial 0:

    main :: IO ()
    main = S.mapM_ B.putStr . track $ S.each ["aa","bb","c"]
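    Applied to the question's setting, the same pattern sits between toChunks and fromChunks. The sketch below makes several assumptions: it uses streaming-bytestring 0.2 or later, where the module is Streaming.ByteString (older releases, which the question's SBS alias likely refers to, call it Data.ByteString.Streaming); it copies a local file instead of an HTTP body to stay self-contained; and trackProgress and copyWithProgress are hypothetical names. trackProgress generalizes track to any MonadIO base monad, since readFile and writeFile run in ResourceT IO.

```haskell
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Resource (runResourceT)
import qualified Data.ByteString as B
import Streaming
import qualified Streaming.ByteString as Q
import qualified Streaming.Prelude as S
import System.Environment (getArgs)

-- Hypothetical variant of track: works over any MonadIO base monad,
-- drops scan's initial 0, and prints a readable progress line per chunk.
trackProgress :: MonadIO m => Stream (Of B.ByteString) m r -> Stream (Of B.ByteString) m r
trackProgress =
    S.mapM_ (\n -> liftIO (putStrLn (show n ++ " bytes so far")))
  . S.drop 1
  . S.scan (\total chunk -> total + B.length chunk) (0 :: Int) id
  . S.copy

-- Hypothetical helper: copy src to dst, reporting progress as chunks pass.
copyWithProgress :: FilePath -> FilePath -> IO ()
copyWithProgress src dst =
  runResourceT . Q.writeFile dst . Q.fromChunks . trackProgress . Q.toChunks $ Q.readFile src

main :: IO ()
main = do
  [src, dst] <- getArgs
  copyWithProgress src dst
```

    In the question's download function, the body line would then presumably become `SBS.fromChunks . trackProgress . SBS.toChunks $ responseBody resp`. Keeping the initial 0 (removing S.drop 1) is a reasonable choice if you want a "starting" line before the first chunk arrives.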