Adapting Store decoding for streaming library


I am trying to adapt Store encoding and decoding for Streaming. Store already implements a streaming decode with a function called decodeMessageBS.

I tried to do a basic implementation of store deserialization for Streaming as below (without bracket for now to keep it simple). However, there seems to be something wrong with the logic for popper because decodeMessageBS keeps throwing PeekException:

{-# LANGUAGE  RankNTypes #-}
import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming

streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
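    -- Keep the not-yet-consumed part of the input stream in an IORef.
    ref <- lift $ newIORef inp
    -- popper hands decodeMessageBS the next chunk, or Nothing at end of input.
    let popper = do
          r <- S.uncons =<< readIORef ref
          case r of
            Nothing -> return Nothing
            Just (a,rest) -> writeIORef ref rest >> return (Just a)
    -- Decode one message at a time and yield it until the input is exhausted.
    let go = do
          r <- lift $ decodeMessageBS bb popper
          lift $ print "Decoding"
          case r of
            Nothing -> return ()
            Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
    go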

I can decode my test file fine with decodeIOPortionWith, so the problem seems to be in the logic that feeds decodeMessageBS. I would appreciate pointers on what is wrong with the logic of popper here.


Solution

  • The PeekException was not caused by popper. Unlike Binary, Store uses a different format for messages written in streaming mode: decodeMessageBS expects each value to be wrapped in a Message, whereas decodeIOPortionWith expects plain Store-encoded data with no Message wrapper, which is why it worked on my test file. Once I changed the serialization to write the data in Message encoding, decodeMessageBS decoded it without errors.
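
    A minimal round-trip sketch of the fix described above, assuming the writer side uses encodeMessage from Data.Store.Streaming to wrap each value in a Message before it is written. The helper names frame and decodeAll are hypothetical, and a plain list of chunks stands in for the Streaming input to keep the example short; the popper plays the same role as in the question.

```haskell
import qualified Data.ByteString as BS
import Data.IORef (newIORef, atomicModifyIORef')
import Data.Store (Store)
import Data.Store.Streaming (Message (..), encodeMessage, decodeMessageBS)
import qualified System.IO.ByteBuffer as BB

-- Hypothetical helper: wrap each value in a Message before encoding,
-- so the output is in the format decodeMessageBS expects.
frame :: Store a => [a] -> [BS.ByteString]
frame = map (encodeMessage . Message)

-- Hypothetical helper: decode every Message found in a list of chunks.
decodeAll :: Store a => [BS.ByteString] -> IO [a]
decodeAll chunks = BB.with Nothing $ \bb -> do
    ref <- newIORef chunks
    -- popper returns the next chunk, or Nothing once the input is exhausted.
    let popper = atomicModifyIORef' ref $ \cs -> case cs of
          []        -> ([], Nothing)
          (c : cs') -> (cs', Just c)
        loop acc = do
          r <- decodeMessageBS bb popper
          case r of
            Nothing  -> return (reverse acc)
            Just msg -> loop (fromMessage msg : acc)
    loop []

main :: IO ()
main = do
    xs <- decodeAll (frame [1, 2, 3 :: Int])
    print (xs :: [Int])
```

    Feeding decodeAll the output of plain encode instead of frame should reproduce the kind of failure seen in the question, since those bytes carry no Message wrapper.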