What I'm trying to do is use takeWhile to split a bytestring by some character.
import qualified Data.ByteString.Internal as BS (c2w, w2c)
import Pipes
import Pipes.ByteString as PB
import Pipes.GZip
import Pipes.Prelude as PP
import System.IO

newline = BS.c2w '\n'

splitter = PB.takeWhile (\myWord -> myWord /= newline)

myPipe fileHandle = PP.toListM $ decompress fileProducer >-> splitter
  where
    fileProducer = PB.fromHandle fileHandle

run = do
  dat <- withFile "somefile.blob" ReadMode myPipe
  pure dat
This gets me the first line, but what I really want is to yield each newline-delimited chunk, one at a time. How do I do that?
pipes-bytestring and pipes-group are arranged so that repeatedly breaking a Producer ByteString m r yields a FreeT (Producer ByteString m) m r. FreeT can here be read to mean A_Succession_Of, so the result can be thought of as 'a succession of bytestring-producer segments returning an r'. This way, if one of the segments is, say, 10 gigabytes long, we still have streaming rather than a 10 gigabyte strict bytestring.
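Concretely, the two directions look like this (just a sketch; the helper names are mine, and it assumes the imports of the full program further down, i.e. Pipes, Pipes.ByteString as PB, qualified Pipes.Group as PG, and view from a lens library):

-- Reshape a byte stream into a succession of line segments. Nothing is
-- accumulated here; each segment is itself a streaming Producer.
lineSegments :: Monad m => Producer ByteString m r -> FreeT (Producer ByteString m) m r
lineSegments = view PB.lines

-- Going back the other way: PG.concats splices the segments into one
-- Producer again (the newlines that delimited them stay dropped).
rejoin :: Monad m => FreeT (Producer ByteString m) m r -> Producer ByteString m r
rejoin = PG.concats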
It looks to me like you want to break the bytestring producer on newlines, but I couldn't tell whether you want to keep the newlines. If you are throwing them out, this is the same as splitting the bytestring producer with view PB.lines and then concatenating each subordinate producer into a single strict bytestring - the individual line. I wrote this below as accumLines. It is straightforward, but makes a tiny use of Lens.view to turn the fancy PB.lines lens into a regular function. (Many operations are written as lenses in pipes-bytestring because they can then be re-used for other purposes, especially the kind of producer parsing that pipes favors.)
import Pipes
import qualified Pipes.Prelude as P
import Pipes.ByteString as PB
import qualified Pipes.Group as PG
import Pipes.GZip
import qualified Data.ByteString.Internal as BS (c2w, w2c)
import System.IO
import Lens.Simple (view) -- or Control.Lens or whatever
import Data.Monoid

main = run >>= mapM_ print

myPipe fileHandle = P.toListM $ accumLines (decompress fileProducer)
  where
    fileProducer = PB.fromHandle fileHandle

run = do
  dat <- withFile "a.gz" ReadMode myPipe
  pure dat

-- little library additions

accumLines :: Monad m => Producer ByteString m r -> Producer ByteString m r
accumLines = mconcats . view PB.lines

accumSplits :: Monad m => Char -> Producer ByteString m r -> Producer ByteString m r
accumSplits c = mconcats . view (PB.splits (BS.c2w c))

-- this is convenient, but the operations above could
-- be more rationally implemented using e.g. BL.fromChunks and toListM

mconcats :: (Monad m, Monoid b) => FreeT (Producer b m) m r -> Producer b m r
mconcats = PG.folds (<>) mempty id
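accumSplits works the same way on any single-byte delimiter. A hypothetical usage sketch (the file name and the helper name are made up), splitting an uncompressed file on commas:

readFields :: IO [ByteString]
readFields = withFile "fields.txt" ReadMode $ \h ->
  P.toListM (accumSplits ',' (PB.fromHandle h))

Like run above, this pulls the whole list of fields into memory at once; it is only meant to show the shape of a call.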
Ideally you would not write a new bytestring at each line break. Whether you have to depends on what you plan to do with the lines.
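For instance, if all you need is a statistic per line, you can fold each segment chunk by chunk and never materialize the line at all. A sketch (the name lineLengths is mine; it reuses the imports from the program above plus qualified Data.ByteString as B):

-- Stream one Int per line: the line's length in bytes, accumulated with
-- PG.folds over the line's chunks, so no per-line ByteString is ever built.
lineLengths :: Monad m => Producer ByteString m r -> Producer Int m r
lineLengths = PG.folds (\n chunk -> n + B.length chunk) 0 id . view PB.lines

Then something like P.toListM (lineLengths (decompress (PB.fromHandle h))) gives you the length of every line of the compressed file while staying fully streaming.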