
Decoding JSON stream where some values are needed before others


Say we have a JSON object that corresponds to this Haskell record (the image is a base64-encoded bytestring):

data TaggedImage = TaggedImage { id :: Text, image :: ByteString }

Now, we want to receive the image from a source and store it in a location determined by the id field. That means id must be parsed ahead of time (to determine the location for the image), while image is parsed in a streaming fashion. Is this straightforward to do?

I am planning to use pipes-aeson, aws (for S3 storage) and pipes to do streaming decoding from a WebSocket producer, with an S3 bucket as the consumer (which can't be created until we parse the id to determine the location of the S3 bucket). Looking at the decoded function, I can't figure out whether I can do what I asked above. This is my first attempt at streaming JSON with pipes, so help would be very much appreciated.

A simple example that reads from and writes to the filesystem would also do, as a stand-in for the WebSocket producer and S3 consumer.

Addendum

Since JSON key-value pairs are unordered according to the RFC, while arrays are ordered, the image data might arrive before id for the data type I defined above. So it might help to switch to a JSON array (a tuple in Haskell, which aeson's TH derivation seems to encode as an ordered array). Please feel free to change the data type definition if that is needed to impose an ordering for decoding. For example, the data type might be changed to:

data TaggedImage = TaggedImage (Text, ByteString)

Solution

  • I believe you won't be able to reuse the pipes-aeson library, because it doesn't provide a way to stream over a nested field of a decoded JSON record, nor does it support cursor-like navigation of the structure. That means you will need to parse the skeleton of the JSON record by hand.

    Also, some work needs to be done to wrap the base64-bytestring library in a pipes-like API with this type:

    -- Convert a base64-encoded stream to a raw byte stream
    decodeBase64
        :: Monad m
        => Producer ByteString m r
        -- ^ Base64-encoded bytes
        -> Producer ByteString m (Either SomeException (Producer ByteString m r))
        -- ^ Raw bytes
    

    Note that, if decoding completes successfully, the result's return value is a Producer for the remainder of the byte string (i.e. everything after the base64-encoded bytes). This lets you resume parsing where the image bytes end.
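One way such a wrapper might look is sketched below. It is only an illustration under several assumptions that the answer does not state: the base64 text ends at the first double quote (the closing quote of the JSON string), it contains no whitespace, and it uses standard padding. It relies on `decode` from base64-bytestring and on `next` and `yield` from pipes; wrapping the error message in `ErrorCall` and the `demo` helper are choices made just for this sketch.

```haskell
import           Control.Exception      (ErrorCall (..), SomeException, toException)
import           Control.Monad          (unless)
import qualified Data.ByteString        as B
import qualified Data.ByteString.Base64 as B64
import qualified Data.ByteString.Char8  as BC
import           Data.Functor.Identity  (runIdentity)
import           Pipes
import qualified Pipes.Prelude          as P

-- Sketch: decode base64 text up to the first '"' (byte 34) and return
-- the rest of the stream, starting at that quote, on success.
decodeBase64
    :: Monad m
    => Producer B.ByteString m r
    -> Producer B.ByteString m (Either SomeException (Producer B.ByteString m r))
decodeBase64 = go B.empty
  where
    -- 'carry' holds up to 3 bytes of an incomplete 4-byte base64 group.
    go carry p = do
        step <- lift (next p)
        case step of
            Left r -> finish carry (return r)
            Right (chunk, p') -> do
                let (payload, rest) = B.break (== 34) chunk
                    buf             = carry <> payload
                if B.null rest
                    then do
                        -- No closing quote yet: decode only whole groups.
                        let keep            = B.length buf - B.length buf `mod` 4
                            (whole, carry') = B.splitAt keep buf
                        ok <- emit whole
                        case ok of
                            Left e   -> return (Left e)
                            Right () -> go carry' p'
                    else finish buf (yield rest >> p')

    -- Decode one block of complete groups and yield the raw bytes.
    emit bytes = case B64.decode bytes of
        Left err  -> return (Left (toException (ErrorCall err)))
        Right raw -> unless (B.null raw) (yield raw) >> return (Right ())

    -- Decode what is left and hand back the remainder of the stream.
    finish buf remainder = do
        ok <- emit buf
        return (remainder <$ ok)

-- Usage sketch: "hello" in base64, split in the middle of a group.
demo :: (B.ByteString, Maybe B.ByteString)
demo = runIdentity $ do
    let input = yield (BC.pack "aGV") >> yield (BC.pack "sbG8=\" }")
    (chunks, result) <- P.toListM' (decodeBase64 input)
    rest <- case result of
        Left _  -> return Nothing
        Right p -> Just . B.concat <$> P.toListM p
    return (B.concat chunks, rest)
```

Carrying the incomplete group between chunks is what keeps the decoder independent of how the transport happens to split the input. The returned remainder still begins with the closing quote, so a suffix parser has to account for it.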

    Assuming that you have such a decodeBase64 function, the code would have three parts:

    • Parse the prefix of the record before the image bytes using a binary parser adapted to pipes
    • Use the decodeBase64 function to stream the decoded image bytes
    • Parse the suffix of the record after the image bytes also using a binary parser adapted to pipes

    In other words, the types and implementation would look roughly like this:

    -- This would match the "{ 'id' : 'foo', 'image' : '" prefix of the JSON record
    -- (implementation omitted)
    skipPrefix :: Data.Binary.Get ()

    skipPrefix' :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
    skipPrefix' p = do
        -- runStateT returns both the parse result and the unconsumed remainder
        (e, p') <- runStateT (Pipes.Binary.decodeGet skipPrefix) p
        return (p' <$ e)

    -- This would match the "' }" suffix of the JSON record
    -- (implementation omitted)
    skipSuffix :: Data.Binary.Get ()

    skipSuffix' :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
    skipSuffix' p = do
        (e, p') <- runStateT (Pipes.Binary.decodeGet skipSuffix) p
        return (p' <$ e)

    streamImage
        ::  Monad m
        =>  Producer ByteString m r
        ->  Producer ByteString m (Either SomeException (Producer ByteString m r))
    streamImage p0 = do
        -- Part 1: consume everything up to the first base64 byte
        e0 <- lift (skipPrefix' p0)
        case e0 of
            Left exc -> return (Left (toException exc))
            Right p1 -> do
                -- Part 2: stream the decoded image bytes downstream
                e1 <- decodeBase64 p1
                case e1 of
                    Left exc -> return (Left exc)
                    Right p2 -> do
                        -- Part 3: consume the closing part of the record
                        e2 <- lift (skipSuffix' p2)
                        case e2 of
                            Left exc -> return (Left (toException exc))
                            Right p3 -> return (Right p3)
    

    That is, streamImage takes as input a Producer that begins at the first character of the JSON record, and streams the decoded image bytes extracted from that record. If decoding succeeds, it returns the remainder of the byte stream immediately after the JSON record.
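The skipPrefix outline discards the id, whereas the question needs the id before the image bytes start flowing. A prefix parser that returns it could be sketched as follows. The names `literal`, `untilQuote`, `parsePrefix` and `demoPrefix` are hypothetical, and the sketch assumes a fixed key order, no whitespace between tokens, real JSON double quotes, and an id without escape sequences. A real implementation would need to be more tolerant.

```haskell
import           Control.Monad         (unless)
import           Data.Binary.Get       (Get, getByteString, getWord8, runGetOrFail)
import qualified Data.ByteString       as B
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy  as BL

-- Consume exactly the expected bytes, failing on any mismatch.
literal :: String -> Get ()
literal s = do
    let expected = BC.pack s
    actual <- getByteString (B.length expected)
    unless (actual == expected) (fail ("expected " ++ show s))

-- Read bytes up to the next double quote (byte 34) and consume the quote.
untilQuote :: Get B.ByteString
untilQuote = go []
  where
    go acc = do
        w <- getWord8
        if w == 34 then return (B.pack (reverse acc)) else go (w : acc)

-- Hypothetical prefix parser: returns the raw id bytes and stops right
-- before the first base64 byte of the image.
parsePrefix :: Get B.ByteString
parsePrefix = do
    literal "{\"id\":\""
    ident <- untilQuote
    literal ",\"image\":\""
    return ident

-- Usage sketch on an in-memory lazy ByteString (no pipes involved).
demoPrefix :: Maybe (B.ByteString, BL.ByteString)
demoPrefix =
    case runGetOrFail parsePrefix input of
        Left _                 -> Nothing
        Right (rest, _, ident) -> Just (ident, rest)
  where
    input = BL.fromStrict (BC.pack "{\"id\":\"foo\",\"image\":\"aGVs")
```

Run through Pipes.Binary.decodeGet with runStateT, in the same way as skipPrefix, a parser like this should give back both the id and the remaining Producer, so the destination can be chosen before any image bytes are consumed.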