
Parallel Haskell. Rate-Limiting the Producer


In Parallel and Concurrent Programming in Haskell, Simon Marlow defines a Stream a type based on the following data type, together with producers and consumers such as streamFromList:

data IList a
  = Nil
  | Cons a (IVar (IList a))

type Stream a = IVar (IList a)

streamFromList :: NFData a => [a] -> Par (Stream a)
streamFromList xs = do
      var <- new
      fork $ loop xs var
      return var
    where
      loop [] var = put var Nil
      loop (x:xs) var = do
        tail <- new
        put var (Cons x tail)
        loop xs tail

Later, he mentions the drawbacks of this approach and proposes a solution:

In our previous example, the consumer was faster than the producer. If, instead, the producer had been faster than the consumer, then there would be nothing to stop the producer from getting a long way ahead of the consumer and building up a long IList chain in memory. This is undesirable, because large heap data structures incur overhead due to garbage collection, so we might want to rate-limit the producer to avoid it getting too far ahead. There’s a trick that adds some automatic rate-limiting to the stream API. It entails adding another constructor to the IList type:

data IList a
    = Nil
    | Cons a (IVar (IList a))
    | Fork (Par ()) (IList a)

However, he leaves the implementation unfinished:

I’ll leave the rest of the implementation of this idea as an exercise for you to try on your own. See if you can modify streamFromList, streamFold, and streamMap to incorporate the Fork constructor. The chunk size and fork distance should be parameters to the producers (streamFromList and streamMap).

The same question has been asked on the mailing list, but nobody gave an answer.

So how could one limit the rate of the producer?


Solution

  • The important part lies in the loop function:

      loop [] var = put var Nil
      loop (x:xs) var = do
        tail <- new
        put var (Cons x tail)
        loop xs tail
    

    We need to add the fork distance f and the chunk size c as parameters:

      loop _ _ [] var = put var Nil
      loop 0 c (x:xs) var = -- see below
      loop f c (x:xs) var = do
        tail <- new
        put var (Cons x tail)
        loop (f-1) c xs tail
    

    The fork distance is decremented on every iteration. What do we need to do when it reaches zero? We put a Fork op t cell, where op is a Par () action that, once the consumer forks it, continues to produce the list:

      loop 0 c (x:xs) var = do
        tail <- new
        let op = loop c c xs tail  -- loop takes both counters; the next chunk starts counting from c
        put var (Fork op (Cons x tail))
    

    Note that we don't emit a Fork when the list is empty. That would be possible but pointless, since there is nothing left to produce. Changing streamFromList is now simple:

    streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
    streamFromList f c xs = do
      var <- new                            
      fork $ loop f c xs var                 
      return var 
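
    To see where the Fork cells end up for a given fork distance and chunk size, the producer loop can be modelled as a pure function. This is only an illustrative sketch: Cell, Plain, Forked and layout are hypothetical names that are not part of the book's code, and the model assumes the forked continuation restarts its counter at the chunk size c.

```haskell
-- Pure model of the producer loop: each element is tagged by whether the
-- real loop would put it as a plain Cons or wrap it in a Fork.
data Cell a = Plain a | Forked a
  deriving (Eq, Show)

-- layout f c xs mirrors `loop f c xs var`, without any IVars.
layout :: Int -> Int -> [a] -> [Cell a]
layout _ _ []       = []
layout 0 c (x : xs) = Forked x : layout c c xs        -- counter hit zero: Fork, restart at c
layout f c (x : xs) = Plain x : layout (f - 1) c xs   -- otherwise a plain Cons

main :: IO ()
main = print (layout 2 3 "abcdefghij")
```

    With f = 2 and c = 3 this prints [Plain 'a',Plain 'b',Forked 'c',Plain 'd',Plain 'e',Plain 'f',Forked 'g',Plain 'h',Plain 'i',Plain 'j']: the first Fork comes after f plain cells, and each later one after c more. The producer stops at every Forked cell until the consumer forks the stored action.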
    

    To consume such a stream, streamFold needs an additional case for Fork:

    streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
    streamFold fn acc instrm = acc `seq` do
      ilst <- get instrm
      case ilst of
        Cons h t          -> streamFold fn (fn acc h) t
        Fork p (Cons h t) -> -- see below
        _                 -> return acc
    

    Recall that our streamFromList never puts an empty list inside a Fork. Just in case, the wildcard matches that shape along with Nil.

    What do we need to do if we encounter a Fork with data? First we use fork to run the Par () action, so that the producer resumes and eventually fills t. Then we continue folding over t as usual. So our last case is

        Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    

    streamMap is analogous. Since it is a producer as well as a consumer, its loop again takes the fork distance and chunk size as additional parameters, as in streamFromList.
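
    Putting the pieces together, one possible complete version might look like the sketch below. It assumes the monad-par and deepseq packages. The NFData instance for the Fork constructor (needed because put requires NFData), the helper names emit and sumDoubled, and the choice to restart the counter at c after each Fork are my own assumptions, not code from the book. Both counters are assumed to be non-negative.

```haskell
import Control.DeepSeq (NFData (..))
import Control.Monad.Par (IVar, Par, fork, get, new, put, runPar)

data IList a
  = Nil
  | Cons a (IVar (IList a))
  | Fork (Par ()) (IList a)

type Stream a = IVar (IList a)

-- put needs NFData; the suspended Par () action is deliberately not forced.
instance NFData a => NFData (IList a) where
  rnf Nil        = ()
  rnf (Cons a b) = rnf a `seq` rnf b
  rnf (Fork _ l) = rnf l

-- f: fork distance (cells before the first Fork), c: cells between later Forks.
streamFromList :: NFData a => Int -> Int -> [a] -> Par (Stream a)
streamFromList f c xs0 = do
  var <- new
  fork (loop f xs0 var)
  return var
  where
    loop _ [] var = put var Nil
    loop 0 (x : xs) var = do
      tl <- new
      put var (Fork (loop c xs tl) (Cons x tl)) -- stop until the consumer forks us
    loop n (x : xs) var = do
      tl <- new
      put var (Cons x tl)
      loop (n - 1) xs tl

streamFold :: (a -> b -> a) -> a -> Stream b -> Par a
streamFold fn acc instrm = acc `seq` do
  ilst <- get instrm
  case ilst of
    Cons h t          -> streamFold fn (fn acc h) t
    Fork p (Cons h t) -> fork p >> streamFold fn (fn acc h) t
    _                 -> return acc

-- Consumer and producer at once: forks upstream actions, emits its own Forks.
streamMap :: NFData b => Int -> Int -> (a -> b) -> Stream a -> Par (Stream b)
streamMap f c fn instrm0 = do
  out <- new
  fork (loop f instrm0 out)
  return out
  where
    loop n instrm outstrm = do
      ilst <- get instrm
      case ilst of
        Cons h t          -> emit n h t outstrm
        Fork p (Cons h t) -> fork p >> emit n h t outstrm
        _                 -> put outstrm Nil
    emit 0 h t outstrm = do
      tl <- new
      put outstrm (Fork (loop c t tl) (Cons (fn h) tl))
    emit n h t outstrm = do
      tl <- new
      put outstrm (Cons (fn h) tl)
      loop (n - 1) t tl

-- Hypothetical driver: doubles every element, then sums the stream.
sumDoubled :: Int -> Int -> [Int] -> Int
sumDoubled f c xs = runPar $ do
  s  <- streamFromList f c xs
  s' <- streamMap f c (* 2) s
  streamFold (+) 0 s'

main :: IO ()
main = print (sumDoubled 100 200 [1 .. 10000])
```

    Every consumer in the pipeline has to fork the action stored in a Fork cell. A consumer that ignores it would block forever on the tail IVar, because nothing else fills it.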