Pipe that maintains state


I'm trying to calculate rolling hash values (buzzhash) for a big file using pipes.

This is what I have so far, but I don't know how to write a pipe that maintains state.

import qualified Data.ByteString.Lazy as L
import Data.Word
import Data.Bits(xor,rotate)
import Data.Array
import Pipes
import Control.Monad.State.Strict
import Control.Monad(forever)

produceFromList [] = return () -- stop cleanly at the end of the list
produceFromList (x:xs) = do
  yield x
  produceFromList xs

buzzHash = do
  x <- await
  h <- lift $ get -- pull out previous value
  let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
  lift $ put h' -- save new value 
  yield h'

stdoutLn :: Consumer Word64 IO ()
stdoutLn = do 
  a <- await 
  lift $ print a

main = do 
  bs <- L.unpack `fmap` L.getContents
  runEffect $ produceFromList bs >-> buzzHash >-> stdoutLn

hashArrW8 :: Array Word8 Word64

How do I make buzzHash save the previous value and use it to calculate the next one? The initial state value should be 0.


Solution

  • You were almost there; you just need to run the state.

    main = do
      bs <- L.unpack `fmap` L.getContents
      flip evalStateT 0 $ runEffect $ produceFromList bs >-> buzzHash >-> hoist lift stdoutLn
    

    I assume you don't need the final state, so I use evalStateT, which discards it, rather than runStateT.

    The only wrinkle is that stdoutLn is declared as Consumer Word64 IO (), so I use hoist lift to turn it into a Consumer Word64 (StateT Word64 IO) (). Every stage in a chain a >-> b >-> c must agree on the underlying monad and the return type.
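
    To make the type agreement concrete, here is a minimal sketch. The names printer and printerS are hypothetical, not from the question, and it assumes only the pipes and mtl packages.

    ```haskell
    import Control.Monad (forever)
    import Control.Monad.State.Strict (StateT, evalStateT)
    import Data.Word (Word64)
    import Pipes

    -- A consumer fixed to IO, like the original stdoutLn (but looping).
    printer :: Consumer Word64 IO r
    printer = forever (await >>= lift . print)

    -- hoist lift changes only the base monad: IO becomes StateT Word64 IO.
    printerS :: Consumer Word64 (StateT Word64 IO) r
    printerS = hoist lift printer

    main :: IO ()
    main = evalStateT (runEffect (each [1, 2, 3] >-> printerS)) 0
    -- prints 1, 2 and 3 on separate lines
    ```

    Without the hoist, each [1, 2, 3] >-> printer would still typecheck in plain IO, but it could not be composed with a stage that lives in StateT Word64 IO.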

    Here are a few further comments that might save you time. First, produceFromList is just each, which Pipes already exports.

    Moreover, you could have avoided the hoist lift by relabeling your stdoutLn:

    stdoutLn :: MonadIO m => Consumer Word64 m ()
    stdoutLn = do 
       a <- await 
       liftIO $ print a
    

    But there is a problem here: the action runs only once, so the consumer stops after the first value. It should clearly be a loop:

    stdoutLn :: MonadIO m => Consumer Word64 m ()
    stdoutLn = do 
       a <- await 
       liftIO $ print a
       stdoutLn
    

    In fact, this is already available as P.print, so we can write

    import qualified Pipes.Prelude as P
    main = do
      bs <- L.unpack `fmap` L.getContents
      flip evalStateT 0 $ runEffect $ each bs >-> buzzHash >-> P.print
    

    If I understand you, buzzHash is meant to be repeated indefinitely too:

    buzzHash = do
      x <- await
      h <- lift $ get -- pull out previous value
      let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
      lift $ put h' -- save new value 
      yield h'
      buzzHash
    

    (This is the same as forever applied to your original single-step buzzHash.)
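
    The difference between a single step and a repeated one can be sketched as follows. This is an illustration under assumptions: sumStep is a hypothetical pipe that keeps a running sum in place of the hash update, so the numbers are easy to follow, and the state is run with evalState over a pure base monad.

    ```haskell
    import Control.Monad (forever)
    import Control.Monad.State.Strict (StateT, evalState, get, put)
    import Pipes
    import qualified Pipes.Prelude as P

    -- One step of a stateful pipe: add the input to the running total.
    -- (A running sum stands in for the hash update; that is an assumption.)
    sumStep :: Monad m => Pipe Int Int (StateT Int m) ()
    sumStep = do
      x <- await
      s <- lift get
      let s' = s + x
      lift (put s')
      yield s'

    -- Collect the output of a pipe fed with [1, 2, 3], starting from state 0.
    runOn :: Pipe Int Int (StateT Int Identity) () -> [Int]
    runOn p = evalState (P.toListM (each [1, 2, 3] >-> p)) 0

    main :: IO ()
    main = do
      print (runOn sumStep)           -- [1]: one element, then the pipe returns
      print (runOn (forever sumStep)) -- [1,3,6]: repeats until upstream ends
    ```

    Here Identity comes from the hypothetical import below if your mtl version does not re-export it: import Data.Functor.Identity (Identity).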

    Finally, with

    import qualified Pipes.ByteString as PB
    import Control.Lens (view) -- (or Lens.Micro.MTL or Lens.Simple)
    

    you don't need the lazy bytestring IO at all, which doesn't stream properly anyway. Pipes.ByteString already provides the unpack we want, packaged as a lens, so we write view PB.unpack where we would otherwise write B.unpack. In the end we can write

    main = flip evalStateT 0 $ runEffect $ view PB.unpack PB.stdin >-> buzzHash >-> P.print
    

    In this form it is clear that only buzzHash uses the state, so we can localize the state to that stage:

    import Pipes.Lift (evalStateP) 
    main =  runEffect $ view PB.unpack PB.stdin >-> evalStateP 0 buzzHash >-> P.print
    

    Or, if you prefer, you can build the state handling into the pipe itself:

    buzzHash' :: Monad m => Word64 -> Pipe Word8 Word64 m r
    buzzHash' n = evalStateP n $ forever $ do
        x <- await
        h <- lift $ get -- pull out previous value
        let h' = rotate h 1 `xor` (hashArrW8!x) -- calculate new value
        lift $ put h' -- save new value 
        yield h'
    

    Then you would write

    main =  runEffect $ view PB.unpack PB.stdin >-> buzzHash' 0 >-> P.print
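
    For completeness, here is a self-contained sketch that can be compiled without stdin input. It rests on assumptions: the question never shows the contents of hashArrW8, so the table below is a placeholder, not a real buzzhash table, and buzzHashList is a hypothetical pure reference used only to check the pipe. It needs the pipes, mtl and array packages.

    ```haskell
    import Control.Monad (forever)
    import Control.Monad.State.Strict (get, put)
    import Data.Array (Array, listArray, (!))
    import Data.Bits (rotate, xor)
    import Data.Word (Word8, Word64)
    import Pipes
    import Pipes.Lift (evalStateP)
    import qualified Pipes.Prelude as P

    -- Placeholder table (an assumption): byte b maps to b * 0x9E3779B97F4A7C15.
    hashArrW8 :: Array Word8 Word64
    hashArrW8 = listArray (0, 255) [b * 0x9E3779B97F4A7C15 | b <- [0 .. 255]]

    -- The pipe from the answer, with the state kept local by evalStateP.
    buzzHash' :: Monad m => Word64 -> Pipe Word8 Word64 m r
    buzzHash' n = evalStateP n $ forever $ do
      x <- await
      h <- lift get
      let h' = rotate h 1 `xor` (hashArrW8 ! x)
      lift (put h')
      yield h'

    -- Pure reference: the same recurrence as a left scan, dropping the seed.
    buzzHashList :: Word64 -> [Word8] -> [Word64]
    buzzHashList n = drop 1 . scanl (\h x -> rotate h 1 `xor` (hashArrW8 ! x)) n

    main :: IO ()
    main = do
      let input = [1, 2, 3, 255] :: [Word8]
          piped = P.toList (each input >-> buzzHash' 0)
      print (piped == buzzHashList 0 input) -- True
    ```

    Comparing against the scan is a cheap way to confirm that the state really is carried from one element to the next; replace the placeholder table with your own before using the values for anything.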