scala, apache-spark, protocol-buffers, scalapb

Deserializing a JSON object from protobuf returns empty fields


I am parsing Bahir MQTT payloads that were serialized with protobuf, using ScalaPB in Spark (Scala), but in the parsed result only the first field of each JSON object is populated and the others are empty.

  • Spark version: 2.3.0
  • Scala version: 2.11.8
  • Protobuf version: 2
  • sparksql-scalapb version: 0.8.0

import spark.implicits._
val parsedData = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))

The proto file

syntax = "proto2";
option java_package = "protobuf";

message ParseData {
    required int64 timestamp = 1;
    message METRICS {
        required string name = 1;
        optional int64 timestamp = 2;
        optional string dataType = 3;
        optional double value = 4;
    }
    repeated METRICS metrics = 2;
    required int32 seq = 3;
}

The result I am getting

+-------------+--------------------+---+
|timestamp    |metrics             |seq|
+-------------+--------------------+---+
|1567158851979|[[T05,,,], [T06,,,]]|54 |
+-------------+--------------------+---+

But the expected result is

+-------------+-----------------------------------------------------------------+---+
|timestamp    |metrics                                                          |seq|
+-------------+-----------------------------------------------------------------+---+
|1567158851979|[[T05,1566920552229,Float,34.56], [T06,1566920552229,Float,32.5]]|54 |
+-------------+-----------------------------------------------------------------+---+

Update 1

The incoming message payload, before serialization, looks like this:

{
  "metrics" : [{
    "name" : "T05",
    "timestamp" : 1566920552229,
    "dataType" : "Float",
    "value" : 34.56
  },
  {
    "name" : "T06",
    "timestamp" : 1566920552229,
    "dataType" : "Float",
    "value" : 32.5
  }]
}

The MQTT server uses the Eclipse Tahu project, which serializes the payload with protobuf.

Update 2

Here is the code:

val lines = spark.readStream
      .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
      .option("topic", topic)
      .option("username", username)
      .option("password", password)
      .load(brokerUrl)

import spark.implicits._
val parseLines = lines.select("payload").as[Array[Byte]].map(ParseData.parseFrom(_))

lines.printSchema()

val data = parseLines.writeStream
      .outputMode("append")
      .format("console")
      .option("truncate", false)
      .start()


data.awaitTermination()

Schema and Sample of streamed data


root
 |-- id: integer (nullable = true)
 |-- topic: string (nullable = true)
 |-- payload: binary (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|id |topic                     |payload                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                            |timestamp          |
+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+
|0  |spBv1.0/XYZ/DDATA/Tahu/ABC|[08 83 E8 99 99 CE 2D 12 53 0A 3F 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 4D 45 53 50 72 6F 63 65 73 73 54 61 67 73 2F 68 6F 6C 65 43 6F 72 72 65 63 74 69 6F 6E 4C 6F 63 61 74 69 6F 6E 31 18 D2 AE 99 99 CE 2D 20 09 38 00 4A 00 65 9A D9 19 43 12 57 0A 43 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 4D 45 53 50 72 6F 63 65 73 73 54 61 67 73 2F 68 6F 6C 65 43 6F 72 72 65 63 74 69 6F 6E 4C 6F 63 61 74 69 6F 6E 44 65 6C 74 61 18 EA AF 99 99 CE 2D 20 09 38 00 4A 00 65 9A D9 19 43 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 45 6C 61 70 73 65 64 18 BA A9 99 99 CE 2D 20 03 38 00 4A 00 50 CD E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 52 65 6D 61 69 6E 69 6E 67 18 EB A9 99 99 CE 2D 20 03 38 00 4A 00 50 F1 D6 82 11 12 4C 0A 38 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 31 73 74 43 6F 6E 74 72 6F 6C 2F 53 69 67 6E 61 6C 73 2F 46 49 52 53 54 5F 43 4F 4E 54 52 4F 4C 5F 54 45 4D 50 5F 45 4E 47 18 EE D2 F5 9B CE 2D 20 09 38 00 4A 00 65 9D A2 02 42 12 47 0A 33 46 69 6E 69 73 68 69 6E 67 2F 54 72 69 6F 2F 66 61 63 65 53 43 36 30 30 2F 73 69 67 6E 61 6C 73 2F 4D 6F 74 6F 72 5F 43 75 72 72 65 6E 74 5F 41 6D 70 73 18 EE D2 F5 9B CE 2D 20 09 38 00 4A 00 65 22 B7 47 41 18 17]|2019-08-30 17:30:43|
|0  |spBv1.0/XYZ/DDATA/Tahu/ABC|[08 EB EF 99 99 CE 2D 12 49 0A 35 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 45 6C 61 70 73 65 64 18 A3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 CE E2 82 11 12 4B 0A 37 4D 69 78 50 72 65 70 61 72 61 74 69 6F 6E 2F 42 6F 6E 64 4D 69 78 69 6E 67 2F 73 69 67 6E 61 6C 73 4D 45 53 2F 6D 69 78 65 72 54 69 6D 65 52 65 6D 61 69 6E 69 6E 67 18 D3 B1 99 99 CE 2D 20 03 38 00 4A 00 50 F2 D6 82 11 18 18]                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                               |2019-08-30 17:30:44|
+---+--------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+

Thank you for your inputs.


Solution

  • Let's try to isolate the problem. We can start by taking Spark out of the equation.

    If we take the payload you provided and try to parse it:

    val b: Array[Byte] = Array(0x08, 0x83, 0xE8, 0x99, 0x99, 0xCE, 0x2D, 0x12, 0x53, 0x0A, 0x3F, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x4D, 0x45, 0x53, 0x50, 0x72, 0x6F, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x2F, 0x68, 0x6F, 0x6C, 0x65, 0x43, 0x6F, 0x72, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x4C, 0x6F, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x31, 0x18, 0xD2, 0xAE, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43, 0x12, 0x57, 0x0A, 0x43, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x4D, 0x45, 0x53, 0x50, 0x72, 0x6F, 0x63, 0x65, 0x73, 0x73, 0x54, 0x61, 0x67, 0x73, 0x2F, 0x68, 0x6F, 0x6C, 0x65, 0x43, 0x6F, 0x72, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6F, 0x6E, 0x4C, 0x6F, 0x63, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x44, 0x65, 0x6C, 0x74, 0x61, 0x18, 0xEA, 0xAF, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43, 0x12, 0x49, 0x0A, 0x35, 0x4D, 0x69, 0x78, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x2F, 0x42, 0x6F, 0x6E, 0x64, 0x4D, 0x69, 0x78, 0x69, 0x6E, 0x67, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x4D, 0x45, 0x53, 0x2F, 0x6D, 0x69, 0x78, 0x65, 0x72, 0x54, 0x69, 0x6D, 0x65, 0x45, 0x6C, 0x61, 0x70, 0x73, 0x65, 0x64, 0x18, 0xBA, 0xA9, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x03, 0x38, 0x00, 0x4A, 0x00, 0x50, 0xCD, 0xE2, 0x82, 0x11, 0x12, 0x4B, 0x0A, 0x37, 0x4D, 0x69, 0x78, 0x50, 0x72, 0x65, 0x70, 0x61, 0x72, 0x61, 0x74, 0x69, 0x6F, 0x6E, 0x2F, 0x42, 0x6F, 0x6E, 0x64, 0x4D, 0x69, 0x78, 0x69, 0x6E, 0x67, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x4D, 0x45, 0x53, 0x2F, 0x6D, 0x69, 0x78, 0x65, 0x72, 0x54, 0x69, 0x6D, 0x65, 0x52, 0x65, 0x6D, 0x61, 0x69, 0x6E, 0x69, 0x6E, 0x67, 0x18, 0xEB, 0xA9, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x03, 0x38, 0x00, 0x4A, 0x00, 
0x50, 0xF1, 0xD6, 0x82, 0x11, 0x12, 0x4C, 0x0A, 0x38, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x31, 0x73, 0x74, 0x43, 0x6F, 0x6E, 0x74, 0x72, 0x6F, 0x6C, 0x2F, 0x53, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x2F, 0x46, 0x49, 0x52, 0x53, 0x54, 0x5F, 0x43, 0x4F, 0x4E, 0x54, 0x52, 0x4F, 0x4C, 0x5F, 0x54, 0x45, 0x4D, 0x50, 0x5F, 0x45, 0x4E, 0x47, 0x18, 0xEE, 0xD2, 0xF5, 0x9B, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x9D, 0xA2, 0x02, 0x42, 0x12, 0x47, 0x0A, 0x33, 0x46, 0x69, 0x6E, 0x69, 0x73, 0x68, 0x69, 0x6E, 0x67, 0x2F, 0x54, 0x72, 0x69, 0x6F, 0x2F, 0x66, 0x61, 0x63, 0x65, 0x53, 0x43, 0x36, 0x30, 0x30, 0x2F, 0x73, 0x69, 0x67, 0x6E, 0x61, 0x6C, 0x73, 0x2F, 0x4D, 0x6F, 0x74, 0x6F, 0x72, 0x5F, 0x43, 0x75, 0x72, 0x72, 0x65, 0x6E, 0x74, 0x5F, 0x41, 0x6D, 0x70, 0x73, 0x18, 0xEE, 0xD2, 0xF5, 0x9B, 0xCE, 0x2D, 0x20, 0x09, 0x38, 0x00, 0x4A, 0x00, 0x65, 0x22, 0xB7, 0x47, 0x41, 0x18, 0x17).map(_.toByte)
    
    println(ParseData.parseFrom(b).toProtoString)
    

    Then the output is:

    timestamp: 1567179043843
    metrics {
      name: "Finishing/Trio/faceSC600/MESProcessTags/holeCorrectionLocation1"
    }
    metrics {
      name: "Finishing/Trio/faceSC600/MESProcessTags/holeCorrectionLocationDelta"
    }
    metrics {
      name: "MixPreparation/BondMixing/signalsMES/mixerTimeElapsed"
    }
    metrics {
      name: "MixPreparation/BondMixing/signalsMES/mixerTimeRemaining"
    }
    metrics {
      name: "Finishing/Trio/1stControl/Signals/FIRST_CONTROL_TEMP_ENG"
    }
    metrics {
      name: "Finishing/Trio/faceSC600/signals/Motor_Current_Amps"
    }
    seq: 23
    

    This shows that only the name field inside each metrics entry is set; none of the other fields are.

    This tells us that the data you are passing to parseFrom does not have the fields that you expect.

    The next step is to investigate why the producer of those messages does not set those fields in the protos before serializing. The problem seems to be on the producer side.
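
One way to check this directly is to list the field numbers and wire types that are actually present in the bytes, without relying on any generated class. The sketch below is a hand-written, minimal reader for the protobuf wire format. The names WireTags, readVarint and scan are made up for this illustration and are not part of ScalaPB, and only wire types 0, 1, 2 and 5 are handled. It is run on the first metrics entry of the payload above, that is, the 83 bytes that follow 0x12 0x53.

```scala
object WireTags {
  // Reads a base-128 varint starting at `pos`; returns (value, position after it).
  def readVarint(buf: Array[Byte], pos: Int): (Long, Int) = {
    var result = 0L
    var shift = 0
    var i = pos
    var done = false
    while (!done) {
      val b = buf(i) & 0xFF
      result |= (b & 0x7FL) << shift
      shift += 7
      i += 1
      done = (b & 0x80) == 0 // high bit clear marks the last byte
    }
    (result, i)
  }

  // Lists (fieldNumber, wireType) for every top-level field in `buf`,
  // skipping over each field's value without interpreting it.
  def scan(buf: Array[Byte]): List[(Int, Int)] = {
    val out = scala.collection.mutable.ListBuffer.empty[(Int, Int)]
    var pos = 0
    while (pos < buf.length) {
      val (tag, afterTag) = readVarint(buf, pos)
      val field = (tag >>> 3).toInt
      val wire = (tag & 7).toInt
      out += ((field, wire))
      pos = wire match {
        case 0 => readVarint(buf, afterTag)._2 // varint
        case 1 => afterTag + 8                 // 64-bit
        case 2 =>                              // length-delimited
          val (len, afterLen) = readVarint(buf, afterTag)
          afterLen + len.toInt
        case 5 => afterTag + 4                 // 32-bit
        case other =>
          throw new IllegalArgumentException(s"unsupported wire type $other")
      }
    }
    out.toList
  }

  // First metrics entry from the payload in the question:
  // 0x0A 0x3F, the 63-byte name, then the remaining 18 bytes.
  val name = "Finishing/Trio/faceSC600/MESProcessTags/holeCorrectionLocation1"
  val tail: Array[Byte] = Array(
    0x18, 0xD2, 0xAE, 0x99, 0x99, 0xCE, 0x2D, 0x20, 0x09,
    0x38, 0x00, 0x4A, 0x00, 0x65, 0x9A, 0xD9, 0x19, 0x43).map(_.toByte)
  val firstMetric: Array[Byte] =
    Array(0x0A, 0x3F).map(_.toByte) ++ name.getBytes("UTF-8") ++ tail

  def main(args: Array[String]): Unit = {
    // prints: (1,2), (3,0), (4,0), (7,0), (9,2), (12,5)
    println(scan(firstMetric).mkString(", "))
  }
}
```

For this entry the sketch reports field 1 as length-delimited, which matches name, followed by fields 3, 4, 7, 9 and 12. ParseData.METRICS declares field 3 as a string and field 4 as a double, but on the wire both are varints, and field 2 does not appear at all. A parser generated from that .proto would be expected to treat them as unknown fields rather than as timestamp, dataType or value, which looks the same as those fields being unset. Comparing this list with the .proto the producer actually uses is a reasonable part of that investigation.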