I have a code that uses a file handle to simulate sink for the streaming Bytestring
from a source (AWS S3
). If we want to use Network.Websocket
as the sink, would it suffice to swap LBS.writeFile
in the code below with sendBinaryData
(with handle to connection)?
{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)
data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }
getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text -> IO Int
getObject cfg bucket key = do
req <- waitCatch =<< async (runResourceT $ do
{- Create a request object with S3.getObject and run the request with pureAws. -}
S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
S3.getObject bucket key
{- Stream the response to a lazy bytestring -}
liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes
let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
return $ lookup "content-length" (S3.omUserMetadata mdata))
case req of
Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
Right _ -> return 0
The source of confusion for me is how the termination of the stream is determined? In case of files, this is taken care of by writeFile
API. What about sendBinaryData
? Does it handle termination in similar way as writeFile
? Or is it determined by the data parser on the client side?
Update
This question is about how to stream the data to a websocket handle (let us assume a handle has been provided) like we do with the file handle in the example above, not really about how to manage the handle within resourceT
. conduit
does seem to take mapM_
approach to sink data. So, it seems that is indeed the way to go.
The termination question is because of this line of thought I have: if we have a function listening for data on the other side of a Websocket handle, then determining end of message seems to matter in streaming context. Given a function like below:
f :: LBS.ByteString -> a
if we do S.mapM_
to stream the data to websocket handle, does it take care of adding some kind of end of stream
marker so that f
listening on the other side can stop processing the lazy bytestring. Otherwise f
won't know when the message is complete.
Here's a few bits and pieces that may make things more intelligible. First, for the first little demo, revising your getObject
, I use Streaming.ByteString.writeFile
, which is in ResourceT
anyway, to drop the detour by lazy bytestring.
{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
import qualified Aws
import qualified Aws.S3 as S3
import Data.Conduit
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as HTTP
import qualified Data.ByteString.Streaming as SB
import qualified Data.ByteString.Streaming.Internal as SB
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)
import qualified Network.WebSockets as WebSockets
import Control.Monad.Trans.Resource
data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration
, _aws_s3cfg :: S3.S3Configuration a
, _aws_httpmgr :: HTTP.Manager }
getObject :: AwsConfig Aws.NormalQuery -> FilePath -> T.Text -> T.Text -> IO Int
getObject cfg file bucket key = do
req <- waitCatch =<< async (runResourceT $ do
S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
S3.getObject bucket key
let bytestream = do
-- lookup "content-length" (S3.omUserMetadata mdata))
SB.chunk B.empty -- this will be replaced by content-length
hoist lift (HTTP.responseBody rsp) $$+- CL.mapM_ SB.chunk
SB.writeFile file bytestream ) -- this is in ResourceT
case req of
Left _ -> return 2
Right _ -> return 0
We can abstract from this more or less what you were doing with SB.writeFile
:
getObjectAbstracted
:: (SB.ByteString (ResourceT IO) () -> ResourceT IO b)
-> AwsConfig Aws.NormalQuery -> S3.Bucket -> Text -> ResourceT IO b
getObjectAbstracted action cfg bucket key = do
S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <-
Aws.pureAws (_aws_cfg cfg)
(_aws_s3cfg cfg)
(_aws_httpmgr cfg)
(S3.getObject bucket key)
action (hoist lift (HTTP.responseBody rsp) $$+- CL.mapM_ SB.chunk)
Here now, we need a little helper not included in the streaming bytestring library
mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> SB.ByteString m r -> m r
mapMChunks_ act bytestream = do
(a S.:> r) <- SB.foldlChunksM (\_ bs -> act bs) (return ()) bytestream
return r
and can proceed more or less as @haoformayor planned, using streaming bytestring
writeConnection :: MonadIO m => WebSockets.Connection -> SB.ByteString m r -> m r
writeConnection connection =
mapMChunks_ (liftIO . WebSockets.sendBinaryData connection)
-- following `haoformayor`
connectWrite
:: (MonadResource m, WebSockets.WebSocketsData a)
=> WebSockets.PendingConnection
-> a -- closing message
-> SB.ByteString m r -- stream from aws
-> m r
connectWrite request closeMessage bytestream = do
(releaseKey, connection) <- allocate (WebSockets.acceptRequest request)
(`WebSockets.sendClose` closeMessage)
writeConnection connection bytestream
getObjectWS :: WebSockets.WebSocketsData a =>
WebSockets.PendingConnection
-> a
-> AwsConfig Aws.NormalQuery
-> S3.Bucket
-> Text
-> ResourceT IO ()
getObjectWS request closeMessage = getObjectAbstracted (connectWrite request closeMessage)
Of course none of this so far makes any use of the difference between conduit
and streaming
/streaming-bytestring
.