Tags: haskell, pipe, conduit, haskell-pipes

What is pipes/conduit trying to solve?


I have seen people recommend the pipes/conduit libraries for various lazy-IO-related tasks. What problem do these libraries solve, exactly?

Also, when I try to use a library from Hackage, it is highly likely that there are three different versions of it. For example:

  • attoparsec
  • pipes-attoparsec
  • attoparsec-conduit

This confuses me. For my parsing tasks, should I use attoparsec or pipes-attoparsec/attoparsec-conduit? What benefit does the pipes/conduit version give me compared to plain vanilla attoparsec?


Solution

  • Lazy IO

    Lazy IO works like this:

    readFile :: FilePath -> IO ByteString
    

    where the ByteString is guaranteed to be read only chunk by chunk, as it is consumed. To do so we could (almost) write:

    -- given `readChunk`, which reads a chunk beginning at offset n
    -- (termination at end-of-file is elided throughout)
    readChunk :: FilePath -> Int -> IO (Int, ByteString)
    
    readFile fp = readChunks 0 where
      readChunks n = do
        (n', chunk) <- readChunk fp n
        chunks      <- readChunks n'
        return (chunk <> chunks)
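
    Here readChunk is not a library function; it is assumed. A minimal sketch of how it might be implemented, with a fixed chunk size, the file reopened on every call, and end-of-file handling still elided:

    import qualified Data.ByteString as BS
    import System.IO (withFile, hSeek, IOMode(ReadMode), SeekMode(AbsoluteSeek))
    
    chunkSize :: Int
    chunkSize = 4096  -- arbitrary size, chosen for this sketch
    
    -- Seek to offset n, read one chunk, and return the next offset.
    -- Reopening the file per call is wasteful but keeps the sketch simple.
    readChunk :: FilePath -> Int -> IO (Int, BS.ByteString)
    readChunk fp n = withFile fp ReadMode $ \h -> do
      hSeek h AbsoluteSeek (fromIntegral n)
      chunk <- BS.hGet h chunkSize  -- returns a short (or empty) chunk at EOF
      return (n + BS.length chunk, chunk)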
    

    Note, however, that the IO action readChunks n' is performed before we return even the partial result already available as chunk. This means we're not lazy at all. To combat this we use unsafeInterleaveIO:

    import System.IO.Unsafe (unsafeInterleaveIO)
    
    readFile fp = readChunks 0 where
      readChunks n = do
        (n', chunk) <- readChunk fp n
        chunks      <- unsafeInterleaveIO (readChunks n')
        return (chunk <> chunks)
    

    which causes readChunks n' to return immediately, thunking an IO action to be performed only when that thunk is forced.
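
    A tiny sketch of that deferral in action (main and the strings are just illustrative):

    import System.IO.Unsafe (unsafeInterleaveIO)
    
    main :: IO ()
    main = do
      x <- unsafeInterleaveIO (putStrLn "reading!" >> return (42 :: Int))
      putStrLn "x is bound, but nothing has run yet"
      print x  -- "reading!" prints here, when the thunk is finally forced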

    That's the dangerous part: by using unsafeInterleaveIO we've delayed a bunch of IO actions to non-deterministic points in the future that depend upon how we consume our chunks of ByteString.
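
    The classic symptom, sketched with the lazy hGetContents from Data.ByteString.Lazy (the file name is illustrative):

    import qualified Data.ByteString.Lazy as BL
    import System.IO (withFile, IOMode(ReadMode))
    
    main :: IO ()
    main = do
      -- The handle is closed as soon as withFile returns...
      contents <- withFile "input.txt" ReadMode BL.hGetContents
      -- ...so forcing the contents here throws an "illegal operation"
      -- exception: the delayed reads find a closed handle.
      print (BL.length contents)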

    Fixing the problem with coroutines

    What we'd like to do is slide a chunk-processing step in between the call to readChunk and the recursion on readChunks:

    readFileCo :: Monoid a => FilePath -> (ByteString -> IO a) -> IO a
    readFileCo fp action = readChunks 0 where
      readChunks n = do
        (n', chunk) <- readChunk fp n
        a           <- action chunk
        as          <- readChunks n'
        return (a <> as)
    

    Now we've got the chance to perform arbitrary IO actions after each small chunk is loaded. This lets us do much more work incrementally without loading the ByteString into memory completely. Unfortunately, it isn't terrifically compositional: we need to build our consumption action and pass it to our ByteString producer in order for it to run.
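
    For instance, a hypothetical countBytes that tallies the file's size as chunks stream past (a sketch; it inherits readChunk and the elided end-of-file handling from above):

    import qualified Data.ByteString as BS
    import Data.Monoid (Sum(..))
    
    countBytes :: FilePath -> IO Int
    countBytes fp = getSum <$> readFileCo fp (return . Sum . BS.length)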

    Pipes-based IO

    This is essentially what pipes solves: it allows us to compose effectful coroutines with ease. For instance, we can now write our file reader as a Producer, which can be thought of as "streaming" the chunks of the file when its effect finally gets run.

    import Pipes
    import Control.Monad.IO.Class (liftIO)
    
    produceFile :: FilePath -> Producer ByteString IO ()
    produceFile fp = produce 0 where
      produce n = do
        (n', chunk) <- liftIO (readChunk fp n)
        yield chunk
        produce n'
    

    Note the similarities between this code and readFileCo above: we simply replace the call to the coroutine action with a yield of the chunk we've produced so far. This call to yield builds a Producer instead of a raw IO action, which we can then compose with other pipes types to build a nice consumption pipeline called an Effect IO ().

    All of this pipe building gets done statically without actually invoking any of the IO actions. This is how pipes lets you write your coroutines more easily. All of the effects get triggered at once when we call runEffect in our main IO action.

    runEffect :: Effect IO () -> IO ()
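
    For example, a small pipeline composing produceFile with stock consumers from Pipes.Prelude (a sketch; printChunkSizes is a made-up name, and the caveats of readChunk carry over):

    import Pipes
    import qualified Pipes.Prelude as P
    import qualified Data.ByteString as BS
    
    -- Print the size of each chunk as it streams by.
    printChunkSizes :: FilePath -> IO ()
    printChunkSizes fp =
      runEffect $ produceFile fp >-> P.map BS.length >-> P.print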
    

    Attoparsec

    So why would you want to plug attoparsec into pipes? Well, attoparsec is optimized for incremental parsing: fed a chunk of input, it either succeeds, fails, or returns a continuation requesting more input. But if you are producing the chunks fed to an attoparsec parser in an effectful way, you'll be at an impasse. You could:

    1. Use strict IO and load the entire string into memory only to consume it lazily with your parser. This is simple, predictable, but inefficient.
    2. Use lazy IO and lose the ability to reason about when your production IO effects will actually run, risking resource leaks or closed-handle exceptions depending on the consumption schedule of your parsed items. This is more efficient than (1) but can easily become unpredictable; or,
    3. Use pipes (or conduit) to build up a system of coroutines which includes your lazy attoparsec parser, allowing it to operate on as little input as it needs while producing parsed values as lazily as possible across the entire stream (see the sketch below).
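
    As a sketch of option (3), assuming produceFile from above and pipes-attoparsec's parsed (the exact types vary a little between versions), here is a hypothetical lineParser streamed over a file:

    import Pipes
    import qualified Pipes.Prelude as P
    import Pipes.Attoparsec (parsed)
    import Data.Attoparsec.ByteString.Char8 (Parser, takeTill, char)
    import qualified Data.ByteString as BS
    
    -- Hypothetical parser: one newline-terminated line.
    lineParser :: Parser BS.ByteString
    lineParser = takeTill (== '\n') <* char '\n'
    
    -- Parsed lines are printed as soon as they are available; only
    -- about a chunk of input needs to be in memory at any moment.
    parseLines :: FilePath -> IO ()
    parseLines fp = do
      result <- runEffect $ parsed lineParser (produceFile fp) >-> P.print
      case result of
        Left (err, _leftovers) -> putStrLn ("parse failed: " ++ show err)
        Right ()               -> return ()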