
Haskell Pipes -- Having a pipe consume what it yields (itself)


I'm trying to write a web scraper using Pipes, and I've reached the part where I follow scraped links. I have a process function that downloads a URL, finds links, and yields them.

process :: Pipe Item Item (StateT CState IO) ()
 ....
    for (each links) yield
 ....

Now I want to somehow follow these links recursively, threading the StateT through. I realize there is probably something more idiomatic than using a single pipe for the bulk of the scraper (especially as I start adding more features), and I'm open to suggestions. I'll probably have to rethink the design anyway when I consider multithreading with shared state.


Solution

  • You can connect a Pipe a b m r to a side effect through the m parameter, which determines which Monad the pipe operates over. You can use this to requeue links: connect the downstream end of your pipe to another pipe that puts the links in a queue, and connect the upstream end of your pipe to a pipe that reads links back from that queue.

    Our goal is to write

    import Pipes
    
    loopLeft :: Monad m => Pipe (Either l a) (Either l b) m r -> Pipe a b m r
    

    We'll take a pipe whose downstream output, Either l b, is either a Left l to send back upstream or a Right b to send downstream. We feed the ls back into its upstream input, Either l a, which is either a queued-up Left l or a Right a coming from upstream. Connecting the Left ls together gives a pipe that only sees `a`s coming from upstream and only yields `b`s headed downstream.

    At the downstream end we'll push the ls from Left l onto a stack. We yield the `a`s from Right a downstream.

    import Control.Monad
    import Control.Monad.Trans.State
    
    pushLeft :: Monad m => Pipe (Either l a) a (StateT [l] m) r
    pushLeft = forever $ do
        o <- await
        case o of
            Right a -> yield a
            Left l -> do
                stack <- lift get
                lift $ put (l : stack)
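
    To see pushLeft on its own, here is a minimal sketch that runs it over a small hand-written input. The names input and pushDemo are made up for this illustration, and the pipe is run over plain State (StateT over Identity) so the result is a pure value.

    ```haskell
    import Control.Monad (forever)
    import Control.Monad.Trans.State (StateT, get, put, runState)
    import Pipes
    import qualified Pipes.Prelude as P

    -- Same definition as above, repeated so the example compiles on its own.
    pushLeft :: Monad m => Pipe (Either l a) a (StateT [l] m) r
    pushLeft = forever $ do
        o <- await
        case o of
            Right a -> yield a
            Left l -> do
                stack <- lift get
                lift $ put (l : stack)

    -- Hypothetical input: Ints stand in for requeued links, Chars for ordinary items.
    input :: [Either Int Char]
    input = [Left 1, Right 'a', Left 2, Right 'b']

    -- Collect everything pushLeft yields, together with the final stack.
    pushDemo :: (String, [Int])
    pushDemo = runState (P.toListM (each input >-> pushLeft)) []

    main :: IO ()
    main = print pushDemo  -- prints ("ab",[2,1])
    ```

    The Right payloads pass through in arrival order, while the Left payloads end up on the stack with the most recent one on top.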
    

    At the upstream end we'll look for something on top of the stack to yield. If the stack is empty, we'll await a value from upstream and yield it.

    popLeft :: Monad m => Pipe a (Either l a) (StateT [l] m) r
    popLeft = forever $ do
        stack <- lift get
        case stack of
            [] -> await >>= yield . Right
            (x : xs) -> do
                lift $ put xs
                yield (Left x)
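
    The mirror-image check for popLeft, again as a small sketch: we start with a stack that already holds two entries and feed two ordinary items from upstream. The name popDemo and the sample values are assumptions for illustration only.

    ```haskell
    import Control.Monad (forever)
    import Control.Monad.Trans.State (StateT, get, put, runState)
    import Pipes
    import qualified Pipes.Prelude as P

    -- Same definition as above, repeated so the example compiles on its own.
    popLeft :: Monad m => Pipe a (Either l a) (StateT [l] m) r
    popLeft = forever $ do
        stack <- lift get
        case stack of
            [] -> await >>= yield . Right
            (x : xs) -> do
                lift $ put xs
                yield (Left x)

    -- Start with [1, 2] already on the stack and "ab" arriving from upstream.
    popDemo :: ([Either Int Char], [Int])
    popDemo = runState (P.toListM (each "ab" >-> popLeft)) [1, 2]

    main :: IO ()
    main = print popDemo  -- prints ([Left 1,Left 2,Right 'a',Right 'b'],[])
    ```

    The stack is drained before anything is awaited from upstream, so queued entries always take priority over new input.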
    

    Now we can write loopLeft. We compose the upstream and downstream pipes around p with pipe composition: popLeft >-> hoist lift p >-> pushLeft. hoist lift turns a Pipe a b m r into a Pipe a b (t m) r, so p can run over the same StateT layer as popLeft and pushLeft. distribute turns a Pipe a b (t m) r into a t (Pipe a b m) r. To get back to a Pipe a b m r we run the whole StateT computation with evalStateT, starting with an empty stack []. Pipes.Lift also provides evalStateP, which combines evalStateT and distribute; note that it is defined for the StateT from Control.Monad.Trans.State.Strict, so using it here would mean importing that module instead of Control.Monad.Trans.State.

    import Pipes.Lift
    
    loopLeft :: Monad m => Pipe (Either l a) (Either l b) m r -> Pipe a b m r
    loopLeft p = flip evalStateT [] . distribute $ popLeft >-> hoist lift p >-> pushLeft
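
    To check that the loop behaves as described, here is a self-contained sketch that repeats the definitions above and adds a toy pipe. countdown and demo are hypothetical names, not part of the scraper: a "page" is just an Int, and page n "links" to page n - 1 until it reaches 0.

    ```haskell
    import Control.Monad (forever, when)
    import Control.Monad.Trans.State (StateT, evalStateT, get, put)
    import Pipes
    import Pipes.Lift (distribute)
    import qualified Pipes.Prelude as P

    pushLeft :: Monad m => Pipe (Either l a) a (StateT [l] m) r
    pushLeft = forever $ do
        o <- await
        case o of
            Right a -> yield a
            Left l -> do
                stack <- lift get
                lift $ put (l : stack)

    popLeft :: Monad m => Pipe a (Either l a) (StateT [l] m) r
    popLeft = forever $ do
        stack <- lift get
        case stack of
            [] -> await >>= yield . Right
            (x : xs) -> do
                lift $ put xs
                yield (Left x)

    loopLeft :: Monad m => Pipe (Either l a) (Either l b) m r -> Pipe a b m r
    loopLeft p = flip evalStateT [] . distribute $ popLeft >-> hoist lift p >-> pushLeft

    -- Hypothetical stand-in for a scraper step: requeue n - 1 as a "link",
    -- then emit n itself as the "result".
    countdown :: Monad m => Pipe (Either Int Int) (Either Int Int) m r
    countdown = forever $ do
        item <- await
        let n = either id id item           -- requeued and fresh items are treated alike
        when (n > 0) $ yield (Left (n - 1)) -- sent back upstream by loopLeft
        yield (Right n)                     -- sent downstream

    demo :: [Int]
    demo = P.toList (each [3, 2] >-> loopLeft countdown)

    main :: IO ()
    main = print demo  -- prints [3,2,1,0,2,1,0]
    ```

    Because the requeued values live on a stack, everything reachable from one upstream item is processed before the next upstream item is awaited. In the question's setting, m would presumably be StateT CState IO, with process yielding Left for each link to follow and Right for each finished item; that adaptation is an assumption and is not shown here.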