apache-spark, hbase, hdfs, protocol-buffers, spark-streaming

Saving protobuf in HBase/HDFS using Spark Streaming


I am looking to store protobuf messages in HBase/HDFS using Spark Streaming, and I have the two questions below:

  1. What is an efficient way to store a huge number of protobuf messages, and to retrieve them efficiently for analytics? For example, should they be stored as Strings/byte[] in HBase, or should they be stored as Parquet files in HDFS?
  2. How should the hierarchical structure of a protobuf message be stored? I mean, should the nested elements be flattened out before storage, or is there a mechanism to store them as is? If the nested elements are collections or maps, should they be exploded and stored as multiple rows?

A sample protobuf message structure is shown below:

>     +--MsgNode-1
>       +--Attribute1 - String
>       +--Attribute2 - Int
>       +--MsgNode-2
>         +--Attribute1 - String
>         +--Attribute2 - Double
>         +--MsgNode-3 - List of MsgNode-3's
>           +--Attribute1 - Int
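
For reference, this outline might correspond to a .proto definition along the following lines (a sketch; the message and field names are assumptions inferred from the structure above):

    syntax = "proto2";

    message MsgNode1 {
      optional string attribute1 = 1;
      optional int32 attribute2 = 2;
      optional MsgNode2 msg_node_2 = 3;
    }

    message MsgNode2 {
      optional string attribute1 = 1;
      optional double attribute2 = 2;
      repeated MsgNode3 msg_node_3 = 3;  // "List of MsgNode-3's"
    }

    message MsgNode3 {
      optional int32 attribute1 = 1;
    }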

I am planning to use Spark Streaming to collect the protobuf messages as bytes and store them in HBase/HDFS.


Solution

  • Question 1:

    What is an efficient way to store a huge number of protobuf messages, and to retrieve them efficiently for analytics? For example, should they be stored as Strings/byte[] in HBase, or should they be stored as Parquet files in HDFS?

    I would recommend storing the protobuf messages as Parquet files via Avro (i.e., splitting them into meaningful records with an Avro schema).

    This can be achieved using the DataFrames API in Spark 1.5 and above (partitionBy with SaveMode.Append).
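
    A minimal sketch of such a write, assuming a Dataset<Row> named df has already been built from the parsed messages (the partition column and output path below are hypothetical):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SaveMode;

    public class ParquetSink {

        // Appends each micro-batch as Parquet, partitioned by a date column.
        public static void write(Dataset<Row> df) {
            df.write()
              .mode(SaveMode.Append)                   // accumulate across batches
              .partitionBy("ingest_date")              // hypothetical partition column
              .parquet("hdfs:///data/proto_messages"); // hypothetical output path
        }
    }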

    See this article: a-powerful-big-data-trio

    If you store the messages as strings or byte arrays, you cannot do analytics directly; querying the raw data is not possible.

    If you are using Cloudera, Impala (which supports parquet-avro) can be used to query the raw data.

    Question 2:

    How should the hierarchical structure of a protobuf message be stored? I mean, should the nested elements be flattened out before storage, or is there a mechanism to store them as is? If the nested elements are collections or maps, should they be exploded and stored as multiple rows?

    If you store the data in a raw format from Spark Streaming, how will you query it when the business wants to know what kind of data was received? (This requirement is very common.)

    First, you have to understand your data (i.e., the relationships between the different messages within the protobuf, so you can decide whether a message maps to a single row or to multiple rows), then develop a protobuf parser for the message structure. Based on your data, convert it to an Avro GenericRecord and save it as a Parquet file.
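
    For illustration, building such a GenericRecord for the flat top-level fields might look like the sketch below (the record and field names are assumptions; a real schema would mirror your actual protobuf):

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class ProtoToAvro {

        // Flat Avro schema mirroring the top-level fields of the sample
        // MsgNode-1 structure above (record and field names are assumptions).
        private static final Schema SCHEMA = SchemaBuilder.record("MsgNode1")
                .fields()
                .requiredString("attribute1")
                .requiredInt("attribute2")
                .endRecord();

        // Builds one GenericRecord from already-extracted field values.
        public static GenericRecord toRecord(String attribute1, int attribute2) {
            final GenericRecord record = new GenericData.Record(SCHEMA);
            record.put("attribute1", attribute1);
            record.put("attribute2", attribute2);
            return record;
        }
    }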

    TIP:

    Protobuf parsers can be developed in different ways based on your requirements. One generic approach is the example below.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    import com.google.protobuf.Descriptors.FieldDescriptor;
    import com.google.protobuf.GeneratedMessage;

    // Generic parser: walks every populated field of a protobuf message and
    // flattens it into a sorted map of field name -> value. Repeated fields
    // become lists of their elements' string forms; nested messages rely on
    // their toString() representation here.
    public SortedMap<String, Object> convertProtoMessageToMap(GeneratedMessage src) {

        final SortedMap<String, Object> finalMap = new TreeMap<String, Object>();
        final Map<FieldDescriptor, Object> fields = src.getAllFields();

        for (final Map.Entry<FieldDescriptor, Object> fieldPair : fields.entrySet()) {

            final FieldDescriptor desc = fieldPair.getKey();

            if (desc.isRepeated()) {
                // Repeated field: collect each element's string representation.
                final List<?> fieldList = (List<?>) fieldPair.getValue();
                if (!fieldList.isEmpty()) {
                    final List<String> arrayListOfElements = new ArrayList<String>();
                    for (final Object o : fieldList) {
                        arrayListOfElements.add(o.toString());
                    }
                    finalMap.put(desc.getName(), arrayListOfElements);
                }
            } else {
                // Singular field: store its string representation directly.
                finalMap.put(desc.getName(), fieldPair.getValue().toString());
            }
        }
        return finalMap;
    }
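
    To connect this back to Question 1, the flattened maps can then be turned into a DataFrame and written out as Parquet. Below is a sketch under the same assumptions (a SparkSession, plus a fixed two-column schema that a real job would derive from the message definition):

    import java.util.List;
    import java.util.SortedMap;
    import java.util.stream.Collectors;

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;

    public class MapsToDataFrame {

        // Builds a DataFrame from the (fieldName -> value) maps produced by
        // convertProtoMessageToMap; the two-column schema is an assumption.
        public static Dataset<Row> toDataFrame(SparkSession spark,
                                               List<SortedMap<String, Object>> parsed) {
            final StructType schema = new StructType()
                    .add("attribute1", DataTypes.StringType)
                    .add("attribute2", DataTypes.StringType);
            final List<Row> rows = parsed.stream()
                    .map(m -> RowFactory.create(
                            String.valueOf(m.get("attribute1")),
                            String.valueOf(m.get("attribute2"))))
                    .collect(Collectors.toList());
            return spark.createDataFrame(rows, schema);
        }
    }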