By default, pipes are pull-based. This is due to the operator >->, which is implemented via +>>, the pointful bind operator for the pull category. My understanding is that this means that if you have code like producer >-> consumer, the consumer's body will be called first; then, once it awaits data, the producer will be called.
I've seen in the pipes documentation here that you can use the code (reflect .) from Pipes.Core to turn a pull-based pipe into a push-based pipe. That would mean (correct me if I'm wrong) that in the code above, producer >-> consumer, the producer is run first and produces a value, and then the consumer tries to consume it. That seems really useful and I'd like to know how to do it.
I've also seen in discussions here that there is no push-based counterpart to >-> because it is easy to turn any pipe around (I assume with reflect?), but I can't really figure out how to do it or find any examples.
Here's some code I've attempted:
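import Control.Monad (forever)
import Pipes
import Pipes.Core

-- Reads lines from standard input, announcing each time it runs.
stdin :: Producer String IO r
stdin = forever $ do
    lift $ putStrLn "stdin"
    str <- lift getLine
    yield str

-- Prints the length of every string it receives.
countLetters :: Consumer String IO r
countLetters = forever $ do
    lift $ putStrLn "countLetters"
    str <- await
    lift . putStrLn . show . length $ str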
-- this works in pull mode
runEffect (stdin >-> countLetters)
-- equivalent to above, works
runEffect ((\() -> stdin) +>> countLetters)
-- push based operator, doesn't do what I hoped
runEffect (stdin >>~ (\_ -> countLetters))
-- does not compile
runEffect (countLetters >>~ (\() -> stdin))
-- push based operator, doesn't do what I hoped
runEffect (stdin >>~ (\_ -> countLetters))
I gather the problem here is that, while the producer is run first as expected, the first produced value is dropped. Compare...
GHCi> runEffect (stdin >-> countLetters)
countLetters
stdin
foo
3
countLetters
stdin
glub
4
countLetters
stdin
... with:
GHCi> runEffect (stdin >>~ (\_ -> countLetters))
stdin
foo
countLetters
stdin
glub
4
countLetters
stdin
This issue is discussed in detail in Gabriella Gonzalez's answer to this question. It boils down to how the argument to the function you give to (>>~) is the "driving" input in the push-based flow, and so if you const it away you end up dropping the first input. The solution is to reshape countLetters accordingly:
countLettersPush :: String -> Consumer String IO r
countLettersPush str = do
    lift $ putStrLn "countLetters"
    lift . putStrLn . show . length $ str
    str' <- await
    countLettersPush str'
GHCi> runEffect (stdin >>~ countLettersPush)
stdin
foo
countLetters
3
stdin
glub
countLetters
4
stdin
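If rewriting every consumer by hand is inconvenient, one possible alternative is a small adapter that replays the driving input in front of an ordinary pull-based consumer. The sketch below is not part of the pipes library: toPush, source, counter and pushLog are hypothetical names, and source and counter are finite stand-ins for stdin and countLetters that log to a Writer instead of doing IO, so the order of events can be inspected as a plain list.

```haskell
import Control.Monad (forever)
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core ((>>~))

-- Hypothetical stand-in for 'stdin': yields two fixed strings,
-- logging "source" before each one.
source :: Producer String (Writer [String]) ()
source = mapM_ step ["foo", "glub"]
  where
    step s = lift (tell ["source"]) >> yield s

-- Hypothetical stand-in for 'countLetters': logs "counter", then the
-- length of each string it receives.
counter :: Consumer String (Writer [String]) r
counter = forever $ do
    lift (tell ["counter"])
    str <- await
    lift (tell [show (length str)])

-- Hypothetical adapter: feed the driving input to the consumer first,
-- then forward everything else from upstream unchanged.
toPush :: Monad m => Consumer a m r -> a -> Consumer a m r
toPush consumer a = (yield a >> cat) >-> consumer

-- The producer runs first and no input is dropped:
-- ["source","counter","3","counter","source","4","counter"]
pushLog :: [String]
pushLog = execWriter (runEffect (source >>~ toPush counter))
```

Note that the interleaving differs slightly from countLettersPush: once the adapter has handed over the first value, the inner composition is pull-based again, so "counter" is logged before each subsequent "source".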
I've also seen in discussions here that there is no push based counterpart to >-> because it is easy to turn any pipe around (I assume with reflect?)
I'm not fully sure of my ground, but it seems that doesn't quite apply to the solution above. What we can do, now that we have the push-based flow working correctly, is use reflect to turn it back into a pull-based flow:
-- Preliminary step: switching to '(>~>)'.
stdin >>~ countLettersPush
(const stdin >~> countLettersPush) ()
-- Applying 'reflect', as the documentation suggests.
reflect . (const stdin >~> countLettersPush)
reflect . const stdin <+< reflect . countLettersPush
const (reflect stdin) <+< reflect . countLettersPush
-- Rewriting in terms of '(+>>)'.
(reflect . countLettersPush >+> const (reflect stdin)) ()
reflect . countLettersPush +>> reflect stdin
This is indeed pull-based, as the flow is driven by reflect stdin, the downstream Client:
GHCi> :t reflect stdin
reflect stdin :: Proxy String () () X IO r
GHCi> :t reflect stdin :: Client String () IO r
reflect stdin :: Client String () IO r :: Client String () IO r
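As a sanity check on the derivation, the push-based expression and its reflected, pull-based counterpart can be run side by side. This is only a sketch under the same kind of assumptions as the real code: source and counterPush are hypothetical, finite stand-ins for stdin and countLettersPush that log to a Writer rather than doing IO, so both runs yield a list that can be compared.

```haskell
import Control.Monad.Trans.Writer.Strict (Writer, execWriter, tell)
import Pipes
import Pipes.Core (reflect, (+>>), (>>~))

-- Hypothetical stand-in for 'stdin': yields two fixed strings,
-- logging "source" before each one.
source :: Producer String (Writer [String]) ()
source = mapM_ step ["foo", "glub"]
  where
    step s = lift (tell ["source"]) >> yield s

-- Hypothetical stand-in for 'countLettersPush': logs "counter" and the
-- length of the driving input, then awaits the next one.
counterPush :: String -> Consumer String (Writer [String]) r
counterPush str = do
    lift (tell ["counter", show (length str)])
    str' <- await
    counterPush str'

-- Push-based original and the pull-based form obtained with 'reflect'.
pushTrace, pullTrace :: [String]
pushTrace = execWriter (runEffect (source >>~ counterPush))
pullTrace = execWriter (runEffect (reflect . counterPush +>> reflect source))

-- Both evaluate to ["source","counter","3","source","counter","4"].
```

The two logs coincide, which is what the equational steps above predict: reflect swaps the roles of the two sides without changing the order in which their effects run.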
The flow, however, involves sending Strings upstream, and so it cannot be expressed in terms of (>->), which is, so to say, downstream-only:
GHCi> -- Compare the type of the second argument with that of 'reflect stdin'
GHCi> :t (>->)
(>->)
:: Monad m =>
Proxy a' a () b m r -> Proxy () b c' c m r -> Proxy a' a c' c m