haskellconduit# Partially processing a conduit using another conduit

I'm looking to make a function with the following signature (I think):

```
partialProcessConduit :: forall m a b r. Monad m
=> (a -> (b, Maybe (ConduitT () a m ()) ))
-> ConduitT b Void m r
-> ConduitT () a m ()
-> m (r, ConduitT () a m ())
partialProcessConduit splitFunc consumingConduit sourceConduit
```

Which basically does the following:

- Repeatedly gets a value of type
`a`

out of the the conduit`sourceConduit`

. - Applies the functions
`splitFunc`

to that value`a`

. - Pushes the value
`b`

from`splitFunc`

into`consumingConduit`

- IF
`splitFunc`

returns`Just (some conduit)`

(i.e. not`Nothing`

) for the second part of it's returned pair THEN- "close up"
`consumingConduit`

, and get the result value`r`

- Return a conduit with the "rest" of
`sourceConduit`

, but with conduit in the Just appended in front of it.

- "close up"

I've actually achieved something close to this (apologies in advance for crappy naming). See here:

```
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE ScopedTypeVariables #-}
import Conduit (ConduitT, SealedConduitT, unsealConduitT, ($$+), await)
import Data.Void (Void)
import qualified Control.Arrow as Arrow
import Data.DList (DList)
partialProcessConduitInMemory :: forall m a b r. Monad m
=> (a -> (b, Maybe (ConduitT () a m ())))
-> (DList b -> r)
-> ConduitT () a m ()
-> m (r, ConduitT () a m ())
partialProcessConduitInMemory splitFunc collapseDList sourceConduit = do
(sc :: SealedConduitT () a m (), (result :: r, leftOver :: ConduitT () a m ())) <- x
pure (result, leftOver >> unsealConduitT sc)
where
x :: m (SealedConduitT () a m (), (r, ConduitT () a m ()))
x = sourceConduit $$+ g
g :: ConduitT a Void m (r, ConduitT () a m ())
g = Arrow.first collapseDList <$> go mempty
go :: DList b -> ConduitT a Void m (DList b, ConduitT () a m ())
go blockList = await >>= \case
Nothing -> pure (blockList, pure ())
Just block -> case splitFunc block of
(transformedBlock, Nothing) -> go $ blockList <> pure transformedBlock
(transformedBlock, Just leftOver) -> pure (blockList <> pure transformedBlock, leftOver)
```

This is *almost* what I want. Notice the type signature here is the same as above EXCEPT for the second argument. Here, instead of passing a conduit sink that consumes the elements as the second argument, I'm collecting them in a `DList. I'd rather be able to use a conduit sink to consume the first part of the conduit source, instead of collecting all the elements in a list and processing them.

Am I able to use a conduit sink here instead of the `DList`

, and if so, what sort of adjustments do I need to make? I thought about pushing elements into the sink in the `go`

loop instead of just appending them, and then doing `runConduit`

to get the result `r`

somehow, but I wasn't able to play nice with the types. Any help appreciated.

Solution

I suppose you want something like this:

```
{-# LANGUAGE ScopedTypeVariables #-}
partialProcessConduit :: forall m a b r. Monad m
=> (a -> (b, Maybe (ConduitT () a m ()) ))
-> ConduitT b Void m r
-> ConduitT () a m ()
-> m (r, ConduitT () a m ())
partialProcessConduit f snk src = do
(rest2, (mrest1,r)) <- src $$+ fuseBoth loop snk
pure (r, maybe id (>>) mrest1 (unsealConduitT rest2))
where loop :: ConduitT a b m (Maybe (ConduitT () a m ()))
loop = do ma <- await
case ma of
Just a -> do
let (b, mrest) = f a
yield b
case mrest of
Nothing -> loop
Just rest -> pure (Just rest)
Nothing -> pure Nothing
```

The `loop`

conduit here has type `ConduitT a b m (Maybe (ConduitT () a m ())`

, so it inputs `a`

s and outputs `b`

s until `f`

(AKA `splitFunc`

) returns a prefix conduit, in which case it returns `Just`

that conduit. If `splitFunc`

never returns a conduit, it returns `Nothing`

.

Now, we can `fuseBoth loop snk`

, which has type `ConduitT a Void m (Maybe (ConduitT () a m (), r)`

. This sinks the `b`

s from `loop`

into the `snk`

, returning both the prefix conduit from `splitFunc`

, if any, and the return `r`

from `snk`

.

Finally, we can `src $$+ fuseBoth loop snk`

. This will run the whole conduit sourcing `a`

s from `src`

and sinking `b`

s into `snk`

, until `splitFunc`

returns a prefix conduit, at which point it will return:

```
(SealedConduitT () a m (), (Maybe (ConduitT () a m ()), r))
```

where, incredibly, the sealed conduit is what's left of `src`

, the `Maybe`

conduit is the "prefix" conduit returned by `splitFunc`

, and the final `r`

is the return value from `snk`

. All that's left is to glue this together into an appropriate return value.

This seems to work as per the following test:

```
main :: IO ()
main = do
(r, c) <- partialProcessConduit foo (printC >> pure 999) (yieldMany [1,2,3,4,7,8,9])
runConduit (c .| printC)
print r
where foo 4 = (42, Just (yieldMany [5,6]))
foo n = (10*n, Nothing)
```

This outputs:

```
λ> main
10
20
30
42
5
6
7
8
9
999
```

which looks right.

- Comparing lists in Haskell
- Is there a non-identity monad morphism M ~> M that is monadically natural in M?
- Problem with loading module ‘Distribution.Simple’
- Improving efficiency in Stirling numbers calculation
- Does sequencing an infinite list of IO actions by definition result in a never-ending action? Or is there a way to bail out?
- How to call pgQuery from postgresql-query?
- How to avoid whitespace after a tag (link) in Hamlet templates?
- Understanding type-directed resolution in Haskell with existential types
- Why is seq bad?
- Understanding bind function in Haskell
- How to create route that will trigger on any path in Servant?
- How do I use a global state in WAI middleware?
- nixos 23.11 cabal install mysql-simple problem - "Missing (or bad) C libraries"
- Is there a way to kill all forked threads in a GHCi session without restarting it?
- Why can an invalid list expression such as 2:1 be assigned to a variable, but not printed?
- Iterate over a type level list and call a function based on each type in the list
- How does this solution of Project Euler Problem 27 in the Haskell Wiki work?
- Why `Monad` is required to use `pure`?
- Can't do partial function definitions in GHCi
- recommended way to convert Double -> Float in Haskell
- Haskell profiling understanding cost centre summary for anonymous lambda
- Why is Haskell fully declarative?
- GHC Generating Redundant Core Operations
- Question about Event firing in reflex-frp
- Using Haskell's "Maybe", type declarations
- How can I elegantly invert a Map's keys and values?
- Why there is no output for wrapped IO in Haskell?
- What are the definitions of Weather and Memory in xmobar repo?
- Serializing a Data.Text value to a ByteString without unnecessary \NUL bytes
- Using Haskell with VS Code