c++ · protocol-buffers · grpc

How to pass a large amount of data (unknown size, minimum 10 GB) using gRPC


From a source I am getting stream data whose size will not be known before the final processing, but the minimum is 10 GB. I have to send this large amount of data using gRPC.

I need to mention here that this large amount of data will be passed through gRPC while the stream is still being processed. For this step, I have thought of storing all the values in a vector.

While looking for ideas on sending large amounts of data I found:
  • This, where it is mentioned not to pass large data using gRPC and to use some other message protocol instead; however, I am limited to gRPC (at least for now).
  • From this post I tried to learn how a chunked message can be sent, but I am not sure whether it is related to my problem or not.
  • The first post where I found a blog about streaming data, using the Go language.
  • This one is the presentation (in Python) of that post, but it is also incomplete.
  • This gRPC example could be a good start, but I cannot decode it due to my lack of C++ knowledge.

Since then I have made a big update to the question, but its main theme has not changed.

What I have done so far, and some points about my project. The GitHub repo is available here.
  • A unary RPC is present in the project

  • I know that my new bidirectional RPC will take some time. I want the unary RPC not to wait for the completion of the bidirectional RPC. Right now I am thinking in a synchronous way, where the unary RPC waits for the streaming one to complete before passing its status.

  • I am omitting the unnecessary lines in the C++ code, but giving the whole proto files

  • big_data.proto

syntax = "proto3";

package demo_grpc;

message Large_Data {
    repeated int32 large_data_collection = 1 [packed=true];
    int32 data_chunk_number = 2;
}
  • addressbook.proto
syntax = "proto3";

package demo_grpc;

import "myproto/big_data.proto";

message S_Response {
    string name     = 1;
    string street   = 2;
    string zip      = 3;
    string city     = 4;
    string country  = 5;

    int32 double_init_val = 6;
}

message C_Request {
    uint32 choose_area = 1;
    string name = 2;
    int32 init_val = 3;
}

service AddressBook {
    rpc GetAddress(C_Request) returns (S_Response) {}

    rpc Stream_Chunk_Service(stream Large_Data) returns (stream Large_Data) {}
}

  • client.cpp
#include <big_data.pb.h>
#include <addressbook.grpc.pb.h>

#include <grpcpp/grpcpp.h>
#include <grpcpp/create_channel.h>

#include <iostream>
#include <numeric>
using namespace std;

// This function prompts the user to set value for the required area
void Client_Request(demo_grpc::C_Request &request_)
{
// do processing for unary rpc. Intentionally avoided here
}

// According to the Client Request this function displays the value of the protobuf message
void Server_Response(demo_grpc::C_Request &request_, const demo_grpc::S_Response &response_)
{
// do processing for unary rpc. Intentionally avoided here
}

// The following function makes a large vector and then chunks it to send via stream from client to server
void Stream_Data_Chunk_Request(demo_grpc::Large_Data &request_,
                               demo_grpc::Large_Data &response_,
                               uint64_t preferred_chunk_size_in_kibyte)
{
    // A dummy vector which in real case will be the large data set's container
    std::vector<int32_t> large_vector;

    // iterate it 1024 * 10 times for now
    for(int64_t i = 0; i < 1024 * 10; i++)
    {
        large_vector.push_back(1);
    }

    uint64_t preferred_chunk_size_in_kibyte_holds_integer_num = 0; // how many integers one chunk will contain

    // total chunk number will be updated here
    uint32_t total_chunk = total_chunk_counter(large_vector.size(), preferred_chunk_size_in_kibyte, preferred_chunk_size_in_kibyte_holds_integer_num);

    // A temp counter to trace the index of the large_vector
    int32_t temp_count = 0;

    // loop will start if the total num of chunk is greater than 0. After each iteration total_chunk will be decremented
    while(total_chunk > 0)
    {
        for (int64_t i = temp_count * preferred_chunk_size_in_kibyte_holds_integer_num; i < preferred_chunk_size_in_kibyte_holds_integer_num + temp_count * preferred_chunk_size_in_kibyte_holds_integer_num; i++)
        {
            // the repeated field large_data_collection is taking value from the large_vector
            request_.add_large_data_collection(large_vector[i]);
        }
        temp_count++;
        total_chunk--;

        std::string ip_address = "localhost:50051";
        auto channel = grpc::CreateChannel(ip_address, grpc::InsecureChannelCredentials());
        std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);

        grpc::ClientContext context;
        std::shared_ptr<::grpc::ClientReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data> > stream(stub->Stream_Chunk_Service(&context));

        // When the size of one chunk is reached, this repeated field is cleared. I am not sure whether the
        // value can be transferred to the server before this or not, but my assumption says it should be
        request_.clear_large_data_collection();
    }

}

int main(int argc, char* argv[])
{
    std::string client_address = "localhost:50051";
    std::cout << "Address of client: " << client_address << std::endl;

    // The following part for the Unary RPC
    demo_grpc::C_Request query;
    demo_grpc::S_Response result;
    Client_Request(query);

    // This part for the streaming chunk data (Bi directional Stream RPC)
    demo_grpc::Large_Data stream_chunk_request_;
    demo_grpc::Large_Data stream_chunk_response_;
    uint64_t preferred_chunk_size_in_kibyte = 64;
    Stream_Data_Chunk_Request(stream_chunk_request_, stream_chunk_response_, preferred_chunk_size_in_kibyte);

    // Call
    auto channel = grpc::CreateChannel(client_address, grpc::InsecureChannelCredentials());
    std::unique_ptr<demo_grpc::AddressBook::Stub> stub = demo_grpc::AddressBook::NewStub(channel);
    grpc::ClientContext context;
    grpc::Status status = stub->GetAddress(&context, query, &result);

    // the following status is for unary rpc as far I have understood the structure
    if (status.ok())
    {
        Server_Response(query, result);
    }
    else
    {
        std::cout << status.error_message() << std::endl;
    }

    return 0;
}
  • helper function total_chunk_counter
#include <cmath>
uint32_t total_chunk_counter(uint64_t num_of_container_content,
                             uint64_t preferred_chunk_size_in_kibyte,
                             uint64_t &preferred_chunk_size_in_kibyte_holds_integer_num)
{
    uint64_t container_size_in_kibyte = (4ULL * num_of_container_content) / 1024; // each int32 is 4 bytes
    preferred_chunk_size_in_kibyte_holds_integer_num = (num_of_container_content * preferred_chunk_size_in_kibyte) / container_size_in_kibyte;

    float total_chunk = static_cast<float>(num_of_container_content) / preferred_chunk_size_in_kibyte_holds_integer_num;
    return std::ceil(total_chunk);
}
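As an aside, the chunk arithmetic can be sanity-checked in isolation. Below is a self-contained sketch of the same calculation, assuming 4-byte `int32` elements and using integer ceiling division instead of `static_cast<float>` plus `std::ceil`, since `float` loses precision for counts above 2^24 and a 10 GB stream is roughly 2.7 billion `int32` values. The names `ChunkPlan` and `plan_chunks` are mine, not from the repo:

```cpp
#include <cassert>
#include <cstdint>

// Number of int32 samples that fit in one chunk of `chunk_kib` KiB,
// and the number of chunks needed to cover `n` samples in total.
struct ChunkPlan {
    uint64_t samples_per_chunk;
    uint32_t total_chunks;
};

ChunkPlan plan_chunks(uint64_t n, uint64_t chunk_kib)
{
    // One int32 is 4 bytes, so one KiB holds 1024 / 4 = 256 of them.
    uint64_t samples_per_chunk = chunk_kib * 1024 / 4;
    // Round up so the last (possibly partial) chunk is counted.
    uint32_t total_chunks =
        static_cast<uint32_t>((n + samples_per_chunk - 1) / samples_per_chunk);
    return {samples_per_chunk, total_chunks};
}
```

With n = 1024 * 10 and a 64 KiB chunk, samples_per_chunk is 16384, so a single chunk suffices; the exact integer arithmetic stays correct for counts far beyond what a float mantissa can represent.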

  • server.cpp, which is totally incomplete
#include <myproto/big_data.pb.h>
#include <myproto/addressbook.grpc.pb.h>

#include <grpcpp/grpcpp.h>
#include <grpcpp/server_builder.h>

#include <iostream>

class AddressBookService final : public demo_grpc::AddressBook::Service {
    public:
        virtual ::grpc::Status GetAddress(::grpc::ServerContext* context, const ::demo_grpc::C_Request* request, ::demo_grpc::S_Response* response)
        {
            switch (request->choose_area())
            {
             // do processing for unary rpc. Intentionally avoided here
            }
            std::cout << "Information of " << request->choose_area() << " is sent to Client" << std::endl;
            return grpc::Status::OK;
        }


    // Bi-directional streaming chunk data
    virtual ::grpc::Status Stream_Chunk_Service(::grpc::ServerContext* context, ::grpc::ServerReaderWriter< ::demo_grpc::Large_Data, ::demo_grpc::Large_Data>* stream)
    {
        // stream->Large_Data;
        return grpc::Status::OK;
    }
};

void RunServer()
{
    std::cout << "grpc Version: " << grpc::Version() << std::endl;
    std::string server_address = "localhost:50051";
    std::cout << "Address of server: " << server_address << std::endl;

    grpc::ServerBuilder builder;
    builder.AddListeningPort(server_address, grpc::InsecureServerCredentials());

    AddressBookService my_service;
    builder.RegisterService(&my_service);

    std::unique_ptr<grpc::Server> server(builder.BuildAndStart());
    server->Wait();
}

int main(int argc, char* argv[])
{
    RunServer();
    return 0;
}

In summary, my desire

  • I need to pass the content of large_vector via the repeated field large_data_collection of the message Large_Data. I should split large_vector into chunks and populate the repeated field large_data_collection one chunk at a time
  • On the server side all chunks will be concatenated, keeping the exact order of large_vector. Some processing will be done on them (eg: doubling the value at each index). Then the whole data will be sent back to the client as a chunked stream
  • It would be great if the present unary RPC did not wait for the completion of the bidirectional RPC
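The chunk → process → chunk-back round trip described in these bullets, minus the gRPC transport, can be sketched in plain C++. This is only an illustration of the intended data flow; the function names are mine, not from the repo:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Split `data` into chunks of at most `chunk_len` elements, preserving order.
std::vector<std::vector<int32_t>> make_chunks(const std::vector<int32_t>& data,
                                              std::size_t chunk_len)
{
    std::vector<std::vector<int32_t>> chunks;
    for (std::size_t i = 0; i < data.size(); i += chunk_len)
    {
        std::size_t end = std::min(i + chunk_len, data.size());
        chunks.emplace_back(data.begin() + i, data.begin() + end);
    }
    return chunks;
}

// Server-side stand-in: concatenate chunks in order and double each value.
std::vector<int32_t> process(const std::vector<std::vector<int32_t>>& chunks)
{
    std::vector<int32_t> out;
    for (const auto& c : chunks)
        for (int32_t v : c)
            out.push_back(v * 2);
    return out;
}
```

Because the chunks are produced and consumed in order, the reassembled result matches the original order of large_vector, which is the property the second bullet asks for.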

A solution with an example would be really helpful. Thanks in advance. The GitHub repo is available here.


Solution

  • Due to lots of personal issues, I am very late in posting the answer to my problem. Anyway, I have achieved my solution in the following way. A short summary of my solution:

    • I have tried to pass a big amount of data; for this I have chosen a big vector whose size can vary
    • I have calculated the number of samples in the vector, chosen a size for the chunk, and calculated how many integers will be passed in each chunk
    • Finally, I passed the data chunk by chunk
    • One limitation I have imposed is that the chunk size should be 64 KB or a multiple of it
    • A complete solution is available in my git repo. I have linked here only the commit of the merge request

    Proto message

    // Proto file to send large amount of data using stream
    
    syntax = "proto3";
    
    package demo_grpc;
    
    message Streaming
    {
        repeated int32 data_collection = 1 [packed=true];
        int32 index = 2;
    }
    
    // Following messages will be used for Bi-directional streaming (chunk data)
    
    // Client side message
    message Large_Data_Request {
        repeated int32 chunk_data_client_request = 1 [packed=true]; // Data container where chunk data will be stored to pass as stream from client to server
        uint32 client_data_stream_size = 2; // This will be the size of the whole data stream from client to server (Eg: Size of a vector. vec.size())
        uint32 chunk_data_length = 3; // Number of data samples in each chunk
        uint32 required_chunk = 4; // Number of chunk to complete the Streaming
        string name = 5; // Name of the data stream
    }
    
    // Server side message
    message Large_Data_Response {
        repeated int32 chunk_data_server_response = 1 [packed=true]; // Data container to be used to do stream operation from server to client
        int32 server_data_stream_size = 2; // This will be the size of the whole data stream from server to client (Eg: Size of a vector. vec.size())
    }
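One thing worth checking against these messages is the default 4 MB gRPC message limit mentioned later in the client code. A back-of-the-envelope sketch, in plain C++ with no protobuf dependency (the function names are mine, and the 5-byte figure is the varint worst case for non-negative int32 values; negative ones can take up to 10 bytes):

```cpp
#include <cassert>
#include <cstdint>

// Rough upper bound on the wire size of one chunk of `samples` int32
// values in a packed repeated field. Packed int32 values are
// varint-encoded, so each non-negative value takes at most 5 bytes.
uint64_t worst_case_chunk_bytes(uint64_t samples)
{
    const uint64_t kMaxVarint32Bytes = 5;
    return samples * kMaxVarint32Bytes;
}

bool fits_default_grpc_limit(uint64_t samples)
{
    const uint64_t kDefaultLimit = 4ULL * 1024 * 1024; // 4 MiB
    return worst_case_chunk_bytes(samples) <= kDefaultLimit;
}
```

A 64 KiB chunk (16384 samples) stays far below the limit even in this worst case, so the "multiple of 64 KB, at most 4 MB" rule used below leaves comfortable headroom.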
    

    Client side request to send stream data

    
    #include "client_zone.h"
    
    // Calculates the number of chunks
    /**
     * \param [in] data_size Total size of the given/provided data which will be passed as chunks. Eg: myvec.size()
     * \param [in] chunk_size Chunk size in kilobytes, given by the user. Right now less than 64 KB is not allowed
     * \param [in] sample Number of data samples in each chunk
     * \param [out] total_chunk Number of chunks needed to stream all the data
     */
    void grpc_client::get_chunk_number(uint64_t data_size,
                                       uint64_t chunk_size,
                                       uint64_t &sample,
                                       uint32_t &total_chunk)
    {
        // Each integer is 4 bytes, so N integers take N * 4 bytes.
        // To convert this into KB we divide by 1024. Eg: (vector.size() * 4) / 1024
        uint64_t data_size_kb = (4ULL * data_size) / 1024; // currently unused; kept for reference
    
        float total_chunk_intermediate = static_cast<float>(data_size) / sample;
        total_chunk = std::ceil(total_chunk_intermediate);
    }
    
    // After producing large data set it passes them & get response from server as chunk
    void grpc_client::data_chunk_stream_request()
    {
        demo_grpc::Large_Data_Request request_;
        demo_grpc::Large_Data_Response response_;
    
        // This chunk size will be provided by the user. It will not be greater than 4 MB and will be provided in KB unit.
        // Eg: If you want to provide a chunk size of 3 MB you have to provide 3072 (3MB = 3 * 1024 KB)
        // Also, this value will be a multiple of 64. That means at least a chunk size should be provided which can carry
        // 64 KB data
        uint64_t chunk_size;
    std::cout << std::endl << "Provide chunk size (KB). Not more than 4 MB. Must be a multiple of 64: ";
        std::cin >> chunk_size;
    
        if (static_cast<float>(chunk_size) / 1024 > 4)
        {
            throw std::runtime_error("Chunk size is > 4 MB. Message field of protobuf is limited to 4 MB");
        }
        if (chunk_size % 64 != 0)
        {
            throw std::runtime_error("Please provide chunk size in multiple of 64 otherwise Number of chunk will be fractional");
        }
    
        // This vector is the container to hold dummy data created in client side
        std::vector<int32_t> dummy_data_set;
    
    // This is the size of the vector, which will be chosen by the user
    uint32_t vector_size;

    std::cout << "Dummy data will be created on the client side. Please provide a size for the vector: ";
        std::cin >> vector_size;
        for(int64_t i = 0; i < vector_size; i++)
        {
            dummy_data_set.push_back(2);
        }
    
    // Number of data samples in each chunk. Each integer is 4 bytes. If the chunk size is 64 KB, "sample" is the
    // number of integers that reside in each chunk.
    // Convert 64 KB to bytes: 64 * 1024 bytes.
    // 1 integer takes 4 bytes, so 64 * 1024 bytes hold (64 * 1024) / 4 integers
        uint64_t sample = (chunk_size * 1024)/4;
    
        // Number of chunk to transfer whole data set (here, dummy_data_set)
        uint32_t total_chunk = 0;
    
        get_chunk_number(dummy_data_set.size(), chunk_size, sample, total_chunk);
    
        // Proto Buffer message is preparing to pass to Server
        request_.set_chunk_data_length(sample);
        request_.set_client_data_stream_size(dummy_data_set.size());
        request_.set_required_chunk(total_chunk);
        request_.set_name(detected_area_name);
    
        grpc::ClientContext context;
        std::shared_ptr<::grpc::ClientReaderWriter< ::demo_grpc::Large_Data_Request, ::demo_grpc::Large_Data_Response> > stream(stub->Stream_Chunk_Service(&context));
    
        int32_t temp_count = 0;
        while(total_chunk > 0)
        {
            for (int64_t i = temp_count * sample; i < sample + temp_count * sample; i++)
            {
                // This condition checks the iteration number with the vector data size of the client
                // if client data size has reached then writing will be stopped
                if (i < dummy_data_set.size())
                {
                    request_.add_chunk_data_client_request(dummy_data_set[i]);
                }
                else
                {
                    break;
                }
            }
            temp_count++;
            total_chunk--;
    
            stream->Write(request_);
    
            // message field will be cleared after passing one chunk. It provides a fresh repeated field in the next iteration to pass a new chunk
            request_.clear_chunk_data_client_request();
        }
    
        stream->WritesDone();
    
        // Reading chunk response from server
        std::vector<int32_t> dummy_final_data_set;
    
        // This variable checks data size of client during read back from server
        uint32_t client_track_data_size_before_reading = 0;
    
        while (stream->Read(&response_))
        {
            for(int64_t i = 0; i < request_.chunk_data_length(); i++)
            {
                if (client_track_data_size_before_reading < dummy_data_set.size())
                {
                    dummy_final_data_set.push_back(response_.chunk_data_server_response(i));
                    client_track_data_size_before_reading++;
                }
                else
                {
                    break;
                }
            }
        }
    
        grpc::Status status = stream->Finish();
    
        if (status.ok())
        {
        if (dummy_final_data_set.size() <= response_.server_data_stream_size())
            {
                std::cout << "Server successfully has sent all data" << std::endl;
            }
    
        }
        else
            std::cout << "!!!!! Server is Failed to Stream !!!!!" << std::endl;
    }
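The question's original wish that the unary RPC not wait for the bidirectional stream is not addressed by the solution above, which still runs the calls sequentially. One possible approach is to launch the streaming call on its own thread with std::async. This is a sketch with placeholder functions standing in for the real stubs (the names are illustrative, not from the repo); in gRPC each concurrent call would need its own ClientContext, and a channel may be shared across threads:

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <future>
#include <thread>

// Stand-ins for the two RPCs; in the real client these would wrap
// stub->GetAddress(...) and data_chunk_stream_request().
int unary_rpc()
{
    return 42; // pretend unary server response
}

void streaming_rpc(std::atomic<bool>& done)
{
    // pretend the bidirectional stream takes a while
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
    done = true;
}

int run_client()
{
    std::atomic<bool> stream_done{false};
    // Launch the long bidirectional stream in the background...
    auto stream_future = std::async(std::launch::async, streaming_rpc,
                                    std::ref(stream_done));
    // ...while the unary RPC proceeds immediately, without waiting.
    int unary_result = unary_rpc();
    stream_future.wait(); // join before tearing the client down
    assert(stream_done);
    return unary_result;
}
```

The unary result is available as soon as its own round trip finishes; only the final wait() at shutdown synchronizes with the stream.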
    

    Server side response to return stream data after processing

    /*
    This is the service response algorithm from server to pass
    stream data using chunk
    */
    
    #include <numeric>
    
    void Data_Chunk_Stream_Response(::grpc::ServerReaderWriter< ::demo_grpc::Large_Data_Response, ::demo_grpc::Large_Data_Request>* stream)
    {
        // Instance of request and response from client & server respectively
        demo_grpc::Large_Data_Request request;
        demo_grpc::Large_Data_Response response;
    
        // This vector will hold all data which will be transferred from client platform
        std::vector<int32_t> server_dummy_data_set;
    
        // This variable will track what is the size of the data which is passed from client
        uint32_t server_track_client_data_size_before_reading = 0;
    
        // Read operation
        while (stream->Read(&request))
        {
            // Iterate till the chunk data length is reached
            for(int64_t i = 0; i < request.chunk_data_length(); i++)
            {
                // This condition checks the iteration number with the vector data size of the client.
                // if client data size has reached then reading will be stopped
                if(server_track_client_data_size_before_reading < request.client_data_stream_size())
                {
                    server_dummy_data_set.push_back(request.chunk_data_client_request(i));
                    server_track_client_data_size_before_reading++;
                }
                else
                {
                    break;
                }
            }
        }
    
        std::cout << std::endl;
    
        if(server_dummy_data_set.size() == request.client_data_stream_size())
        {
            std::cout << "Prepared data in server is identical to the data size transferred by client" << std::endl;
        }
        else
        {
            throw std::runtime_error("Caution !!! Data sample is mismatched");
        }
    
        // set proto message for server side
        response.set_server_data_stream_size(server_dummy_data_set.size());
    
        int64_t track_index = 0; // Tracks the current index of the server_dummy_data_set
    
        // Server starts streaming the data chunk by chunk to the client. Here, server is doing a dummy operation on clients data.
        // Here, it multiplies by 2 each data of the client.
    
        // This variable will track client side data size during write back to client
        uint32_t server_track_client_data_size_before_writing = 0;
        for (int64_t i = 0; i < request.required_chunk(); i++)
        {
            for(int64_t j = track_index * request.chunk_data_length(); j < request.chunk_data_length() + track_index * request.chunk_data_length(); j++)
            {
                if(server_track_client_data_size_before_writing < request.client_data_stream_size())
                {
                    response.add_chunk_data_server_response(server_dummy_data_set[j] * 2);
                    server_track_client_data_size_before_writing++;
                }
                else
                {
                    break;
                }
            }
            track_index++;
            stream->Write(response);
    
            // message field will be cleared after passing one chunk. It provides a fresh repeated field in the next iteration to pass a new chunk
            response.clear_chunk_data_server_response();
        }
    }
    
    

    Any recommendations to update the work, or ideas to add new features or modify the algorithm, are highly appreciated.