Search code examples
csocketstcpmultiplexing

TCP Socket Multiplexing Send Large Data


Got some trouble with TCP socket multiplexing.

      //socket is non-blocking
      const int MAX = 4096;
      char *buff[MAX];
      char *p = buff;
      int fd, rvalue;
      rvalue = 0;

      if ( (fd = open(path, O_RDONLY)) < 0 ) {
          return errno;
      } else {
        int didsend, didread;
        int shouldsend;
        while ((didread = read(fd, buff, MAX)) > 0) {
          p = buff;
          shouldsend = didread;
          while ( 1 ) {
            didsend = send(sockfd, p, shouldsend, 0);
            //if send succeeds and returns the number of bytes fewer than asked for then try to send rest part in next time.
            if (didsend < shouldsend) {
              p += didsent;
              shouldsend -= didsend;
              continue;
            }
            //if there is no place for new data to send, then wait a brief time and try again.
            if ( didsend < 0 && (errno == EWOULDBLOCK || errno == EAGAIN) ) {
              usleep(1000);
              continue;
            }
            //if all data has been sent then sending loop is over.
            if (didsend == shouldsend) {
              break;
            }
            //send error
            if ( didsend < 0 ) {
              rvalue = errno;
              break;
            }
          }
        }
        close(fd);
        if (didread == -1) {
          return errno;
        }
        return rvalue;
      }

Assume I use an I/O Multiplexing function poll() or kqueue(), and non-blocking socket, then if there are only some small data like send a short message, it works fine.

But if it comes to large data, I mean larger than send()'s buffer size, since using non-blocking socket, send() will just send a portion of data, and return how much data it sends, the rest part of data can only be sent in another call of send(), but it takes time, and can't tell how long it will takes. So the second while() is actually a blocking send which using non-blocking socket.

Equivalent to:

  //socket is blocking
  const int MAX = 4096;
  char *buff[MAX];
  int fd, n;
  if ( (fd = open(path, O_RDONLY)) < 0 ) {
      return errno;
  } else {
    while ((n = read(fd, buff, MAX)) > 0) {
      if (send(sockfd, buff, n, 0) < 0) {
        return errno;
      }
    }
    close(fd);
    return 0;
  }

So, what is the solution to this, multithreading might work but that's kind of wasting resource maybe.


Solution

  • This is the general pattern for a single-threaded server that works with multiple connections and non-blocking sockets.

    It's primarily pseudo-code in C and doesn't do the necessary error checking. But it gives you an idea that for each accepted connection, you keep a struct instance that maintains the socket handle, request parsing state, response stream, and any other "state" members of that connection. Then you just loop using "select" to wait or having multiple threads doing this same thing.

    Again this is only pseudo-code and uses select/poll as an example. You can get even more scalability with epoll.

    while (1)
    {
        fd_set readset = {};
        fd_set writeset = {};
    
        for (int i = 0; i < number_of_client_connections; i++)
        {
            if (client_connections[i].reading_request)
                FD_SET(client_connection.sock, &readset);
            else
                FD_SET(client_connection.sock, &writeset);
        }
    
        // add the listen socket to the read set
        FD_SET(listen_socket, &readset);
    
        select(n + 1, &readset, &writeset, &timeout); // wait for a socket to be ready (not shown - check for errors and return value)
    
        if (FD_ISSET(listen_socket, &readset))
        {
            int new_client_socket = accept(listen_socket, &addr, &addrlength);
    
            // create a struct that keeps track of the connection state data
            struct ConnectionData client_connection = {};
            client_connection.sock = new_client_socket;
            client_connection.reading_request = 1;  // awaiting for all the request bytes to come in
            client_connections[number_of_client_connections++] = client_connection;  // pseudo code, add the client_connection to the list
        }
    
    
        for (int i = 0; i < number_of_client_connections; i++)
        {
            if (client_connections[i].reading_request)
            {
                if (FD_ISSET(client_connections[i], &readset))
                {
                    char buffer[2000];
                    int len = recv(client_connections[i].sock, buffer, 2000, 0);
                    // not shown - handle error case when (recv < 0)
                    // not shown - handle case when (recv == 0)
                    ProcessIncomingData(client_connections[i], buffer, len);  // do all the request parsing here.  Flip the client_connections[i].reading_request to 0 if ready to respond
                }
            }
            else if (client_connections[i].reading_request == 0)
            {
                if (FD_ISSET(client_connections[i], &writeset))
                {
                    client_connection* conn = &client_connections[i];
                    int len = send(conn->sock, conn->response_buffer + conn->txCount, conn->response_size - conn->txCount, 0);
                    conn->txCount += len;
    
                    if (conn->txCount == conn->response_size)
                    {
                        // done sending response - we can close this connection or change it to back to the reading state
                    }
                }
            }
        }