Partially processing a conduit using another conduit

I'm looking to make a function with the following signature (I think):

partialProcessConduit :: forall m a b r. Monad m 
  => (a -> (b, Maybe (ConduitT () a m ()) )) 
  -> ConduitT b Void m r 
  -> ConduitT () a m () 
  -> m (r, ConduitT () a m ())
partialProcessConduit splitFunc consumingConduit sourceConduit

Which basically does the following:

  1. Repeatedly gets a value of type a out of the conduit sourceConduit.
  2. Applies the function splitFunc to that value a.
  3. Pushes the value b from splitFunc into consumingConduit.
  4. IF splitFunc returns Just (some conduit) (i.e. not Nothing) for the second part of its returned pair THEN
    1. "close up" consumingConduit, and get the result value r
    2. Return a conduit with the "rest" of sourceConduit, but with the conduit in the Just prepended to it.
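
To pin down the intended semantics, here is a minimal pure sketch of the steps above that uses plain lists in place of conduits. The names partialProcessList and demoSplit are hypothetical and exist only for illustration. The list of bs stands in for whatever consumingConduit would have received.

```haskell
-- Pure list model of the specification. Lists replace conduits here, so this
-- only illustrates the intended behaviour; it is not the conduit-based function.
partialProcessList :: (a -> (b, Maybe [a])) -> [a] -> ([b], [a])
partialProcessList _ [] = ([], [])
partialProcessList splitFunc (x : xs) =
  case splitFunc x of
    -- No prefix returned: emit b and keep going.
    (b, Nothing) ->
      let (bs, rest) = partialProcessList splitFunc xs
      in (b : bs, rest)
    -- Prefix returned: emit b, stop, and put the prefix in front of the rest.
    (b, Just prefix) -> ([b], prefix ++ xs)

-- Hypothetical split function used only for the demonstration.
demoSplit :: Int -> (Int, Maybe [Int])
demoSplit 4 = (42, Just [5, 6])
demoSplit n = (10 * n, Nothing)

main :: IO ()
main = print (partialProcessList demoSplit [1, 2, 3, 4, 7, 8, 9])
-- prints ([10,20,30,42],[5,6,7,8,9])
```

If splitFunc never returns a Just, every element is consumed and the remaining list is empty.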

I've actually achieved something close to this (apologies in advance for crappy naming). See here:

{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}

import Conduit (ConduitT, SealedConduitT, unsealConduitT, ($$+), await)
import Data.Void (Void)
import qualified Control.Arrow as Arrow
import Data.DList (DList)

partialProcessConduitInMemory :: forall m a b r. Monad m 
  => (a -> (b, Maybe (ConduitT () a m ()))) 
  -> (DList b -> r) 
  -> ConduitT () a m () 
  -> m (r, ConduitT () a m ())
partialProcessConduitInMemory splitFunc collapseDList sourceConduit = do
  (sc :: SealedConduitT () a m (), (result :: r, leftOver :: ConduitT () a m ())) <- x
  pure (result, leftOver >> unsealConduitT sc)
  where
    x :: m (SealedConduitT () a m (), (r, ConduitT () a m ()))
    x = sourceConduit $$+ g
    g :: ConduitT a Void m (r, ConduitT () a m ())
    g = Arrow.first collapseDList <$> go mempty
    go :: DList b -> ConduitT a Void m (DList b, ConduitT () a m ())
    go blockList = await >>= \case
      Nothing -> pure (blockList, pure ())
      Just block -> case splitFunc block of
        (transformedBlock, Nothing) -> go $ blockList <> pure transformedBlock
        (transformedBlock, Just leftOver) -> pure (blockList <> pure transformedBlock, leftOver)

This is almost what I want. Note that the type signature is the same as above EXCEPT for the second argument: instead of passing a conduit sink that consumes the elements, I'm collecting them in a DList. I'd rather use a conduit sink to consume the first part of the conduit source than collect all the elements in a list and process them afterwards.

Am I able to use a conduit sink here instead of the DList, and if so, what adjustments do I need to make? I thought about pushing elements into the sink in the go loop instead of just appending them, and then calling runConduit to get the result r somehow, but I couldn't get the types to line up. Any help appreciated.


  • I suppose you want something like this:

    {-# LANGUAGE ScopedTypeVariables #-}
    import Conduit
    import Data.Void (Void)

    partialProcessConduit :: forall m a b r. Monad m
      => (a -> (b, Maybe (ConduitT () a m ()) ))
      -> ConduitT b Void m r
      -> ConduitT () a m ()
      -> m (r, ConduitT () a m ())
    partialProcessConduit f snk src = do
      (rest2, (mrest1,r)) <- src $$+ fuseBoth loop snk
      pure (r, maybe id (>>) mrest1 (unsealConduitT rest2))
      where loop :: ConduitT a b m (Maybe (ConduitT () a m ()))
            loop = do ma <- await
                      case ma of
                        Just a -> do
                          let (b, mrest) = f a
                          yield b
                          case mrest of
                            Nothing -> loop
                            Just rest -> pure (Just rest)
                        Nothing -> pure Nothing

    The loop conduit here has type ConduitT a b m (Maybe (ConduitT () a m ())), so it consumes as and yields bs until f (AKA splitFunc) returns a prefix conduit, at which point it returns Just that conduit. If splitFunc never returns a conduit, loop returns Nothing once the input is exhausted.

    Now, we can fuseBoth loop snk, which has type ConduitT a Void m (Maybe (ConduitT () a m ()), r). This sinks the bs from loop into snk, returning both the prefix conduit from splitFunc, if any, and the return value r from snk.

    Finally, we can run src $$+ fuseBoth loop snk. This runs the whole pipeline, sourcing as from src and sinking bs into snk, until splitFunc returns a prefix conduit (or src runs out), at which point it returns a value of type:

    (SealedConduitT () a m (), (Maybe (ConduitT () a m ()), r))

    where, incredibly, the sealed conduit is what's left of src, the Maybe conduit is the "prefix" conduit returned by splitFunc, and the final r is the return value from snk. All that's left is to glue this together into an appropriate return value.
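
    The gluing step, maybe id (>>) mrest1 (unsealConduitT rest2), can be looked at in isolation. The sketch below is only an illustration: prependMaybe is a hypothetical name, and it assumes the Conduit module from the conduit package. It puts an optional prefix source in front of another source and collects the result purely with sinkList.

```haskell
import Conduit

-- Hypothetical helper: with Just prefix, run the prefix and then the rest;
-- with Nothing, return the rest unchanged.
prependMaybe :: Monad m
  => Maybe (ConduitT () a m ())
  -> ConduitT () a m ()
  -> ConduitT () a m ()
prependMaybe mprefix rest = maybe id (>>) mprefix rest

withPrefix :: [Int]
withPrefix =
  runConduitPure (prependMaybe (Just (yieldMany [5, 6])) (yieldMany [7, 8, 9]) .| sinkList)

withoutPrefix :: [Int]
withoutPrefix =
  runConduitPure (prependMaybe Nothing (yieldMany [7, 8, 9]) .| sinkList)

main :: IO ()
main = do
  print withPrefix    -- [5,6,7,8,9]
  print withoutPrefix -- [7,8,9]
```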

    This seems to work as per the following test:

    main :: IO ()
    main = do
      (r, c) <- partialProcessConduit foo (printC >> pure 999) (yieldMany [1,2,3,4,7,8,9])
      runConduit (c .| printC)
      print r
      where foo 4 = (42, Just (yieldMany [5,6]))
            foo n = (10*n, Nothing)

    This outputs:

    λ> main
    10
    20
    30
    42
    5
    6
    7
    8
    9
    999

    which looks right.
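
    If you would rather check the behaviour without reading printed output, a pure variant of the same test is sketched below. It assumes the conduit package, picks m = Identity, and swaps printC for sinkList so that both the consumed bs and the leftover source can be compared as lists. The names foo and result are only for illustration.

```haskell
{-# LANGUAGE ScopedTypeVariables #-}
import Conduit
import Data.Functor.Identity (Identity, runIdentity)
import Data.Void (Void)

partialProcessConduit :: forall m a b r. Monad m
  => (a -> (b, Maybe (ConduitT () a m ())))
  -> ConduitT b Void m r
  -> ConduitT () a m ()
  -> m (r, ConduitT () a m ())
partialProcessConduit f snk src = do
  (rest2, (mrest1, r)) <- src $$+ fuseBoth loop snk
  pure (r, maybe id (>>) mrest1 (unsealConduitT rest2))
  where
    -- Yield transformed values until f hands back a prefix conduit.
    loop :: ConduitT a b m (Maybe (ConduitT () a m ()))
    loop = do
      ma <- await
      case ma of
        Nothing -> pure Nothing
        Just a -> do
          let (b, mrest) = f a
          yield b
          maybe loop (pure . Just) mrest

-- Same split function as in the test above, specialised to Int and Identity.
foo :: Int -> (Int, Maybe (ConduitT () Int Identity ()))
foo 4 = (42, Just (yieldMany [5, 6]))
foo n = (10 * n, Nothing)

-- First component: what the sink received. Second: the leftover source, drained.
result :: ([Int], [Int])
result = runIdentity $ do
  (bs, c) <- partialProcessConduit foo sinkList (yieldMany [1, 2, 3, 4, 7, 8, 9])
  rest <- runConduit (c .| sinkList)
  pure (bs, rest)

main :: IO ()
main = print result
-- prints ([10,20,30,42],[5,6,7,8,9])
```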