
How to turn a pull based pipe into a push based one?


By default, pipes are pull-based. This is because the operator >-> is implemented via +>>, which is the pointful bind operator for the pull category. My understanding is that in code like producer >-> consumer, the consumer's body runs first, and the producer runs only once the consumer awaits data.

I've seen in the pipes documentation here that you can use (reflect .) from Pipes.Core to turn a pull-based pipe into a push-based one. I take that to mean (correct me if I'm wrong) that in producer >-> consumer the producer would run first and produce a value, which the consumer would then consume. That seems really useful, and I'd like to know how to do it.

I've also seen in discussions here that there is no push-based counterpart to >-> because it is easy to turn any pipe around (I assume with reflect?), but I can't figure out how to do it or find any examples.

Here's some code I've attempted:

import Control.Monad (forever)
import Pipes
import Pipes.Core

stdin :: Producer String IO r
stdin = forever $ do
  lift $ putStrLn "stdin"
  str <- lift getLine
  yield str

countLetters :: Consumer String IO r
countLetters = forever $ do
  lift $ putStrLn "countLetters"
  str <- await
  lift . putStrLn . show . length $ str

-- this works in pull mode
runEffect (stdin >-> countLetters)

-- equivalent to above, works
runEffect ((\() -> stdin) +>> countLetters)

-- push based operator, doesn't do what I hoped
runEffect (stdin >>~ (\_ -> countLetters))

-- does not compile
runEffect (countLetters >>~ (\() -> stdin))

Solution

  • -- push based operator, doesn't do what I hoped
    runEffect (stdin >>~ (\_ -> countLetters))
    

    I gather the problem here is that, while the producer is run first as expected, the first produced value is dropped. Compare...

    GHCi> runEffect (stdin >-> countLetters)
    countLetters
    stdin
    foo
    3
    countLetters
    stdin
    glub
    4
    countLetters
    stdin
    

    ... with:

    GHCi> runEffect (stdin >>~ (\_ -> countLetters))
    stdin
    foo
    countLetters
    stdin
    glub
    4
    countLetters
    stdin
    

    This issue is discussed in detail in Gabriella Gonzalez's answer to this question. It boils down to the fact that the argument of the function you pass to (>>~) is the "driving" input of the push-based flow, so if you discard it (with const or \_ ->) you end up dropping the first input. The solution is to reshape countLetters accordingly:

    countLettersPush :: String -> Consumer String IO r
    countLettersPush str = do
      lift $ putStrLn "countLetters"
      lift . putStrLn . show . length $ str
      str' <- await
      countLettersPush str'
    
    GHCi> runEffect (stdin >>~ countLettersPush)
    stdin
    foo
    countLetters
    3
    stdin
    glub
    countLetters
    4
    stdin
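
The three interleavings above can also be reproduced without typing at a prompt. The following is only a sketch under some assumptions: the base monad is swapped from IO to Writer [String] so that the order of effects becomes a plain list, the input is fixed to "foo" and "glub", and the names say, source, countPull and countPush are hypothetical stand-ins for the definitions in the question and answer.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core ((>>~))

-- Append one line to the trace kept in the Writer base monad.
say :: String -> Proxy a' a b' b (Writer [String]) ()
say s = lift (tell [s])

-- Stand-in for 'stdin': logs, then yields each fixed input.
source :: Producer String (Writer [String]) ()
source = mapM_ (\s -> say "source" >> yield s) ["foo", "glub"]

-- Stand-in for 'countLetters' (pull style: awaits before doing any work).
countPull :: Consumer String (Writer [String]) r
countPull = forever $ do
  say "count"
  s <- await
  say (show (length s))

-- Stand-in for 'countLettersPush' (push style: starts from a given input).
countPush :: String -> Consumer String (Writer [String]) r
countPush s = do
  say "count"
  say (show (length s))
  await >>= countPush

pullTrace, droppedTrace, pushTrace :: [String]
-- Consumer runs first:
-- ["count","source","3","count","source","4","count"]
pullTrace    = execWriter (runEffect (source >-> countPull))
-- Producer runs first, but "foo" is discarded by 'const':
-- ["source","count","source","4","count"]
droppedTrace = execWriter (runEffect (source >>~ const countPull))
-- Producer runs first and every input is counted:
-- ["source","count","3","source","count","4"]
pushTrace    = execWriter (runEffect (source >>~ countPush))
```

Loading this in GHCi and evaluating pullTrace, droppedTrace and pushTrace should show the same ordering as the three sessions above, minus the echoed input lines.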
    

    I've also seen in discussions here that there is no push based counterpart to >-> because it is easy to turn any pipe around (I assume with reflect?)

    I'm not fully sure of my ground, but it seems that doesn't quite apply to the solution above. What we can do, now that we have the push-based flow working correctly, is use reflect to turn it back into a pull-based flow:

    -- Preliminary step: switching to '(>~>)'.
    stdin >>~ countLettersPush
    (const stdin >~> countLettersPush) ()
    
    -- Applying 'reflect', as the documentation suggests.
    reflect . (const stdin >~> countLettersPush)
    reflect . const stdin <+< reflect . countLettersPush
    const (reflect stdin) <+< reflect . countLettersPush
    
    -- Rewriting in terms of '(+>>)'.
    (reflect . countLettersPush >+> const (reflect stdin)) ()
    reflect . countLettersPush +>> reflect stdin
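
As a sanity check on the derivation, the first and last expressions can be run side by side. This is a sketch under the same kind of assumptions as before: Writer [String] replaces IO so the effect order is observable as a list, the input is fixed, and say, source and countPush are hypothetical stand-ins for stdin and countLettersPush.

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core (reflect, (+>>), (>>~))

-- Append one line to the trace kept in the Writer base monad.
say :: String -> Proxy a' a b' b (Writer [String]) ()
say s = lift (tell [s])

-- Stand-in for 'stdin' with a fixed, finite input.
source :: Producer String (Writer [String]) ()
source = mapM_ (\s -> say "source" >> yield s) ["foo", "glub"]

-- Stand-in for 'countLettersPush'.
countPush :: String -> Consumer String (Writer [String]) r
countPush s = do
  say "count"
  say (show (length s))
  await >>= countPush

pushTrace, reflectedTrace :: [String]
-- Start of the derivation: push-based composition.
pushTrace      = execWriter (runEffect (source >>~ countPush))
-- End of the derivation: pull-based composition of the reflected proxies.
reflectedTrace = execWriter (runEffect (reflect . countPush +>> reflect source))
```

Both traces should come out as ["source","count","3","source","count","4"]: reflect swaps requests and responds, so the Strings now travel upstream, but the order of the effects is unchanged.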
    

    This is indeed pull-based, as the flow is driven by reflect stdin, the downstream Client:

    GHCi> :t reflect stdin
    reflect stdin :: Proxy String () () X IO r
    GHCi> :t reflect stdin :: Client String () IO r
    reflect stdin :: Client String () IO r :: Client String () IO r
    

    The flow, however, involves sending Strings upstream, and so it cannot be expressed in terms of (>->), which is, so to speak, downstream-only:

    GHCi> -- Compare the type of the second argument with that of 'reflect stdin'
    GHCi> :t (>->)
    (>->)
      :: Monad m =>
         Proxy a' a () b m r -> Proxy () b c' c m r -> Proxy a' a c' c m