
What's the difference between `pullFrom` and `connect` in purescript-coroutines?


I'm not sure I understand why purescript-coroutines has both a connect and a pullFrom function, or when you would use each of them. Looking at the types, it seems like they're used to change the "direction of communication" (I'm not sure if that's the right way of thinking about it).

pullFrom :: forall o m a. MonadRec m => Consumer o m a -> Producer o m a -> Process m a

connect :: forall o f m a. (MonadRec m, Parallel f m) => Producer o m a -> Consumer o m a -> Process m a

So if I have a consumer and a producer...

consumer :: forall e a. (Show a) => Consumer a (Eff (console :: CONSOLE | e)) Unit
consumer = forever do
  s <- await
  lift (log $ show s)

numberProducer :: forall m. (Monad m) => Producer Int m Unit
numberProducer = go 0
  where
  go i = do emit i
            go (i + 1)

It makes sense to me that the consumer can pull from the producer, and if I run this, I can see the numbers being shown...

main = do
  runProcess (pullFrom consumer numberProducer)

But if I connect the producer to the consumer, it doesn't seem to do anything. I'm assuming when you connect the producer to the consumer, the signaling is going in the opposite direction from pullFrom, but I'm not sure if that idea is correct.

main = do
  runProcess (connect numberProducer consumer)

Solution

  • Well, this has a fun little surprise in it... I'll come to that in a minute though.

    pullFrom was introduced so that the consumer is "in charge" when a process is formed - the process exists for as long as the consumer is open (awaiting an input).

    connect runs as long as either the producer or the consumer is open, and the process only terminates when both have completed.

    To achieve this, connect has a Parallel class constraint, as the created process depends on both consumer and producer - pullFrom does not require this, since the process is dependent only on the consumer.

    This is where the "fun" surprise comes in, and it confused me for a minute. Eff is not Parallel... so how does your code compile at all? It's because the compiler is inferring this type for main:

    main :: forall t. (Parallel t (Eff (console :: CONSOLE))) => Eff (console :: CONSOLE) Unit
    

    So nothing happens when your program is run, because in the JS, main is expecting a dictionary to be passed for the Parallel constraint, then the Eff to be evaluated. The generated call to main is just Main.main();, so it never actually evaluates the Eff, as it would need to be Main.main(impossibleParallelDictionary)();.
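
    The dictionary-passing behaviour described above can be modelled in a few lines of plain JavaScript. This is only a rough sketch, not actual compiler output: it assumes a constrained value compiles to a function that takes the class dictionary, and that an Eff compiles to a zero-argument function. The names effects, afterEntryPoint, ranAfterEntryPoint and ranAfterDictionary are made up for illustration.

    ```javascript
    // Records which effects have actually been performed.
    const effects = [];

    // Hypothetical model of a `main` whose inferred type carries a Parallel constraint:
    // the outer function takes the dictionary, the inner function is the Eff itself.
    const main = (dictParallel) => () => {
      effects.push("process ran");
    };

    // What the generated `Main.main();` amounts to: the single call is consumed
    // by the dictionary parameter, so only the un-run Eff comes back.
    const afterEntryPoint = main();
    const ranAfterEntryPoint = effects.length;

    // What would be needed to run the Eff: pass a dictionary, then call the result.
    // The empty object stands in for a dictionary that cannot really exist for Eff.
    main({})();
    const ranAfterDictionary = effects.length;
    ```

    In this model afterEntryPoint is still a function and nothing has been recorded in effects until the second, two-step call, which mirrors why the original program runs silently.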

    Try adding this type to your main:

    main :: Eff (console :: CONSOLE) Unit
    

    And you'll see that it doesn't type check anymore.

    You can use Aff for this though, and with Aff, connect and pullFrom behave indistinguishably for this example:

    import Prelude
    
    import Control.Coroutine (Consumer, Producer, await, connect, emit, runProcess)
    import Control.Monad.Aff (Aff, launchAff)
    import Control.Monad.Aff.Console (CONSOLE, log)
    import Control.Monad.Eff (Eff)
    import Control.Monad.Eff.Exception (EXCEPTION)
    import Control.Monad.Rec.Class (forever)
    import Control.Monad.Trans.Class (lift)
    
    consumer :: forall e a. (Show a) => Consumer a (Aff (console :: CONSOLE | e)) Unit
    consumer = forever do
      s <- await
      lift (log $ show s)
    
    numberProducer :: forall m. (Monad m) => Producer Int m Unit
    numberProducer = go 0
      where
      go i = do emit i
                go (i + 1)
    
    main :: Eff (err :: EXCEPTION, console :: CONSOLE) Unit
    main = void $ launchAff $ runProcess (connect numberProducer consumer)
    

    If we modify the example slightly, we can see an illustration of the difference:

    import Prelude
    
    import Control.Coroutine (Consumer, Producer, await, emit, connect, runProcess)
    import Control.Monad.Aff (Aff, launchAff, later')
    import Control.Monad.Aff.Console (CONSOLE, log)
    import Control.Monad.Eff (Eff)
    import Control.Monad.Eff.Exception (EXCEPTION)
    import Control.Monad.Trans.Class (lift)
    
    consumer :: forall e a. (Show a) => Consumer a (Aff (console :: CONSOLE | e)) Unit
    consumer = do
      s <- await
      lift (log $ show s)
    
    numberProducer :: forall eff. Producer Int (Aff eff) Unit
    numberProducer = do
      emit 0
      lift $ later' 10000 $ pure unit
    
    main :: Eff (err :: EXCEPTION, console :: CONSOLE) Unit
    main = void $ launchAff $ runProcess (connect numberProducer consumer)
    

    With this, the program will print 0, wait 10 seconds, and quit. If you swap connect numberProducer consumer for consumer `pullFrom` numberProducer (and add pullFrom to the Control.Coroutine import), the program will print 0 and exit immediately.