I'm using the streaming-utils streaming-utils to stream a HTTP response body. I want to track the progress similar to how bytestring-progress allows with lazy ByteString
s. I suspect something like toChunks
would be necessary, then reducing some cumulative bytes read and returning the original stream unmodified. But I cannot figure it out, and the streaming documentation is very unhelpful, mostly full of grandiose comparisons to alternative libraries.
Here's some code with my best effort so far. It doesn't include the counting yet, and just tries to print the size of chunks as they stream past (and doesn't compile).
download :: ByteString -> FilePath -> IO ()
download i file = do
req <- parseRequest . C.unpack $ i
m <- newHttpClientManager
runResourceT $ do
resp <- http req m
lift . traceIO $ "downloading " <> file
let body = SBS.fromChunks $ mapsM step $ SBS.toChunks $ responseBody resp
SBS.writeFile file body
step bs = do
traceIO $ "got " <> show (C.length bs) <> " bytes"
return bs
What we want is to traverse the Stream (Of ByteString) IO ()
in two ways:
ByteString
s and prints updates to console.We can do that with the help of the copy
function, which has type:
copy :: Monad m => Stream (Of a) m r -> Stream (Of a) (Stream (Of a) m) r
copy
takes a stream and duplicates it into two different monadic layers, where each element of the original stream is emitted by both layers of the new dissociated stream.
(Notice that we are changing the base monad, not the functor. What changing the functor to another Stream
does is to delimit groups in a single stream, and we aren't interested in that here.)
The following function takes a stream, copies it, accumulates the length of incoming strings with S.scan
, prints them, and returns another stream that you can still work with, for example writing it to a file:
{-# LANGUAGE OverloadedStrings #-}
import Streaming
import qualified Streaming.Prelude as S
import qualified Data.ByteString as B
track :: Stream (Of B.ByteString) IO r -> Stream (Of B.ByteString) IO r
track stream =
S.mapM_ (liftIO . print) -- brings us back to the base monad, here another stream
. S.scan (\s b -> s + B.length b) (0::Int) id
$ S.copy stream
This will print the ByteString
s along with the accumulated lengths:
main :: IO ()
main = S.mapM_ B.putStr . track $ S.each ["aa","bb","c"]