Search code examples
haskellhaskell-pipes

Limiting pipes based on time?


Is it possible to create pipes that get all values that have been sent downstream in a certain time period? I'm implementing a server where the protocol allows me to concatenate outgoing packets and compress them together, so I'd like to effectively "empty out" the queue of downstream ByteStrings every 100ms and mappend them together to then yield on to the next pipe which does the compression.


Solution

  • Here's a solution using pipes-concurrency. You give it any Input and it will periodically drain the input of all values:

    import Control.Applicative ((<|>))
    import Control.Concurrent (threadDelay)
    import Data.Foldable (forM_)
    import Pipes
    import Pipes.Concurrent
    
    drainAll :: Input a -> STM (Maybe [a])
    drainAll i = do
        ma <- recv i
        case ma of
            Nothing -> return Nothing
            Just a  -> loop (a:)
      where
        loop diffAs = do
            ma <- recv i <|> return Nothing
            case ma of
                Nothing -> return (Just (diffAs []))
                Just a  -> loop (diffAs . (a:))
    
    bucketsEvery :: Int -> Input a -> Producer [a] IO ()
    bucketsEvery microseconds i = loop
      where
        loop = do
            lift $ threadDelay microseconds
            ma <- lift $ atomically $ drainAll i
            forM_ ma $ \a -> do
                yield a
                loop
    

    This gives you much greater control over how you consume elements from upstream, by selecting the type of Buffer you use to build the Input.

    If you're new to pipes-concurrency, you can read the tutorial which explains how to use spawn, Buffer and Input.