How do you parallelize lazily read information from stdin in Haskell?


I wrote the code below, and ThreadScope keeps telling me that it almost never uses more than one core at a time. I think the problem is that it has to fully evaluate the first line before it can get to the second, but I can't figure out an easy way to get it to read in 11 lines at a time.

module Main where

import Control.Parallel
import Control.Parallel.Strategies
import System.IO
import Data.List.Split
import Control.DeepSeq

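process :: [String] -> [String]
process inputLines = retlines
  where
    -- Parse each input line as a Double.
    xs = map (\x -> read x :: Double) inputLines
    -- Apply the logistic function 1 / (1 + e^(-x)).
    ys = map (\x -> 1.0 / (1.0 + (exp (-x)))) xs
    -- Render each result as one output line.
    retlines = map (\x -> (show x ) ++ "\n") ys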

main :: IO ()
main = do
    c <- getContents
    let xs = lines c
        ys = (process xs) `using` parBuffer 11 rdeepseq
    putStr (foldr (++) [] ys)


Solution

  • If I am reading the source of parBuffer (from Control.Parallel.Strategies in the parallel package) right, parBuffer n only sparks the first n elements -- all the rest are evaluated in the usual Haskell way.

    parBuffer :: Int -> Strategy a -> Strategy [a]
    parBuffer n strat = parBufferWHNF n . map (withStrategy strat)
    
    parBufferWHNF :: Int -> Strategy [a]
    parBufferWHNF n0 xs0 = return (ret xs0 (start n0 xs0))
      where -- ret :: [a] -> [a] -> [a]
               ret (x:xs) (y:ys) = y `par` (x : ret xs ys)
               ret xs     _      = xs
    
            -- start :: Int -> [a] -> [a]
               start 0   ys     = ys
               start !_n []     = []
               start !n  (y:ys) = y `par` start (n-1) ys
    

    Note in particular that start 0 ys = ys and not, say, start 0 ys = evaluateThePreviousChunk `pseq` start n0 ys or something that would start up more sparks. The documentation definitely doesn't make this clear -- I don't think "rolling buffer strategy" obviously implies this behavior, and I agree it's a bit surprising, to the point that I wonder whether this is just a bug in the parallel library that nobody has caught yet.

    You probably want parListChunk instead.
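
To make the parListChunk suggestion concrete, here is a minimal sketch of how the question's program might look with it. The names sigmoidLine and processChunked and the chunk size of 1000 are assumptions made for illustration; they are not from the original code, and the chunk size would need tuning. As far as I understand, parListChunk walks the whole list to create its sparks, so this version reads all of stdin before it prints anything rather than streaming.

```haskell
import Control.Parallel.Strategies (parListChunk, rdeepseq, using)

-- | Turn one input line into one output line, using the same
-- arithmetic as the question's process function.
sigmoidLine :: String -> String
sigmoidLine s = show (1.0 / (1.0 + exp (negate x))) ++ "\n"
  where
    x = read s :: Double

-- | Hypothetical helper (not in the original code): map sigmoidLine over
-- the input and evaluate the results in parallel, one spark per chunk of
-- chunkSize elements, each element forced to normal form.
processChunked :: Int -> [String] -> [String]
processChunked chunkSize inputLines =
  map sigmoidLine inputLines `using` parListChunk chunkSize rdeepseq

main :: IO ()
main = do
  c <- getContents
  -- 1000 is an arbitrary chunk size chosen for this sketch.
  putStr (concat (processChunked 1000 (lines c)))
```

This needs the parallel package, and the program has to be built with -threaded and run with +RTS -N for more than one core to be used at all.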