Search code examples
c++boostwebsocketboost-beastbeast

Unable to find the reason for "Broken Pipe" error while sending continuous data chunks through Beast websocket


I am working on streaming audio recognition with IBM Watson speech to text web service API. I have created a web-socket with boost (beast 1.68.0) library in C++(std 11).

I have successfully connected to the IBM server, and want to send a 231,296 bytes of raw audio data to server in following manner.

{
  "action": "start",
  "content-type": "audio/l16;rate=44100"
}

websocket.binary(true);
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 50,000 bytes>
<bytes of binary audio data 31,296 bytes>

websocket.binary(false);
{
  "action": "stop"
}

Expected Result from IBMServer is :

 {"results": [
      {"alternatives": [
            {  "confidence": xxxx, 
               "transcript": "call Rohan Chauhan "
            }],"final": true
      }], "result_index": 0
}

But I am not getting the desired result: rather the error says "Broken pipe"

DataSize is: 50000 | mIsLast is : 0
DataSize is: 50000 | mIsLast is : 0
what : Broken pipe
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 50000 | mIsLast is : 0
what : Operation canceled
DataSize is: 31296 | mIsLast is : 0
what : Operation canceled

Here is my code which is an adaptation of the sample example given in beast library.

Foo.hpp

class IbmWebsocketSession: public std::enable_shared_from_this<IbmWebsocketSession> {
protected:
    char binarydata[50000];
    std::string TextStart;
    std::string TextStop;

public:
    explicit IbmWebsocketSession(net::io_context& ioc, ssl::context& ctx, SttService* ibmWatsonobj) :
        mResolver(ioc), mWebSocket(ioc, ctx) {
    TextStart ="{\"action\":\"start\",\"content-type\": \"audio/l16;rate=44100\"}";
    TextStop = "{\"action\":\"stop\"}";


   /**********************************************************************
    * Desc  : Send start frame
   **********************************************************************/
    void send_start(beast::error_code ec);
   /**********************************************************************
    * Desc  : Send Binary data
   **********************************************************************/
    void send_binary(beast::error_code ec);
   /**********************************************************************
    * Desc  : Send Stop frame
   **********************************************************************/
    void send_stop(beast::error_code ec);
   /**********************************************************************
    * Desc  : Read the file for binary data to be sent
   **********************************************************************/
    void readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF);
}

Foo.cpp

void IbmWebsocketSession::on_ssl_handshake(beast::error_code ec) {
    if(ec)
        return fail(ec, "connect");
// Perform the websocket handshake
    ws_.async_handshake_ex(host, "/speech-to-text/api/v1/recognize", [Token](request_type& reqHead) {reqHead.insert(http::field::authorization,Token);},bind(&IbmWebsocketSession::send_start, shared_from_this(),placeholders::_1));
}

void IbmWebsocketSession::send_start(beast::error_code ec){
    if(ec)
        return fail(ec, "ssl_handshake");

    ws_.async_write(net::buffer(TextStart),
        bind(&IbmWebsocketSession::send_binary, shared_from_this(),placeholders::_1));
}

void IbmWebsocketSession::send_binary(beast::error_code ec) {
    if(ec)
        return fail(ec, "send_start");
    readFile(binarydata, &Datasize, &StartPos, &IsLast);

    ws_.binary(true);
    if (!IsLast) {
        ws_.async_write(net::buffer(binarydata, Datasize),
            bind(&IbmWebsocketSession::send_binary, shared_from_this(),
                    placeholders::_1));

    } else {
        IbmWebsocketSession::on_binarysent(ec);
    }
}

void IbmWebsocketSession::on_binarysent(beast::error_code ec) {
    if(ec)
        return fail(ec, "send_binary");

    ws_.binary(false);
    ws_.async_write(net::buffer(TextStop),
           bind(&IbmWebsocketSession::read_response, shared_from_this(), placeholders::_1));
}

void IbmWebsocketSession::readFile(char *bdata, unsigned int *Len, unsigned int *start_pos,bool *ReachedEOF) {

    unsigned int end = 0;
    unsigned int start = 0;
    unsigned int length = 0;

    // Creation of ifstream class object to read the file
    ifstream infile(filepath, ifstream::binary);

    if (infile) {
        // Get the size of the file
        infile.seekg(0, ios::end);
        end = infile.tellg();

        infile.seekg(*start_pos, ios::beg);
        start = infile.tellg();

        length = end - start;
    }

    if ((size_t) length < 150) {
        *Len = (size_t) length;
        *ReachedEOF = true;
    // cout << "Reached end of File (last 150 bytes)" << endl;

    } else if ((size_t) length <= 50000) {  //Maximumbytes to send are 50000
        *Len = (size_t) length;
        *start_pos += (size_t) length;
        *ReachedEOF = false;
        infile.read(bdata, length);

    } else {
        *Len = 50000;
        *start_pos += 50000;
        *ReachedEOF = false;
        infile.read(bdata, 50000);
    }

    infile.close();
}

Any suggestions here?


Solution

  • From boost's documentation we have the following excerpt on websocket::async_write

    This function is used to asynchronously write a complete message. This call always returns immediately. The asynchronous operation will continue until one of the following conditions is true:

    1. The complete message is written.

    2. An error occurs.

    So when you create your buffer object to pass to it net::buffer(TextStart) for example the lifetime of the buffer passed to it is only until the function returns. It could be that even after the function returns you the async write is still operating on the buffer as per the documentation but the contents are no longer valid since the buffer was a local variable.

    To remedy this you could, make your TextStart static or declare that as a member of your class and copy it to boost::asio::buffer there are plenty of examples on how to do that. Note I only mention TextStart in the IbmWebsocketSession::send_start function. The problem is pretty much the same throughout your code.

    From IBM Watson's API definition, the Initiate a connection requires a certain format which can then be represented as a string. You have the string but missing the proper format due to which the connection is being closed by the peer and you are writing to a closed socket, thus a broken pipe.

    The initiate connection requires :

      var message = {
        action: 'start',
        content-type: 'audio/l16;rate=22050'
      };
    

    Which can be represented as string TextStart = "action: 'start',\r\ncontent-type: 'audio\/l16;rate=44100'" according to your requirements.

    Following on from the discussion in the chat, the OP resolved the issue by adding the code:

    if (!IsLast ) {
        ws_.async_write(net::buffer(binarydata, Datasize),
        bind(&IbmWebsocketSession::send_binary, shared_from_this(),
        placeholders::_1));
    } 
    else {
         if (mIbmWatsonobj->IsGstFileWriteDone()) { //checks for the file write completion
             IbmWebsocketSession::on_binarysent(ec);
         } else {
             std::this_thread::sleep_for(std::chrono::seconds(1));
             IbmWebsocketSession::send_binary(ec);
         }
    }
    

    Which from discussion stems from the fact that more bytes were being sent to the client before a file write was completed on the same set of bytes. The OP now verifies this before attempting to send more bytes.