I want to store protobuf messages in HBase/HDFS using Spark Streaming, and I have two questions.
The sample structure of the protobuf message is shown below:
> +--MsgNode-1
> +--Attribute1 - String
> +--Attribute2 - Int
> +--MsgNode-2
> +--Attribute1 - String
> +--Attribute2 - Double
> +--MsgNode-3 - List of MsgNode-3's
> +--Attribute1 - Int
I am planning to use Spark Streaming to collect the protobuf messages as bytes and store them in HBase/HDFS.
Question 1 :
What is an efficient way to store a huge number of protobuf messages, and to retrieve them for analytics? For example, should they be stored as Strings/byte[] in HBase, or in Parquet files in HDFS?
I would recommend storing the protobuf messages as Parquet-Avro files (splitting them into meaningful messages with an Avro schema).
This can be achieved with the DataFrame API in Spark 1.5 and above (partitionBy with SaveMode.Append).
see this a-powerful-big-data-trio
If you store the messages as strings or byte arrays, you cannot run analytics on them directly, because the raw data cannot be queried.
If you are using Cloudera, Impala (which supports Parquet-Avro) can be used to query the raw data.
Question 2:
How should the hierarchical structure of protobuf messages be stored? Should the nested elements be flattened before storage, or is there a mechanism to store them as is? If the nested elements are collections or maps, should they be exploded and stored as multiple rows?
If you store the data in a raw format from Spark Streaming, how will you query it when the business wants to know what kind of data was received? (This requirement is very common.)
First, you have to understand your data, i.e. the relationships between the different messages within the protobuf, so that you can decide between a single row and multiple rows. Then develop a protobuf parser to parse the message structure. Based on your data, convert each message to an Avro generic record and save it as a Parquet file.
Protobuf parsers can be developed in different ways depending on your requirements. One generic approach is shown in the example below.
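To make the single-row versus multiple-rows decision concrete, here is a minimal sketch in plain Java. It assumes a message has already been parsed into a `Map<String, Object>` in which a repeated field is held as a `List`; the `RowExploder` class, its `explode` method, and the field names are hypothetical and are not part of Spark, Avro, or protobuf.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only.
final class RowExploder {

    // Produces one flat row per element of the repeated field `listField`.
    // All other fields of the parent are copied into every row.
    static List<Map<String, Object>> explode(Map<String, Object> parent, String listField) {
        Map<String, Object> others = new LinkedHashMap<>(parent);
        Object repeated = others.remove(listField);

        List<Map<String, Object>> rows = new ArrayList<>();
        if (!(repeated instanceof List) || ((List<?>) repeated).isEmpty()) {
            // No child elements: keep the parent as a single row.
            rows.add(others);
            return rows;
        }
        for (Object element : (List<?>) repeated) {
            Map<String, Object> row = new LinkedHashMap<>(others);
            row.put(listField, element);
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        // Stand-in for a parsed message with one repeated field.
        Map<String, Object> msg = new LinkedHashMap<>();
        msg.put("attribute1", "abc");
        msg.put("attribute2", 7);
        msg.put("msgNode3", Arrays.asList(10, 20, 30));

        for (Map<String, Object> row : explode(msg, "msgNode3")) {
            System.out.println(row);
        }
    }
}
```

Exploding duplicates the parent attributes on every row, which makes simple SQL-style queries easier at the cost of storage. Keeping the list in a single row avoids the duplication but requires the query engine to handle array columns.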
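    import com.google.protobuf.Descriptors.FieldDescriptor;
    import com.google.protobuf.Message;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.SortedMap;
    import java.util.TreeMap;

    // Converts a protobuf message into a map of field name -> string value(s), sorted by field name.
    // Message is the common interface of generated message classes.
    // getAllFields() returns only the fields that are set, so unset fields are absent from the result.
    public SortedMap<String, Object> convertProtoMessageToMap(Message src) {
        final SortedMap<String, Object> finalMap = new TreeMap<>();
        final Map<FieldDescriptor, Object> fields = src.getAllFields();
        for (final Map.Entry<FieldDescriptor, Object> fieldPair : fields.entrySet()) {
            final FieldDescriptor desc = fieldPair.getKey();
            if (desc.isRepeated()) {
                // Repeated field: store the string form of every element as a list.
                final List<?> fieldList = (List<?>) fieldPair.getValue();
                if (!fieldList.isEmpty()) {
                    final List<String> arrayListOfElements = new ArrayList<>();
                    for (final Object o : fieldList) {
                        arrayListOfElements.add(o.toString());
                    }
                    finalMap.put(desc.getName(), arrayListOfElements);
                }
            } else {
                // Singular field: nested messages are stringified as well.
                finalMap.put(desc.getName(), fieldPair.getValue().toString());
            }
        }
        return finalMap;
    }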
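
The converter above turns a nested message into one string via `toString()`, so its inner fields are no longer individually addressable. If you would rather flatten nested elements into separate columns, one possible approach is to build dotted column names recursively. The sketch below is plain Java that uses nested `Map`s as a stand-in for a parsed message; the `NestedFlattener` class and the sample keys are hypothetical, and it does not touch lists, which would still need a separate decision about exploding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only.
final class NestedFlattener {

    // Returns a single-level map whose keys are dotted paths, e.g. "msgNode2.attribute2".
    static Map<String, Object> flatten(Map<String, Object> nested) {
        Map<String, Object> flat = new LinkedHashMap<>();
        flattenInto("", nested, flat);
        return flat;
    }

    private static void flattenInto(String prefix, Map<?, ?> node, Map<String, Object> out) {
        for (Map.Entry<?, ?> entry : node.entrySet()) {
            String name = String.valueOf(entry.getKey());
            String key = prefix.isEmpty() ? name : prefix + "." + name;
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested element: recurse with the current path as the prefix.
                flattenInto(key, (Map<?, ?>) value, out);
            } else {
                // Leaf value (or list): store it under its full path.
                out.put(key, value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> inner = new LinkedHashMap<>();
        inner.put("attribute1", "xyz");
        inner.put("attribute2", 1.5);

        Map<String, Object> outer = new LinkedHashMap<>();
        outer.put("attribute1", "abc");
        outer.put("msgNode2", inner);

        System.out.println(flatten(outer));
    }
}
```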