
Consuming from Kafka using Kafka methods and Spark Streaming gives different results


I am trying to consume some data from Kafka using Spark Streaming.

I have created two jobs:

  1. A simple Kafka job that uses:
consumeFirstStringMessageFrom(topic)

which gives the expected values from the topic:

{
  "data": {
    "type": "SA_LIST",
    "login": "[email protected]",
    "updateDate": "2020-09-09T14:58:39.775Z",
    "content": [
      {
        "sku": "800633955",
        "status": "ACTIVE",
        "quantity": 1
      }
    ],
    "saCode": "E40056",
    "clientId": "30179801688090",
    "$setOnInsert": {
      "__v": 0
    }
  },
  "operation": "UPDATE",
  "type": "List"
}
  2. A Spark Streaming job:
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaConfig.broker)
  .option("subscribe", kafkaConfig.topic)
  .option("startingOffsets", kafkaConfig.startingOffsets)
  .load()

df.writeStream
  .outputMode(OutputMode.Append())
  .format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
  .awaitTermination()

It shows the following result:

{
  "key": "I4NTY4NV9MSVNUX1dJU0hMSVNUIg==",
  "value": "eyJkYXRhIjp7InR5cGUiOiJXSVNITElTVCIsImxvZ2luIjoiZHJlYW1lcjJAeW9wbWFpbC5jb20iLCJ1cGRhdGVEYXRZSI6Ikxpc3QifQ==",
  "topic": "PLP_GLOBAL_QA",
  "partition": 0,
  "offset": 1826,
  "timestamp": "2020-09-10T16:09:08.606Z",
  "timestampType": 0
}

It seems that it only shows topic metadata (key, value, topic, partition, offset, ...) rather than the message payload. Am I missing something?

I can add more info if needed.


Solution

  • The Spark Streaming job shows you the data in serialized form, whereas your Kafka consumer has already deserialized it.

    According to the Spark Structured Streaming + Kafka integration guide, you get not only the key and value of the Kafka message, but also other (meta) information. Here is the schema of each record you get from Kafka:

    Column          Type
    key             binary
    value           binary
    topic           string
    partition       int
    offset          long
    timestamp       timestamp
    timestampType   int
    

    If you only want the key and value, or even just the value, you can select those columns and cast them to human-readable strings:

    [...]
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
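
    Note that the unreadable `key` and `value` strings in the output above are the binary columns rendered as Base64 (which is how binary data appears when a row is serialized to JSON). Here is a minimal, self-contained sketch of that relationship, using a made-up payload rather than the actual topic data:

    ```scala
    import java.nio.charset.StandardCharsets.UTF_8
    import java.util.Base64

    object Base64Demo extends App {
      // Hypothetical record value standing in for the real topic payload
      val payload = """{"data":{"type":"SA_LIST"},"operation":"UPDATE","type":"List"}"""

      // Binary columns get rendered as Base64 when shown as JSON,
      // which is why `value` in the streaming output looks like gibberish
      val rendered = Base64.getEncoder.encodeToString(payload.getBytes(UTF_8))

      // Decoding the bytes back to a UTF-8 string recovers the original JSON;
      // CAST(value AS STRING) achieves the same effect inside Spark
      val decoded = new String(Base64.getDecoder.decode(rendered), UTF_8)
      println(decoded)
    }
    ```

    Once `value` has been cast to a string, it can be parsed further into structured columns with `from_json` and a schema, if needed.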