I am trying to adapt Store
encoding and decoding for Streaming
. Store
already implements a streaming decode
with a function called decodeMessageBS
.
I tried to do a basic implementation of store
deserialization for Streaming
as below (without bracket
for now to keep it simple). However, there seems to be something wrong with the logic for popper
because decodeMessageBS
keeps throwing PeekException
:
{-# LANGUAGE RankNTypes #-}
import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming
streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
ref <- lift $ newIORef inp
let popper = do
r <- S.uncons =<< readIORef ref
case r of
Nothing -> return Nothing
Just (a,rest) -> writeIORef ref rest >> return (Just a)
let go = do
r <- lift $ decodeMessageBS bb $ popper
lift $ print "Decoding"
case r of
Nothing -> return ()
Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
go
I can decode my test file fine with decodeIOPortionWith
- so, the problem seems to be in the logic needed to feed decodeMessageBS
. Will appreciate pointers on what is wrong with the logic of popper
here.
The PeekException
happens because Store
uses a different format when saving messages in streaming mode, unlike Binary
. It expects a wrapper of type Message
around Store
data when using decodeMessageBS
function. decodeIOPortionWith
doesn't expect Message
wrapper and so, works fine with saved down Store
data. After I fixed the serialization to save down the data as Message
encoding, decodeMessageBS
worked fine on that data.