Search code examples
haskellhaskell-pipes

Forking the streaming flow in haskell-pipes


I'm having trouble directing flow though a pipeline with haskell-pipes. Basically, I analyze a bunch of files and then I have to either

  1. print results to the terminal in a human-friendly way
  2. encode results to JSON

The chosen path depends upon a command line option.
In the second case, I have to output an opening bracket, then every incoming value followed by a comma and then a closing bracket. Currently insertCommas never terminates, so the closing bracket is never outputted.

import Pipes
import Data.ByteString.Lazy as B
import Data.Aeson (encode)

insertCommas :: Consumer B.ByteString IO ()
insertCommas = do
    first <- await
    lift $ B.putStr first
    for cat $ \obj -> lift $ do
        putStr ","
        B.putStr obj

jsonExporter :: Consumer (FilePath, AnalysisResult) IO ()
jsonExporter = do
    lift $ putStr "["
    P.map encode >-> insertCommas
    lift $ putStr "]"

exportStream :: Config -> Consumer (FilePath, AnalysisResult) IO ()
exportStream conf =
    case outputMode conf of
      JSON -> jsonExporter
      _    -> P.map (export conf) >-> P.stdoutLn

main :: IO ()
main = do
    -- The first two lines are Docopt stuff, not relevant
    args <- parseArgsOrExit patterns =<< getArgs
    ins  <- allFiles $ args `getAllArgs` argument "paths"
    let conf = readConfig args
    runEffect $ each ins
             >-> P.mapM analyze
             >-> P.map (filterResults conf)
             >-> P.filter filterNulls
             >-> exportStream conf

Solution

  • I think you should 'commify' with pipes-group. It has an intercalates, but not an intersperse, but it's not a big deal to write. You should stay away from the Consumer end, I think, for this sort of problem.

    {-#LANGUAGE OverloadedStrings #-}
    import Pipes
    import qualified Pipes.Prelude as P
    import qualified Data.ByteString.Lazy.Char8 as B
    import Pipes.Group
    import Lens.Simple  -- or Control.Lens or Lens.Micro or anything with view/^.
    import System.Environment
    
    intersperse_ :: Monad m => a -> Producer a m r -> Producer a m r
    intersperse_ a producer = intercalates (yield a) (producer ^. chunksOf 1) 
    
    main = do 
      args <- getArgs
      let op prod = case args of 
            "json":_ -> yield "[" *> intersperse_ "," prod <* yield "]"
            _        -> intersperse_ " " prod
      runEffect $ op producer >-> P.mapM_ B.putStr
      putStrLn ""
      where 
        producer = mapM_ yield (B.words "this is a test")
    

    which give me this

        >>> :main json
        [this,is,a,test]
        >>> :main ---
        this is a test