I'm looking to make a function with the following signature (I think):
partialProcessConduit :: forall m a b r. Monad m
=> (a -> (b, Maybe (ConduitT () a m ()) ))
-> ConduitT b Void m r
-> ConduitT () a m ()
-> m (r, ConduitT () a m ())
partialProcessConduit splitFunc consumingConduit sourceConduit
Which basically does the following:
a
out of the the conduit sourceConduit
.splitFunc
to that value a
.b
from splitFunc
into consumingConduit
splitFunc
returns Just (some conduit)
(i.e. not Nothing
) for the second part of it's returned pair THEN
consumingConduit
, and get the result value r
sourceConduit
, but with conduit in the Just appended in front of it.I've actually achieved something close to this (apologies in advance for crappy naming). See here:
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
import Conduit (ConduitT, SealedConduitT, unsealConduitT, ($$+), await)
import Data.Void (Void)
import qualified Control.Arrow as Arrow
import Data.DList (DList)
partialProcessConduitInMemory :: forall m a b r. Monad m
=> (a -> (b, Maybe (ConduitT () a m ())))
-> (DList b -> r)
-> ConduitT () a m ()
-> m (r, ConduitT () a m ())
partialProcessConduitInMemory splitFunc collapseDList sourceConduit = do
(sc :: SealedConduitT () a m (), (result :: r, leftOver :: ConduitT () a m ())) <- x
pure (result, leftOver >> unsealConduitT sc)
where
x :: m (SealedConduitT () a m (), (r, ConduitT () a m ()))
x = sourceConduit $$+ g
g :: ConduitT a Void m (r, ConduitT () a m ())
g = Arrow.first collapseDList <$> go mempty
go :: DList b -> ConduitT a Void m (DList b, ConduitT () a m ())
go blockList = await >>= \case
Nothing -> pure (blockList, pure ())
Just block -> case splitFunc block of
(transformedBlock, Nothing) -> go $ blockList <> pure transformedBlock
(transformedBlock, Just leftOver) -> pure (blockList <> pure transformedBlock, leftOver)
This is almost what I want. Notice the type signature here is the same as above EXCEPT for the second argument. Here, instead of passing a conduit sink that consumes the elements as the second argument, I'm collecting them in a `DList. I'd rather be able to use a conduit sink to consume the first part of the conduit source, instead of collecting all the elements in a list and processing them.
Am I able to use a conduit sink here instead of the DList
, and if so, what sort of adjustments do I need to make? I thought about pushing elements into the sink in the go
loop instead of just appending them, and then doing runConduit
to get the result r
somehow, but I wasn't able to play nice with the types. Any help appreciated.
I suppose you want something like this:
{-# LANGUAGE ScopedTypeVariables #-}
partialProcessConduit :: forall m a b r. Monad m
=> (a -> (b, Maybe (ConduitT () a m ()) ))
-> ConduitT b Void m r
-> ConduitT () a m ()
-> m (r, ConduitT () a m ())
partialProcessConduit f snk src = do
(rest2, (mrest1,r)) <- src $$+ fuseBoth loop snk
pure (r, maybe id (>>) mrest1 (unsealConduitT rest2))
where loop :: ConduitT a b m (Maybe (ConduitT () a m ()))
loop = do ma <- await
case ma of
Just a -> do
let (b, mrest) = f a
yield b
case mrest of
Nothing -> loop
Just rest -> pure (Just rest)
Nothing -> pure Nothing
The loop
conduit here has type ConduitT a b m (Maybe (ConduitT () a m ())
, so it inputs a
s and outputs b
s until f
(AKA splitFunc
) returns a prefix conduit, in which case it returns Just
that conduit. If splitFunc
never returns a conduit, it returns Nothing
.
Now, we can fuseBoth loop snk
, which has type ConduitT a Void m (Maybe (ConduitT () a m (), r)
. This sinks the b
s from loop
into the snk
, returning both the prefix conduit from splitFunc
, if any, and the return r
from snk
.
Finally, we can src $$+ fuseBoth loop snk
. This will run the whole conduit sourcing a
s from src
and sinking b
s into snk
, until splitFunc
returns a prefix conduit, at which point it will return:
(SealedConduitT () a m (), (Maybe (ConduitT () a m ()), r))
where, incredibly, the sealed conduit is what's left of src
, the Maybe
conduit is the "prefix" conduit returned by splitFunc
, and the final r
is the return value from snk
. All that's left is to glue this together into an appropriate return value.
This seems to work as per the following test:
main :: IO ()
main = do
(r, c) <- partialProcessConduit foo (printC >> pure 999) (yieldMany [1,2,3,4,7,8,9])
runConduit (c .| printC)
print r
where foo 4 = (42, Just (yieldMany [5,6]))
foo n = (10*n, Nothing)
This outputs:
λ> main
10
20
30
42
5
6
7
8
9
999
which looks right.