Signalling to downstream that upstream is exhausted


Question

Using the Haskell pipes library, I'm trying to define a Pipe with the following type:

signalExhausted :: Monad m => Pipe a (Value a) m r 

where the Value data type is defined by:

data Value a = Value a | Exhausted

The pipe should obey the following laws:

toList (each [] >-> signalExhausted) ==                 [Exhausted]
toList (each xs >-> signalExhausted) == map Value xs ++ [Exhausted]

In other words, the pipe should be equivalent to Pipes.Prelude.map Value, except that it should yield an additional Exhausted after all upstream values have been processed, giving downstream a chance to perform some final action.

Can such a Pipe be defined?

Example

> let xs = words "hubble bubble toil and trouble"
> toList $ each xs >-> signalExhausted
[Value "hubble", Value "bubble", Value "toil", Value "and", Value "trouble", Exhausted]

Notes

I'm aware that the pipes-parse library provides the functions draw and parseForever. These look useful, but I can't quite see how to combine them into a Pipe that matches the specification above.


Solution

  • A pipe like signalExhausted can't be defined, but a function equivalent to (>-> signalExhausted) can.

    >-> is a specialized version of the pull category. Execution is driven by the downstream proxies pulling data from upstream proxies. The downstream proxy sends an empty request () upstream and blocks until a response holding a value comes back from the upstream proxy. When the upstream proxy is exhausted and doesn't have any more values to send back, it returns. You can see the return that matters for these examples in the definition of each.

    each = F.foldr (\a p -> yield a >> p) (return ())
    -- what to do when the data's exhausted ^                       
    

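    When the upstream proxy returns, the downstream proxy is still blocked waiting for a value, and there is no value the pipes library could possibly give it. The composition as a whole therefore finishes with the upstream's return value, and the downstream proxy never runs again. Since it never runs again, it has no opportunity to react to the end of the data.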
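
    To make this concrete, here is a minimal sketch of a direct attempt at the Pipe from the question. The names naiveSignalExhausted and result are hypothetical, and the Eq instance is added only so the result can be compared. The attempt type-checks, but the final yield is never reached, because P.map never returns on its own and the composition stops as soon as upstream returns.

```haskell
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Eq, Show)

-- Hypothetical name: a direct (and unsuccessful) attempt at the Pipe
-- from the question. 'P.map Value' loops forever on await/yield, so the
-- 'yield Exhausted' after it is unreachable.
naiveSignalExhausted :: Monad m => Pipe a (Value a) m ()
naiveSignalExhausted = P.map Value >> yield Exhausted

-- When 'each' returns, the whole composition returns with it, and the
-- pipe is never resumed. No Exhausted appears in the output:
-- result == [Value 1, Value 2, Value 3]
result :: [Value Int]
result = P.toList (each [1, 2, 3] >-> naiveSignalExhausted)
```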

    There are two solutions to this problem. The simplest is to map Value over the upstream pipe and add a yield Exhausted after it's done.

    import Pipes
    import qualified Pipes.Prelude as P
    
    data Value a = Value a | Exhausted
        deriving (Show)
    
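    -- Takes the whole upstream producer as an argument, maps Value over it,
    -- and yields Exhausted once that producer has returned.
    signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
    signalExhausted p = (p >-> P.map Value) >> yield Exhausted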
    

    This does exactly what you're looking for, except that you apply the function signalExhausted to the producer (signalExhausted p) where you would have written p >-> signalExhausted.

    let xs = words "hubble bubble toil and trouble"
    print . P.toList . signalExhausted $ each xs
    
    [Value "hubble",Value "bubble",Value "toil",Value "and",Value "trouble",Exhausted]
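
    The two laws from the question can be restated for the function form. The following is a sketch under a few assumptions: the type signature is one plausible choice, the Eq instance is added so results can be compared, and lawEmpty and lawNonEmpty are hypothetical names.

```haskell
import Pipes
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Eq, Show)

-- Function form of the pipe: wraps every upstream value, then signals the end.
signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
signalExhausted p = (p >-> P.map Value) >> yield Exhausted

-- First law: an empty upstream produces only Exhausted.
lawEmpty :: Bool
lawEmpty = P.toList (signalExhausted (each ([] :: [Int]))) == [Exhausted]

-- Second law: every value is wrapped, followed by a single Exhausted.
lawNonEmpty :: [Int] -> Bool
lawNonEmpty xs =
    P.toList (signalExhausted (each xs)) == map Value xs ++ [Exhausted]
```

    Both checks can be evaluated in GHCi, for example lawNonEmpty [1, 2, 3].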
    

    The more general solution to this problem is to stop the upstream proxy from returning and instead signal downstream when it is exhausted. I demonstrated how to do so in an answer to a related question.

    import Control.Monad
    import Pipes.Core
    
    returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
    returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)
    

    This replaces each respond with respond . Right and replaces return with forever . respond . Left, sending returns downstream along with responses.
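
    The point-free definition can be hard to read, so here is a sketch of the same idea in do notation. The names returnDownstream' and firstFour are hypothetical, and this version uses (//>), the flipped form of (<\\). Because the resulting proxy never returns, the example cuts it off with P.take.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core (respond, (//>))
import qualified Pipes.Prelude as P

-- Hypothetical name; intended to behave like the point-free returnDownstream.
returnDownstream' :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream' p = do
    -- Re-send every value that p responds with, wrapped in Right.
    r <- p //> (respond . Right)
    -- p has returned r: keep answering every further request with Left r.
    forever (respond (Left r))

-- The transformed producer is infinite, so only the first four outputs
-- are taken: firstFour == [Right 1, Right 2, Left (), Left ()]
firstFour :: [Either () Int]
firstFour = P.toList (returnDownstream' (each [1, 2]) >-> P.take 4)
```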

    returnDownstream transforms a proxy that returns into one that never returns and instead forwards its return value downstream as the Left value of an Either. That is more general than what you are looking for, but it can be used to recreate signalExhausted.

    signalExhausted p = returnDownstream p >-> respondLeftOnce
    

    respondLeftOnce is an example downstream proxy. The downstream proxy can discern between regular values held in Right and the return value held in Left.

    respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
    respondLeftOnce = go
        where
            go = do
                ea <- await
                case ea of
                    Right a -> yield (Value a) >> go                    
                    Left  _ -> yield Exhausted       -- The upstream proxy is exhausted; do something else
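
    Putting the pieces together, the sketch below shows a downstream stage performing a final action when it sees Exhausted. The definitions of returnDownstream, respondLeftOnce and signalExhausted are repeated so the example is self-contained. The names sumOnExhausted and total, the Eq instance, and the type signature on signalExhausted are assumptions added for illustration.

```haskell
import Control.Monad (forever)
import Pipes
import Pipes.Core (respond, (<\\))
import qualified Pipes.Prelude as P

data Value a = Value a | Exhausted
    deriving (Eq, Show)

returnDownstream :: Monad m => Proxy a' a b' b m r -> Proxy a' a b' (Either r b) m r'
returnDownstream = (forever . respond . Left =<<) . (respond . Right <\\)

respondLeftOnce :: Monad m => Pipe (Either e a) (Value a) m ()
respondLeftOnce = do
    ea <- await
    case ea of
        Right a -> yield (Value a) >> respondLeftOnce
        Left  _ -> yield Exhausted   -- upstream has returned; signal once and stop

signalExhausted :: Monad m => Producer a m r -> Producer (Value a) m ()
signalExhausted p = returnDownstream p >-> respondLeftOnce

-- Hypothetical downstream stage: accumulates a running sum and emits it
-- only when it is told that upstream is exhausted.
sumOnExhausted :: Monad m => Pipe (Value Int) Int m ()
sumOnExhausted = go 0
  where
    go acc = do
        v <- await
        case v of
            Value n   -> go (acc + n)
            Exhausted -> yield acc   -- the final action

-- total == [6]
total :: [Int]
total = P.toList (signalExhausted (each [1, 2, 3]) >-> sumOnExhausted)
```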