haskell conduit postgresql-simple

Creating a streaming Conduit Source with postgresql-simple


postgresql-simple provides functions for streaming queries, e.g.

fold 
  :: (FromRow row, ToRow params)
  => Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a

I want to create a conduit source which takes full advantage of streaming.

mySource :: (FromRow row, Monad m) => Source m row

Unfortunately, because IO appears in a contravariant position (I think?) in fold, I'm really struggling with the types. The following type-checks, but folds the entire stream before yielding values.

getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
  where
    foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
    foo cond evt = pure (cond >> C.yield evt)

Any pointers on how to implement this would be greatly appreciated! Thanks!


Solution

  • One (not so nice) way to go about this is to

    • make a new TMChan to receive rows
    • set forEach_ to just dump rows into this channel
    • finally use stm-conduit to make a source out of the channel

    I don't have the means to test this off-hand, but the following should work

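    import Conduit
    import Control.Concurrent (forkIO)
    import Control.Concurrent.STM (atomically)
    import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, closeTMChan)
    import Control.Exception (finally)
    import Data.Conduit.TMChan (sourceTMChan)
    import Database.PostgreSQL.Simple (Connection, Query, FromRow, forEach_)

    mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
    mySource connection query = do
      chan <- newTMChanIO
      -- forEach_ only returns after the last row, so run it on its own thread;
      -- calling it inline would buffer the whole result set before yielding.
      _ <- forkIO $
        forEach_ connection query (atomically . writeTMChan chan)
          -- close the channel when the query ends so the source terminates
          `finally` atomically (closeTMChan chan)
      pure (sourceTMChan chan)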
    

    If only we had forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m () this might be easier...
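
    Lacking that MonadIO version, the channel trick can be wrapped up once and reused. Below is an untested sketch: callbackSource and rowSource are names I made up, and the buffer size of 64 is arbitrary. It assumes conduit 1.3 or later (for ConduitT), stm-chans and stm-conduit, and it uses a bounded TBMChan instead of a TMChan so that the query thread blocks when the consumer falls behind rather than buffering every row in memory.

```haskell
import Conduit
import Control.Concurrent (forkIO)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TBMChan (newTBMChanIO, writeTBMChan, closeTBMChan)
import Control.Exception (finally)
import Control.Monad.IO.Class (liftIO)
import Data.Conduit.TMChan (sourceTBMChan)
import Database.PostgreSQL.Simple (Connection, Query, FromRow, forEach_)

-- | Hypothetical helper (not part of any library): turn a callback-driven
-- producer into a conduit source. The producer runs on its own thread and
-- pushes each value into a bounded channel of the given capacity.
callbackSource
  :: MonadIO m
  => Int                        -- ^ channel capacity (assumed tuning knob)
  -> ((a -> IO ()) -> IO ())    -- ^ producer, e.g. @forEach_ conn query@
  -> ConduitT () a m ()
callbackSource bound run = do
  chan <- liftIO $ do
    c <- newTBMChanIO bound
    -- Close the channel when the producer finishes or throws, so the
    -- source ends instead of waiting forever.
    _ <- forkIO (run (atomically . writeTBMChan c)
                   `finally` atomically (closeTBMChan c))
    pure c
  sourceTBMChan chan

-- | The row source from the question, built on the helper above.
rowSource :: (FromRow row, MonadIO m) => Connection -> Query -> ConduitT () row m ()
rowSource conn query = callbackSource 64 (forEach_ conn query)
```

    Two caveats with this sketch: an exception inside the query thread just ends the stream quietly, and if the consumer stops early the query thread stays blocked on the full channel while still holding the connection. Both would need handling (for example by killing the thread from a bracketP finalizer) before relying on it.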