
Streaming bytes to network websocket


I have code that uses a file handle as a stand-in sink for a ByteString streamed from a source (AWS S3). If we want to use Network.WebSockets as the sink instead, would it suffice to replace LBS.writeFile in the code below with sendBinaryData (given a handle to the connection)?

{-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}

import qualified Aws
import qualified Aws.S3 as S3
import           Data.Conduit (($$+-))
import qualified Data.Conduit.List as CL (mapM_)
import qualified Data.ByteString.Streaming.HTTP as SP
import qualified Data.ByteString.Lazy as LBS
import Streaming as S
import Streaming.Prelude as S hiding (show,print)
import Control.Concurrent.Async (async,waitCatch)
import Data.Text as T (Text)

data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration, _aws_s3cfg :: S3.S3Configuration a, _aws_httpmgr :: SP.Manager }

getObject :: AwsConfig Aws.NormalQuery -> T.Text -> T.Text ->  IO Int
getObject cfg bucket key = do
  req <- waitCatch =<< async (runResourceT $ do
    {- Create a request object with S3.getObject and run the request with pureAws. -}
    S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
      Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
        S3.getObject bucket key
    {- Stream the response to a lazy bytestring -}
    liftIO $ LBS.writeFile "testaws" LBS.empty -- this will be replaced by content-length of the bytes 
    let obj = (($$+- CL.mapM_ S.yield) . hoist lift ) (SP.responseBody rsp)
    S.mapM_ (liftIO . (LBS.appendFile "testaws") . LBS.fromStrict) obj
    return $ lookup "content-length" (S3.omUserMetadata mdata))
  case req of
    Left _ -> return 2 -- perhaps, we could use this to send an error message over websocket
    Right _ -> return 0

What confuses me is how the end of the stream is determined. For files, the writeFile API takes care of this. What about sendBinaryData? Does it handle termination the same way writeFile does, or is that left to the data parser on the client side?

Update

This question is about how to stream the data to a websocket handle (assume a handle has been provided) the way we do with the file handle in the example above, not about how to manage the handle within ResourceT. conduit does seem to take the mapM_ approach to sinking data, so that seems to be the way to go.

I ask about termination because of this line of thought: if a function is listening for data on the other side of a websocket handle, then determining the end of the message matters in a streaming context. Given a function like the one below:

f :: LBS.ByteString -> a

if we use S.mapM_ to stream the data to the websocket handle, does it add some kind of end-of-stream marker so that f, listening on the other side, can stop processing the lazy bytestring? Otherwise f won't know when the message is complete.


Solution

  • Here are a few bits and pieces that may make things more intelligible. For the first little demo, which revises your getObject, I use streaming-bytestring's writeFile (SB.writeFile below), which runs in ResourceT anyway, to avoid the detour through a lazy bytestring.

    {-# LANGUAGE OverloadedStrings,ScopedTypeVariables #-}
    
    import qualified Aws
    import qualified Aws.S3 as S3
    import           Data.Conduit 
    import qualified Data.Conduit.List as CL (mapM_)
    import qualified Data.ByteString.Streaming.HTTP as HTTP
    import qualified Data.ByteString.Streaming as SB
    import qualified Data.ByteString.Streaming.Internal as SB
    import qualified Data.ByteString as B
    import qualified Data.ByteString.Lazy as BL
    import           Streaming as S
    import           Streaming.Prelude as S hiding (show,print)
    import           Control.Concurrent.Async (async,waitCatch)
    import           Data.Text as T (Text) 
    import qualified Network.WebSockets as WebSockets
    import           Control.Monad.Trans.Resource
    
    data AwsConfig a = AwsConfig { _aws_cfg :: Aws.Configuration
                                 , _aws_s3cfg :: S3.S3Configuration a
                                 , _aws_httpmgr :: HTTP.Manager }
    
    getObject :: AwsConfig Aws.NormalQuery -> FilePath -> T.Text -> T.Text ->  IO Int
    getObject cfg file bucket key = do
      req <- waitCatch =<< async (runResourceT $ do
        S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
          Aws.pureAws (_aws_cfg cfg) (_aws_s3cfg cfg) (_aws_httpmgr cfg) $
            S3.getObject bucket key
        let bytestream = do 
             -- lookup "content-length" (S3.omUserMetadata mdata))
             SB.chunk B.empty -- this will be replaced by content-length 
             hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk 
        SB.writeFile file bytestream ) -- this is in ResourceT 
      case req of
        Left _ -> return 2
        Right _ -> return 0
    

    We can abstract the sink out of this, so that what you were doing with SB.writeFile becomes a parameter:

    getObjectAbstracted
          :: (SB.ByteString (ResourceT IO) () -> ResourceT IO b)
             -> AwsConfig Aws.NormalQuery -> S3.Bucket -> Text -> ResourceT IO b
    getObjectAbstracted action cfg bucket key = do
        S3.GetObjectResponse { S3.gorResponse = rsp, S3.gorMetadata = mdata } <- 
          Aws.pureAws (_aws_cfg cfg) 
                      (_aws_s3cfg cfg) 
                      (_aws_httpmgr cfg) 
                      (S3.getObject bucket key)
    
        action (hoist lift (HTTP.responseBody rsp)  $$+- CL.mapM_ SB.chunk) 
    

    Next we need a little helper that is not included in the streaming-bytestring library:

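    -- Run an action on every strict chunk of the byte stream, keeping only
    -- the stream's final return value.
    mapMChunks_ :: Monad m => (B.ByteString -> m ()) -> SB.ByteString m r -> m r
    mapMChunks_ act bytestream = do
      (_ S.:> r) <- SB.foldlChunksM (\_ bs -> act bs) (return ()) bytestream
      return r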
    

    With it, we can proceed more or less as @haoformayor planned, using streaming-bytestring:

    writeConnection :: MonadIO m => WebSockets.Connection -> SB.ByteString m r -> m r
    writeConnection connection  = 
      mapMChunks_ (liftIO . WebSockets.sendBinaryData connection)
    
    -- following `haoformayor`
    connectWrite
        :: (MonadResource m, WebSockets.WebSocketsData a) 
        => WebSockets.PendingConnection 
        -> a                  -- closing  message
        -> SB.ByteString m r  -- stream from aws
        -> m r
    connectWrite request closeMessage bytestream = do
        (releaseKey, connection) <- allocate (WebSockets.acceptRequest request)
                                             (`WebSockets.sendClose` closeMessage)
        writeConnection connection bytestream
    
    getObjectWS :: WebSockets.WebSocketsData a =>
           WebSockets.PendingConnection
           -> a
           -> AwsConfig Aws.NormalQuery
           -> S3.Bucket
           -> Text
           -> ResourceT IO ()
    getObjectWS request closeMessage = getObjectAbstracted (connectWrite request closeMessage)
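
    On the termination question: each sendBinaryData call goes out as one complete binary message, so the client sees a sequence of messages rather than one lazy bytestring, and nothing in that sequence marks the last chunk. In connectWrite above, the end is signalled only when the allocated connection is released and sendClose runs. What follows is a minimal sketch of a client that treats the close request as the end of the stream. drainUntilClose and receiveAll are hypothetical names, and the sketch assumes that the websockets library's receiveData raises CloseRequest when the peer's close message arrives.

```haskell
import           Control.Exception  (throwIO, try)
import qualified Data.ByteString    as B
import qualified Network.WebSockets as WebSockets

-- | Run 'handleChunk' on every message produced by 'receive' until the
-- peer asks to close. 'receive' is a parameter so the loop can be tried
-- out without a live connection.
drainUntilClose :: IO B.ByteString -> (B.ByteString -> IO ()) -> IO ()
drainUntilClose receive handleChunk = loop
  where
    loop = do
      result <- try receive
      case result of
        Right chunk                        -> handleChunk chunk >> loop
        -- The peer sent a close message: treat it as end of stream.
        Left (WebSockets.CloseRequest _ _) -> return ()
        -- Anything else (dropped connection, parse error) is a real failure.
        Left other                         -> throwIO other

-- | Hypothetical client-side consumer for the chunks sent by 'writeConnection'.
receiveAll :: WebSockets.Connection -> (B.ByteString -> IO ()) -> IO ()
receiveAll connection = drainUntilClose (WebSockets.receiveData connection)
```

    On the client this could be used as, for example, receiveAll connection (B.appendFile "testaws"). A connection that drops without a close message is rethrown here, so a truncated download is not mistaken for a complete one.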
    

    Of course, none of this so far makes any use of the difference between conduit and streaming/streaming-bytestring.
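
    If the connection has to stay open after the object has been sent, closing it cannot double as the end-of-stream signal, and the sender and receiver need an application-level convention instead. The following is only a sketch of one such convention, not something the websockets library provides: every message carries a one-byte tag, and a message tagged 0 marks the end. frameStream, unframe, endMessage and the tag values are made up for illustration.

```haskell
import qualified Data.ByteString as B
import           Data.Word       (Word8)

-- Hypothetical tags: 1 prefixes a data chunk, 0 marks the end of the stream.
dataTag, endTag :: Word8
dataTag = 1
endTag  = 0

-- | The message that tells the receiver no more chunks will follow.
endMessage :: B.ByteString
endMessage = B.singleton endTag

-- | Tag every chunk as data and append the end marker.
frameStream :: [B.ByteString] -> [B.ByteString]
frameStream chunks = map (B.cons dataTag) chunks ++ [endMessage]

-- | Collect payloads up to the end marker; anything after it is ignored.
-- An empty message or an unknown tag also stops the stream.
unframe :: [B.ByteString] -> [B.ByteString]
unframe [] = []
unframe (message : rest) =
  case B.uncons message of
    Just (tag, payload) | tag == dataTag -> payload : unframe rest
    _                                    -> []
```

    The sender would pass each framed message to WebSockets.sendBinaryData connection, or tag the chunks as they go by in mapMChunks_ and finish with endMessage, while f on the other side consumes messages until the end marker arrives.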