haskell enumerator loops

Filtering / branching enumeratee


I am using enumerator-0.4.10, and I need to distribute processing of different parts of the incoming stream to different iteratees (I am parsing a huge XML file, and different sub-trees have different processing logic). Only a single iteratee will be active at a time since the sub-trees don't intersect.

I wrote a simple example that filters the stream and passes the result to one iteratee; please see below. However, with multiple nested iteratees it seems to me that I can no longer use an enumeratee. Do I need to write my own multi-enumeratee that holds multiple inner iteratees? Any better ideas?

Here is my (beginner's) code for a single nested iteratee:

module Main ( main ) where

import qualified Data.Enumerator as E ( Enumeratee, Step(..), Stream(..),
  checkDone, checkDoneEx, continue, enumList, joinI, run_, yield )
import Data.Enumerator ( ($$), (>>==) )
import qualified Data.Enumerator.List as EL ( consume )

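-- cribbed from EL.concatMap; additionally threads a state value 's' through
-- the stream: 'f' returns the new state and the items to emit downstream
concatMapAccum :: Monad m => (s -> ao -> (s, [ai])) -> s ->
  E.Enumeratee ao ai m b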
concatMapAccum f s0 = E.checkDone (E.continue . step s0)
  where
    step _ k E.EOF = E.yield (E.Continue k) E.EOF
    step s k (E.Chunks xs) = loop s k xs
    loop s k [] = E.continue (step s k)
    loop s k (x:xs) = case f s x of
      (s', ais) -> k (E.Chunks ais) >>==
        E.checkDoneEx (E.Chunks xs) (\k' -> loop s' k' xs)

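-- pass elements from one matching 'from' up to and including the next one
-- matching 'to'; 'pass0' says whether the stream starts inside such a range
passFromTo :: Monad m => (a -> Bool, a -> Bool) -> Bool -> E.Enumeratee a a m b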
passFromTo (from, to) pass0 =
  concatMapAccum updatePass pass0
    where
      updatePass pass el = case (pass, from el, to el) of
        (True, _, to_el) -> (not to_el, [el])
        (False, True, _) -> (True, [el])
        (False, False, _) -> (False, [])

main :: IO ()
main = do
  E.run_ (E.enumList 3 [1..20] $$
    E.joinI $ passFromTo ((\e -> e == 3 || e == 13), (\e -> e == 7 || e == 17)) False $$
    EL.consume) >>= print

$ ./dist/build/StatefulEnumeratee/StatefulEnumeratee
[3,4,5,6,7,13,14,15,16,17]

Solution

  • Yes, you need an enumeratee that passes the stream to multiple iteratees, like Data.Iteratee.sequence_ and Data.Iteratee.Parallel.psequence_ from iteratee-0.8.6 (the iteratee package, a separate library from enumerator). sequence_ takes a list of iteratees to run simultaneously and handles each input chunk by running mapM across that list. psequence_ takes similar arguments but runs each input iteratee in a separate forkIO thread.

    There has been some discussion of these on haskell-cafe and the iteratee mailing lists over the past year (see, e.g., http://www.haskell.org/pipermail/haskell-cafe/2011-January/088319.html ). The main thing to be careful about is error handling for the inner iteratees: in your application, if one inner iteratee fails, do you want to terminate all iteratees or just that one, and how do you want to propagate those errors?
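
    For enumerator-0.4.x, the same idea could be sketched roughly as follows. This is only an illustration under assumptions, not code from either library: broadcast is a hypothetical name, the fail-fast error policy (the first inner error aborts everything) is just one of the choices mentioned above, and lift is assumed to come from the transformers package that enumerator already depends on. Each inner iteratee sees every chunk, so per-sub-tree selection is left to a filtering enumeratee (such as your passFromTo) wrapped around each one with joinI; EL.filter stands in for it here to keep the example self-contained.

```haskell
module Main (main) where

import qualified Control.Exception as Exc
import Control.Monad.Trans.Class (lift)
import qualified Data.Enumerator as E
import Data.Enumerator (($$))
import qualified Data.Enumerator.List as EL

-- Hypothetical helper (not part of enumerator or iteratee): feed every
-- incoming chunk to each inner iteratee and collect their results.
broadcast :: Monad m => [E.Iteratee a m b] -> E.Iteratee a m [b]
broadcast iters = lift (mapM E.runIteratee iters) >>= loop
  where
    -- Ask for more input unless every inner iteratee is already done.
    loop steps = settle steps (E.Chunks []) (E.continue (step steps))

    -- Hand one chunk to all inner iteratees that still want input.
    step steps chunk = do
      steps' <- lift (mapM (feed chunk) steps)
      case chunk of
        E.EOF      -> settle steps' E.EOF (E.throwError stuck)
        E.Chunks _ -> loop steps'

    feed chunk (E.Continue k) = E.runIteratee (k chunk)
    feed _     done           = return done

    -- Assumed policy: fail fast on the first inner error; yield once all
    -- inner iteratees have yielded; otherwise run the 'pending' action.
    settle steps rest pending =
      case ([e | E.Error e <- steps], [() | E.Continue _ <- steps]) of
        (e : _, _) -> E.throwError e
        ([], [])   -> E.yield [b | E.Yield b _ <- steps] rest
        _          -> pending

    stuck = Exc.ErrorCall "broadcast: an inner iteratee still wants input after EOF"

main :: IO ()
main = do
  results <- E.run_ (E.enumList 3 [1 .. 20 :: Int] $$ broadcast
    [ E.joinI (EL.filter even $$ EL.consume)    -- first "sub-tree" consumer
    , E.joinI (EL.filter (> 15) $$ EL.consume)  -- second "sub-tree" consumer
    ])
  print results
```

    To keep the other iteratees running when one fails, settle would have to record the failure per iteratee (for example by returning Either values) instead of calling E.throwError.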