haskell concurrency haskell-pipes

How can I make a Pipe concurrent with Haskell's Pipe library?


I have some Haskell code that uses Pipes:

module Main(main) where
import Pipes

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
  x <- await
  yield (x*2)
  b

c :: Consumer Int IO ()
c = do
  x <- await
  lift $ print x
  c

main :: IO ()
main = runEffect $ a >-> b >-> c

The Pipes.Concurrent tutorial demonstrates using multiple workers along with work stealing. How can I do something similar inside of b? I would like b to perform its work concurrently using a set number of workers.

Obviously, concurrency isn't useful in this exact case, but it's the simplest example I could come up with. In my real use case I'd like to make some web requests concurrently using a limited number of workers.


Solution

  • EDIT: I misunderstood what you were asking. You may be able to do this inside a pipe, but I'm not really sure what the motivation would be. I'd recommend building reusable pipe chains and dispatching to them from workers rather than trying to build workers INSIDE the pipe. If you build the concurrency into the pipe itself, you lose the guarantee that the first value in is the first value out.

    The section on Work Stealing is what you're looking for. This code is basically verbatim from the tutorial, but let's break down how it works. Here's one way we could do what you want:

    module Main(main) where
    import Pipes
    import Pipes.Concurrent
    
    import Control.Concurrent.Async (async, wait)
    import Control.Monad (forM)
    
    a :: Producer Int IO ()
    a = each [1..10]
    
    b :: Pipe Int Int IO ()
    b = do
      x <- await
      yield (x*2)
      b
    
    c :: Consumer Int IO ()
    c = do
      x <- await
      lift $ print x
      c
    
    main :: IO ()
    main = do
      (output, input) <- spawn unbounded
      feeder <- async $ do runEffect $ a >-> toOutput output
                           performGC
    
      workers <- forM [1..3] $ \i ->
        async $ do runEffect $ fromInput input  >-> b >-> c
                   performGC
    
      mapM_ wait (feeder:workers)
    

    The first line, spawn unbounded, comes from Pipes.Concurrent. It initializes a 'mailbox' with two handles, an output and an input. The naming confused me at first: we send messages TO the output and pull them FROM the input. This resembles a push-pull message channel in languages like Go.

    We specify a Buffer to say how many messages the mailbox can store; here, unbounded sets no limit.
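
    An unbounded buffer can grow without limit if the feeder outpaces the workers. As a minimal sketch, assuming the same pipes-concurrency and async packages used above, swapping in bounded 5 caps the mailbox at five pending values, so the feeder blocks until the reader catches up. The name boundedDemo is only illustrative:

    ```haskell
    module Main (main) where

    import Control.Concurrent.Async (async, wait)
    import Pipes
    import Pipes.Concurrent
    import qualified Pipes.Prelude as P

    -- Illustrative helper: push 100 values through a mailbox that holds at most 5.
    boundedDemo :: IO [Int]
    boundedDemo = do
      (output, input) <- spawn (bounded 5)
      feeder <- async $ do
        -- toOutput blocks whenever five values are already waiting.
        runEffect $ each [1 .. 100] >-> toOutput output
        performGC
      -- A single reader drains the mailbox in the order the values were sent.
      xs <- P.toListM (fromInput input)
      wait feeder
      return xs

    main :: IO ()
    main = boundedDemo >>= print . length
    ```

    For web requests, a bounded buffer is usually the safer default, since it keeps the number of queued jobs from growing faster than the workers can handle them.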

    Okay, so the mailbox is initialized and we can now create Effects that send messages to it. The mailbox is implemented using STM, which is how it can collect messages asynchronously.
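
    To make the STM part concrete, here is a small sketch that talks to the mailbox directly instead of going through pipes. It uses the send and recv fields of Output and Input from Pipes.Concurrent; the helper name roundTrip is made up for this example:

    ```haskell
    module Main (main) where

    import Pipes.Concurrent

    -- Illustrative helper: one send and one recv on a fresh mailbox.
    roundTrip :: IO (Bool, Maybe String)
    roundTrip = do
      (output, input) <- spawn unbounded
      -- send reports False if the mailbox has already been sealed.
      ok <- atomically (send output "hello")
      -- recv yields Nothing once the mailbox is sealed and empty.
      msg <- atomically (recv input)
      return (ok, msg)

    main :: IO ()
    main = roundTrip >>= print
    ```

    toOutput and fromInput are essentially loops around these two operations, which is why they compose with ordinary pipes.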

    Let's create an asynchronous job that feeds the mailbox:

    feeder <- async $ do runEffect $ a >-> toOutput output
                         performGC
    

    The a >-> toOutput output part is just normal pipe composition; we need toOutput to convert output into a Consumer. Note the performGC call that also runs as part of the IO action: Pipes.Concurrent relies on the garbage collector to notice when a mailbox handle is no longer in use, so forcing a collection lets it clean up promptly after the job has completed. We could run this using forkIO if we liked, but here we use async so that we can wait for the job to finish later on. Okay, so our mailbox should be asynchronously receiving messages; let's pull them out and do some work.

    workers <- forM [1..3] $ \i ->
      async $ do runEffect $ fromInput input  >-> b >-> c
                 performGC
    

    Same idea as before, but this time we spawn several of them. We read from input just like a normal pipe using fromInput and then run the values through the rest of our chain, cleaning up when we're done. The mailbox ensures that each value pulled out is received by only one worker. When all the jobs feeding into output have completed and output is no longer referenced, the mailbox closes the input, and the workers finish once the remaining messages have been drained.
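
    One way to see that each value reaches exactly one worker is to have every worker collect what it received. This is a sketch under the same assumptions as the code above; distribute is a hypothetical helper, not part of any library:

    ```haskell
    module Main (main) where

    import Control.Concurrent.Async (async, wait)
    import Control.Monad (forM)
    import Pipes
    import Pipes.Concurrent
    import qualified Pipes.Prelude as P

    -- Illustrative helper: share xs among n workers and return what each one got.
    distribute :: Int -> [Int] -> IO [[Int]]
    distribute n xs = do
      (output, input) <- spawn unbounded
      feeder <- async $ do
        runEffect $ each xs >-> toOutput output
        performGC
      workers <- forM [1 .. n] $ \_ -> async $ do
        -- Each worker keeps pulling until the mailbox is closed.
        got <- P.toListM (fromInput input)
        performGC
        return got
      wait feeder
      mapM wait workers

    main :: IO ()
    main = distribute 3 [1 .. 10] >>= mapM_ print
    ```

    Every input value shows up in exactly one of the printed lists. How the values are split varies from run to run, and with an input this small a single worker may well take all of them.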

    If you're using this in a web-worker scenario, you would have a main loop that keeps sending requests into toOutput output, and then spawn as many workers as you like, each pulling into its own pipeline from fromInput input.
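
    If you do want the worker pool to sit in the middle of a pipeline, as the question asks, one possible sketch is a function that takes the upstream Producer and returns a Producer of results, with two mailboxes in between. Everything named here (workerPool, fetch) is hypothetical. Instead of relying on performGC, it uses spawn', which also returns an STM action that seals the mailbox explicitly. It does not handle worker exceptions or a downstream that stops early, and as noted at the top, result order is not preserved:

    ```haskell
    module Main (main) where

    import Control.Concurrent (threadDelay)
    import Control.Concurrent.Async (async, wait)
    import Control.Monad (replicateM)
    import Pipes
    import Pipes.Concurrent
    import qualified Pipes.Prelude as P

    -- Hypothetical stand-in for a slow web request.
    fetch :: Int -> IO Int
    fetch x = do
      threadDelay 100000 -- pretend to wait on the network
      return (x * 2)

    -- Run `work` on every upstream value using n worker threads (n > 0)
    -- and yield the results downstream in whatever order they finish.
    workerPool :: Int -> (a -> IO b) -> Producer a IO () -> Producer b IO ()
    workerPool n work source = do
      (jobOut, jobIn, sealJobs) <- lift (spawn' (bounded n))
      (resOut, resIn, sealRes) <- lift (spawn' (bounded n))
      -- Feed jobs, then seal so workers stop once the queue is drained.
      _ <- lift $ async $ do
        runEffect (source >-> toOutput jobOut)
        atomically sealJobs
      workers <- lift $ replicateM n $ async $
        runEffect (fromInput jobIn >-> P.mapM work >-> toOutput resOut)
      -- Seal the results mailbox only after every worker has finished.
      _ <- lift $ async $ do
        mapM_ wait workers
        atomically sealRes
      fromInput resIn

    main :: IO ()
    main = runEffect $ workerPool 3 fetch (each [1 .. 10]) >-> P.print
    ```

    Because workerPool returns an ordinary Producer, the rest of the pipeline (here just P.print) stays single-threaded and does not need to know that the work upstream ran concurrently.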