
How can I idiomatically and efficiently consume a Pipe in some non-IO monad, with an IO action?


I have a Producer that creates values that depend on randomness, using my own Random monad:

policies :: Producer (Policy s a) Random x

Random is a wrapper over mwc-random that can be run from ST or IO:

newtype Random a =
  Random (forall m. PrimMonad m => Gen (PrimState m) -> m a)

runIO :: Random a -> IO a
runIO (Random r) = MWC.withSystemRandom (r @IO)

The policies producer yields better and better policies from a simple reinforcement learning algorithm.

I can efficiently plot the policy after, say, 5,000,000 iterations by indexing into policies:

Just convergedPolicy <- Random.runIO $ Pipes.index 5000000 policies
plotPolicy convergedPolicy "policy.svg"

I now want to plot the intermediate policies every 500,000 steps to see how they converge. I wrote a couple of functions that take the policies producer and extract a list ([Policy s a]) of, say, 10 policies (one every 500,000 iterations) and then plot all of them.

However, these functions take far longer (10x) and use more memory (4x) than just plotting the final policy as above, even though the total number of learning iterations should be the same (i.e. 5,000,000). I suspect this is because extracting a list inhibits the garbage collector, and it seems to be an unidiomatic use of Pipes:

Idiomatic pipes style consumes the elements immediately as they are generated instead of loading all elements into memory.

What's the correct approach to consuming a pipe like this when the Producer is over some random monad (i.e. Random) and the effect I want to produce is in IO?

Put another way, I want to plug a Producer (Policy s a) Random x into a Consumer (Policy s a) IO x.


Solution

  • Random is a reader that reads a generator

    {-# LANGUAGE RankNTypes #-}
    import Control.Monad.Primitive
    import System.Random.MWC
    
    newtype Random a = Random {
        runRandom :: forall m. PrimMonad m => Gen (PrimState m) -> m a
    }
    

    We can trivially convert a Random a into a ReaderT (Gen (PrimState m)) m a. This trivial operation is the one you want to hoist to turn a Producer ... Random a into a Producer ... IO a.

    import Control.Monad.Trans.Reader
    
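    -- Eta-expanded on purpose: GHC 9.0 and later can reject the point-free
    -- form (ReaderT . runRandom) because runRandom has a higher-rank result.
    toReader :: PrimMonad m => Random a -> ReaderT (Gen (PrimState m)) m a
    toReader r = ReaderT (runRandom r)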
    

    Since toReader only rewraps a function, hoisting it adds no random generation overhead. The following function is written only to show the type signature that hoisting produces.

    import Pipes
    
    hoistToReader :: PrimMonad m => Proxy a a' b b' Random                          r ->
                                    Proxy a a' b b' (ReaderT (Gen (PrimState m)) m) r
    hoistToReader = hoist toReader
    

    There are two approaches to take here. The simple approach is to hoist your Consumer into the same monad, compose the pipes together, and run them.

    import Control.Monad (forever)
    
    type ReadGenIO = ReaderT GenIO IO
    
    toReadGenIO :: MFunctor t => t Random a -> t ReadGenIO a
    toReadGenIO = hoist toReader
    
    int :: Random Int
    int = Random uniform
    
    ints :: Producer Int Random x
    ints = forever $ do
        i <- lift int
        yield i
    
    sample :: Show a => Int -> Consumer a IO ()
    sample 0 = return ()
    sample n = do
        x <- await
        lift $ print x
        sample (n-1)
    
    sampleSomeInts :: Effect ReadGenIO ()
    sampleSomeInts = hoist toReader ints >-> hoist lift (sample 1000)
    
    runReadGenE :: Effect ReadGenIO a -> IO a
    runReadGenE = withSystemRandom . runReaderT . runEffect
    
    example :: IO ()
    example = runReadGenE sampleSomeInts
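
The same composition can be applied to the case in the question: keep one value out of every n and hand it to an IO consumer as soon as it is produced, so no list of policies is ever built. The following is a minimal sketch under stated assumptions, not the asker's actual code: the policies producer here is a stand-in random walk over Double, plotEach is a hypothetical substitute for plotPolicy, everyNth is a helper introduced only for this illustration, and the Functor/Applicative/Monad instances for Random are one plausible way to write them.

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Monad (forever, replicateM_)
import Control.Monad.Primitive (PrimMonad, PrimState)
import Control.Monad.Trans.Reader (ReaderT (..))
import Pipes
import qualified Pipes.Prelude as P
import System.Random.MWC (Gen, GenIO, createSystemRandom, uniform)

-- Random as in the answer, plus the instances a Producer over it needs
-- (assumed here; the question does not show them).
newtype Random a = Random
    { runRandom :: forall m. PrimMonad m => Gen (PrimState m) -> m a }

instance Functor Random where
    fmap f (Random r) = Random (\g -> fmap f (r g))

instance Applicative Random where
    pure x = Random (\_ -> pure x)
    Random f <*> Random x = Random (\g -> f g <*> x g)

instance Monad Random where
    Random r >>= k = Random (\g -> r g >>= \a -> runRandom (k a) g)

toReader :: PrimMonad m => Random a -> ReaderT (Gen (PrimState m)) m a
toReader r = ReaderT (runRandom r)

-- Hypothetical stand-in for the real `policies` producer: each "policy"
-- is just the running sum of uniform random steps.
policies :: Producer Double Random x
policies = go 0
  where
    go :: Double -> Producer Double Random r
    go acc = do
        step <- lift (Random uniform)
        let acc' = acc + step
        yield acc'
        go acc'

-- Forward every n-th element (the n-th, 2n-th, ...) and discard the rest.
everyNth :: Monad m => Int -> Pipe a a m r
everyNth n = forever $ do
    replicateM_ (n - 1) await
    await >>= yield

-- Hypothetical stand-in for plotPolicy: handle each snapshot on arrival.
plotEach :: Consumer Double IO r
plotEach = forever $ do
    p <- await
    lift (putStrLn ("snapshot: " ++ show p))

main :: IO ()
main = do
    gen <- createSystemRandom
    runReaderT (runEffect pipeline) gen
  where
    pipeline :: Effect (ReaderT GenIO IO) ()
    pipeline =
        hoist toReader policies
            >-> everyNth 1000          -- 500000 in the question
            >-> P.take 10              -- stop after ten snapshots
            >-> hoist lift plotEach
```

Because each snapshot is consumed before the next one is generated, only the current policy needs to stay alive; whether this recovers the timing of the single Pipes.index run would have to be measured.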
    

    There's another set of tools in Pipes.Lift that users of pipes should be aware of: tools for running a transformer layer, such as the reader underlying your Random monad, by distributing it over a Proxy. The module has pre-built runners for the familiar transformers from the transformers library, all built out of distribute, which turns a Proxy ... (t m) a into a t (Proxy ... m) a that you can run once with whatever tools you use to run a t.

    import Pipes.Lift
    
    runRandomP :: PrimMonad m => Proxy a a' b b' Random r ->
                                 Gen (PrimState m) -> Proxy a a' b b' m r
    runRandomP = runReaderT . distribute . hoist toReader
    

    You can then finish combining the pipes and use runEffect to get rid of the Proxy layer, but you'd be juggling the generator argument yourself as you combine the resulting Proxy ... IO r pipes.
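
What passing the generator by hand looks like can be sketched as follows. This is an illustration under assumptions rather than a definitive implementation: runRandomP, toReader, int and ints are taken from the answer above, while the Functor/Applicative/Monad instances for Random and the use of createSystemRandom to obtain the generator are choices made for this example.

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Monad (forever)
import Control.Monad.Primitive (PrimMonad, PrimState)
import Control.Monad.Trans.Reader (ReaderT (..))
import Pipes
import Pipes.Lift (distribute)
import qualified Pipes.Prelude as P
import System.Random.MWC (Gen, createSystemRandom, uniform)

-- Random as in the answer, with instances assumed for this sketch.
newtype Random a = Random
    { runRandom :: forall m. PrimMonad m => Gen (PrimState m) -> m a }

instance Functor Random where
    fmap f (Random r) = Random (\g -> fmap f (r g))

instance Applicative Random where
    pure x = Random (\_ -> pure x)
    Random f <*> Random x = Random (\g -> f g <*> x g)

instance Monad Random where
    Random r >>= k = Random (\g -> r g >>= \a -> runRandom (k a) g)

toReader :: PrimMonad m => Random a -> ReaderT (Gen (PrimState m)) m a
toReader r = ReaderT (runRandom r)

-- Discharge the Random layer of a single proxy, given a generator.
runRandomP :: PrimMonad m
           => Proxy a' a b' b Random r
           -> Gen (PrimState m)
           -> Proxy a' a b' b m r
runRandomP = runReaderT . distribute . hoist toReader

int :: Random Int
int = Random uniform

ints :: Producer Int Random x
ints = forever (lift int >>= yield)

main :: IO ()
main = do
    gen <- createSystemRandom
    -- The generator is supplied explicitly to the one pipe that needs it;
    -- the remaining pipes already live in IO and compose unchanged.
    runEffect $ runRandomP ints gen >-> P.take 5 >-> P.print
```

With this approach the IO consumer needs no hoist lift, at the cost of passing gen to every Random pipe in the pipeline.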