Let us say we have a JSON object corresponding to this type (with the image as a base64-encoded bytestring):

```haskell
data TaggedImage = TaggedImage { id :: Text, image :: ByteString }
```
Now, we want to receive `image` from a source and store it in a location determined by the `id` tag. That means `id` must be parsed first (to determine the location for the image), while `image` is parsed in a streaming fashion. Is this straightforward to do?
I am planning to use `pipes-aeson`, `aws` (for S3 storage) and `pipes` to do streaming decoding from a WebSocket producer, with an S3 bucket as the consumer. The consumer can't be created until we parse the `id` to determine the location in the S3 bucket. Looking at the `decoded` function in `pipes-aeson`, I can't figure out whether I can do what I described above. This is my first attempt at streaming JSON with `pipes`, so help will be very much appreciated.
A simple example that reads from and writes to the filesystem will also do, as a stand-in for the WebSocket producer and the S3 consumer.
Addendum
According to the RFC, the key-value pairs of a JSON object are unordered, while arrays are ordered. So with the data type defined above, the `image` data might arrive before `id`. It might help to encode the record as a JSON array instead (a tuple in Haskell, which aeson's TH derivation seems to convert to an ordered array). Please feel free to change the data type definition if needed to impose an ordering for decoding. For example, the data type might be changed to:
```haskell
data TaggedImage = TaggedImage (Text, ByteString)
```
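As a quick check of the ordering claim, aeson encodes a Haskell pair as a two-element JSON array, so the id always comes first. This minimal sketch uses `Text` for the base64 field, because aeson deliberately ships no `ToJSON` instance for `ByteString`. The values `"foo"` and `"aGVsbG8="` are made up for illustration.

```haskell
import Data.Aeson (encode)
import qualified Data.ByteString.Lazy.Char8 as BL
import qualified Data.Text as T

-- Encode an (id, base64 image) pair; aeson renders tuples as JSON arrays.
main :: IO ()
main = BL.putStrLn (encode (T.pack "foo", T.pack "aGVsbG8="))
-- prints ["foo","aGVsbG8="]
```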
I believe you won't be able to reuse the `pipes-aeson` library. It doesn't provide a way to stream over a nested field of a decoded JSON record, nor does it support cursor-like navigation of the structure. That means you will need to parse the skeleton of the JSON record by hand.
Also, some work is needed to wrap `base64-bytestring` in a `pipes`-like API with this type:
```haskell
-- Convert a base64-encoded stream to a raw byte stream
decodeBase64
    :: Monad m
    => Producer ByteString m r
    -- ^ Base64-encoded bytes
    -> Producer ByteString m (Either SomeException (Producer ByteString m r))
    -- ^ Raw bytes
```
Note that if decoding completes successfully, the result is a `Producer` for the remainder of the byte stream (i.e. everything after the base64-encoded bytes). This lets you resume parsing where the image bytes end.
However, assuming that you have a `decodeBase64` function, the rough outline of the code is three parts:

- a `binary` parser adapted to `pipes` that skips the JSON prefix up to the image data
- the `decodeBase64` function to stream the decoded image bytes
- a `binary` parser adapted to `pipes` that skips the JSON suffix after the image data
In other words, the types and implementation would look roughly like this:
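```haskell
import Control.Exception (SomeException, toException)
import Control.Monad.Trans.State.Strict (runStateT)
import Data.Binary.Get (Get)
import Data.ByteString (ByteString)
import Pipes
import Pipes.Binary (DecodingError, decodeGet)

-- This would match the "{ "id" : "foo", "image" : "" prefix of the JSON record
skipPrefix :: Get ()
skipPrefix = undefined  -- depends on the exact layout of your records

-- Run skipPrefix on the front of the stream; on success, return the unconsumed remainder
skipPrefix' :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipPrefix' p = do
    (e, p') <- runStateT (decodeGet skipPrefix) p
    return (fmap (const p') e)

-- This would match the "" }" suffix of the JSON record
skipSuffix :: Get ()
skipSuffix = undefined  -- depends on the exact layout of your records

skipSuffix' :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipSuffix' p = do
    (e, p') <- runStateT (decodeGet skipSuffix) p
    return (fmap (const p') e)

streamImage
    :: Monad m
    => Producer ByteString m r
    -> Producer ByteString m (Either SomeException (Producer ByteString m r))
streamImage p0 = do
    e0 <- lift (skipPrefix' p0)
    case e0 of
        Left exc -> return (Left (toException exc))
        Right p1 -> do
            e1 <- decodeBase64 p1
            case e1 of
                Left exc -> return (Left exc)
                Right p2 -> do
                    e2 <- lift (skipSuffix' p2)
                    case e2 of
                        Left exc -> return (Left (toException exc))
                        Right p3 -> return (Right p3)
```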
In other words, `streamImage` takes a `Producer` that begins at the first character of the JSON record and streams the decoded image bytes extracted from that record. If decoding succeeds, it returns the remainder of the byte stream immediately after the JSON record.
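To make the prefix and suffix parsers concrete, here is a sketch using `Data.Binary.Get`. It assumes:

- the JSON is compact, with no whitespace
- `"id"` is the first key and `"image"` the second
- the id contains no escaped quotes

`parsePrefix` is a hypothetical variant of `skipPrefix` that returns the id instead of discarding it. The question needs the id to choose the storage location before the image starts streaming.

```haskell
{-# LANGUAGE OverloadedStrings #-}
import Control.Monad (unless)
import Data.Binary.Get (Get, getByteString, getWord8, runGetOrFail)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy.Char8 as BL

-- Consume exactly the given literal bytes, or fail.
expect :: ByteString -> Get ()
expect lit = do
    bs <- getByteString (BS.length lit)
    unless (bs == lit) (fail ("expected " ++ show lit))

-- Read bytes up to, and consuming, the next double quote (assumes no escapes).
untilQuote :: Get ByteString
untilQuote = go []
  where
    go acc = do
        w <- getWord8
        if w == 34  -- ASCII '"'
            then return (BS.pack (reverse acc))
            else go (w : acc)

-- Hypothetical: matches {"id":"<id>","image":" and returns <id>.
parsePrefix :: Get ByteString
parsePrefix = do
    expect "{\"id\":\""
    ident <- untilQuote
    expect ",\"image\":\""
    return ident

-- Matches the "} that follows the base64 data in the same compact layout.
skipSuffix :: Get ()
skipSuffix = expect "\"}"

main :: IO ()
main = case runGetOrFail parsePrefix (BL.pack "{\"id\":\"foo\",\"image\":\"aGk=\"}") of
    Left (_, _, err) -> putStrLn ("parse error: " ++ err)
    Right (rest, _, ident) -> do
        print ident       -- "foo"
        BL.putStrLn rest  -- aGk="}
```

With `pipes-binary`, `decodeGet parsePrefix` can be run on the front of a `Producer` the same way `skipPrefix'` runs `skipPrefix`. Once the id is known, you can open the destination (a file, or an S3 upload) and feed it the rest of the stream.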