postgresql-simple
provides functions for streaming queries, e.g.
fold
:: (FromRow row, ToRow params)
=> Connection -> Query -> params -> a -> (a -> row -> IO a) -> IO a
I want to create a conduit source which takes full advantage of streaming.
mySource :: (FromRow row, Monad m) => Source m row
Unfortunately, because IO
appears in a contravariant position (I think?) in fold
, I'm really struggling with the types. The following type-checks, but folds the entire stream before yielding values.
getConduit :: Connection -> IO (C.ConduitM () Event IO ())
getConduit conn = fold_ conn queryEventRecord CL.sourceNull foo
where
foo :: C.ConduitM () Event IO () -> Event -> IO (C.ConduitM () Event IO ())
foo cond evt = pure (cond >> C.yield evt)
Any pointers on how to implement this would be greatly appreciated! Thanks!
One (not so nice) way to go about this it to
TMChan
to receive rowsforeach_
to just dump rows into this channelstm-conduit
to make a source out of the channelI don't have the means to test this off-hand, but the following should work
import Conduit
import Database.PostgreSQL.Simple (foreach_)
import Data.Conduit.TMChan (sourceTMChan)
import Control.Concurrent.STM.TMChan (newTMChanIO, writeTMChan, atomically)
mySource :: (FromRow row, MonadIO m) => Connection -> Query -> IO (Source m row)
mySource connection query = do
chan <- newTMChanIO
forEach_ connection query (atomically . writeTMChan chan)
pure (sourceTMChan chan)
If only we had forEach_ :: (MonadIO m, FromRow r) => Connection -> Query -> (r -> m ()) -> m ()
this might be easier...