Tags: haskell, streaming, haskell-pipes

haskell pipes - how to repeatedly perform a takeWhile operation on a bytestring pipe?


What I'm trying to do is use takeWhile to split a bytestring by some character.

import qualified Data.ByteString.Internal as BS (c2w, w2c)
import Pipes
import Pipes.ByteString as PB
import Pipes.GZip
import Pipes.Prelude as PP
import System.IO

newline = BS.c2w '\n'

splitter = PB.takeWhile (\myWord -> myWord /= newline)

myPipe fileHandle = PP.toListM $ decompress fileProducer >-> splitter
  where
    fileProducer = PB.fromHandle fileHandle       

run = do
  dat <- withFile "somefile.blob" ReadMode myPipe
  pure dat

This gets me the first line, but what I really want is to yield each chunk up to the next newline character, one line at a time. How do I do that?


Solution

  • pipes-bytestring and pipes-group are arranged so that repeatedly breaking a Producer ByteString m r yields a FreeT (Producer ByteString m) m r. Here FreeT can be read as A_Succession_Of, so the result can be thought of as 'a succession of bytestring-producer segments returning an r'. This way, if one of the segments is, say, 10 gigabytes long, we still have streaming rather than a 10-gigabyte strict bytestring.
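
To make the 'succession of segments' idea concrete, here is a minimal sketch, assuming the same packages as the answer's code (pipes, pipes-bytestring, pipes-group, lens-simple). The helper name lineLengths is hypothetical. It reduces each line segment to a byte count with PG.folds, so only a running Int is kept per line, never the line itself; the input chunks deliberately straddle the newlines.

```haskell
-- Sketch only: lineLengths is a hypothetical helper, not a library function.
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BS8
import Lens.Simple (view)

-- view PB.lines turns the producer into a FreeT succession of per-line
-- producers; PG.folds then reduces each line to the sum of its chunk lengths,
-- streaming through the chunks instead of building the line in memory.
lineLengths :: Monad m => Producer B.ByteString m r -> Producer Int m r
lineLengths = PG.folds (\n chunk -> n + B.length chunk) 0 id . view PB.lines

main :: IO ()
main = do
  -- Chunk boundaries deliberately do not line up with the newlines.
  lengths <- P.toListM (lineLengths (each (map BS8.pack ["ab\nc", "de\n", "fgh"])))
  print lengths
```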

    It looks to me like you want to break the bytestring producer on newlines, but I couldn't tell whether you wanted to keep the newlines. If you are throwing them out, this is the same as splitting the bytestring producer with view PB.lines and then concatenating each subordinate producer into a single strict bytestring: the individual line. I wrote this below as accumLines. It is straightforward, but makes a small use of lens's view to turn the fancy PB.lines lens into a regular function. (Many operations in pipes-bytestring are written as lenses because they can then be reused for other purposes, especially the kind of producer parsing that pipes favors.)

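    import Pipes
    import qualified Pipes.Prelude as P
    import qualified Pipes.ByteString as PB
    import qualified Pipes.Group as PG
    import Pipes.GZip                          -- from pipes-zlib
    
    import Control.Monad.Trans.Free (FreeT)    -- the FreeT that pipes-group uses
    import Data.ByteString (ByteString)
    import qualified Data.ByteString.Internal as BS (c2w)
    
    import System.IO
    import Lens.Simple (view) -- or Control.Lens or whatever
    import Data.Monoid ((<>))
    
    main :: IO ()
    main = run >>= mapM_ print
    
    -- Decompress the file and collect its lines (newlines dropped) as strict ByteStrings.
    myPipe :: Handle -> IO [ByteString]
    myPipe fileHandle = P.toListM $ accumLines (decompress fileProducer)
      where
        fileProducer = PB.fromHandle fileHandle
    
    run :: IO [ByteString]
    run = withFile "a.gz" ReadMode myPipe
    
    -- little library additions
    
    -- Split on newlines, then concatenate each line's chunks into one strict ByteString.
    accumLines :: Monad m => Producer ByteString m r -> Producer ByteString m r
    accumLines = mconcats . view PB.lines
    
    -- The same, but splitting on an arbitrary separator character.
    accumSplits :: Monad m => Char -> Producer ByteString m r -> Producer ByteString m r
    accumSplits c = mconcats . view (PB.splits (BS.c2w c))
    
    -- This is convenient, but the operations above could be implemented more
    -- sensibly using e.g. Data.ByteString.Lazy.fromChunks and toListM, which
    -- avoids copying the accumulated line on every (<>).
    mconcats :: (Monad m, Monoid b) => FreeT (Producer b m) m r -> Producer b m r
    mconcats = PG.folds (<>) mempty id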
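
If you do want to keep the newlines, one option is a variant of accumLines that puts a newline back on the end of each line segment with PG.maps before concatenating. This is a sketch under the same package assumptions; keepLines is a hypothetical name. Note that it also appends a newline to a final line that had none in the input.

```haskell
-- Sketch only: keepLines is a hypothetical helper, not a library function.
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BS8
import Lens.Simple (view)

-- Split on newlines, append "\n" to the end of every line producer
-- (including a final line that had no newline), then fold each line's
-- chunks into one strict ByteString.
keepLines :: Monad m => Producer B.ByteString m r -> Producer B.ByteString m r
keepLines = PG.folds B.append B.empty id . PG.maps (<* yield newline) . view PB.lines
  where
    newline = BS8.pack "\n"

main :: IO ()
main = do
  ls <- P.toListM (keepLines (each (map BS8.pack ["ab\nc", "de\n", "fgh"])))
  mapM_ print ls
```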
    

    Ideally you would not build a new strict bytestring at each line break; whether you have to depends on what you are going to do with the lines.
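
For example, if each line is only going to be transformed and written out, you can work on the line producers directly and never build a strict line at all. A minimal sketch, assuming the same packages as above; prefixLines and the "> " prefix are purely illustrative. PG.maps rewrites each line segment and PG.concats flattens the succession back into a single byte stream.

```haskell
-- Sketch only: prefixLines is a hypothetical helper, not a library function.
import Pipes
import qualified Pipes.Prelude as P
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as BS8
import Lens.Simple (view)

-- Wrap every line producer in a "> " prefix and a trailing newline, then glue
-- the segments back together. Each line is passed through chunk by chunk,
-- so memory use does not grow with line length.
prefixLines :: Monad m => Producer B.ByteString m r -> Producer B.ByteString m r
prefixLines = PG.concats . PG.maps decorate . view PB.lines
  where
    decorate line = yield (BS8.pack "> ") >> line <* yield (BS8.pack "\n")

main :: IO ()
main =
  -- Stream the result straight to stdout instead of collecting it.
  runEffect (prefixLines (each (map BS8.pack ["ab\nc", "de\n", "fgh"])) >-> PB.stdout)
```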