Tags: apache-kafka, spark-structured-streaming, spark-kafka-integration

Producer is publishing to Kafka but Spark Structured Streaming cannot read the messages


I am using Kafka to publish tweets, and it works correctly: I can see the messages echoed by the following console consumer command

bin/kafka-console-consumer.sh --bootstrap-server xxx.xxx.xx.xxx:9092 --topic tweets --from-beginning

But when I try to consume the topic with Spark Structured Streaming using the following code:

from pyspark.sql import SparkSession

if __name__ == "__main__":

    spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()

    # Subscribe to the Kafka topic, reading only messages that arrive after startup
    source_df = spark \
        .readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "xxx.xxx.xx.xxx:9092") \
        .option("subscribe", "tweets") \
        .option("startingOffsets", "latest") \
        .load()

    # Print each micro-batch to the console
    query = source_df \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

    query.awaitTermination()
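
(Aside: if this script is submitted with spark-submit rather than run in an environment that already bundles the Kafka connector, the spark-sql-kafka package has to be supplied. The coordinates depend on your Spark and Scala versions, so the version below and the script name are assumptions:

bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.1.1 tweets_stream.py
)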

The query produces output, but the value column does not show the tweet text. Instead I get a strange alphanumeric sequence, as shown below. I also ran it with truncation disabled and got the same pattern, only longer.

+----+--------------------+------+---------+------+--------------------+-------------+
| key|               value| topic|partition|offset|           timestamp|timestampType|
+----+--------------------+------+---------+------+--------------------+-------------+
|null|[52 54 20 40 70 7...|tweets|        0| 45724|2021-03-17 12:57:...|            0|
|null|[23 52 57 52 49 2...|tweets|        0| 45725|2021-03-17 12:57:...|            0|
|null|[52 54 20 40 54 7...|tweets|        0| 45726|2021-03-17 12:57:...|            0|
|null|[52 54 20 40 44 6...|tweets|        0| 45727|2021-03-17 12:57:...|            0|
|null|[40 42 42 43 50 6...|tweets|        0| 45728|2021-03-17 12:57:...|            0|
|null|[40 4C 6F 72 64 5...|tweets|        0| 45729|2021-03-17 12:57:...|            0|
|null|[41 6E 6E 6F 75 6...|tweets|        0| 45730|2021-03-17 12:57:...|            0|
|null|[42 69 74 63 6F 6...|tweets|        0| 45731|2021-03-17 12:57:...|            0|
|null|[40 65 72 69 6B 7...|tweets|        0| 45732|2021-03-17 12:57:...|            0|
|null|[74 68 65 20 6D 6...|tweets|        0| 45733|2021-03-17 12:57:...|            0|
|null|[52 54 20 40 6D 6...|tweets|        0| 45734|2021-03-17 12:57:...|            0|
|null|[52 54 20 40 6D 6...|tweets|        0| 45735|2021-03-17 12:57:...|            0|
|null|[40 42 54 43 54 4...|tweets|        0| 45736|2021-03-17 12:57:...|            0|
|null|[52 54 20 40 49 6...|tweets|        0| 45737|2021-03-17 12:57:...|            0|
|null|[52 54 20 40 63 6...|tweets|        0| 45738|2021-03-17 12:57:...|            0|
|null|[42 75 20 6F 6C 6...|tweets|        0| 45739|2021-03-17 12:57:...|            0|
|null|[40 5F 43 72 79 7...|tweets|        0| 45740|2021-03-17 12:57:...|            0|
|null|[40 57 69 6E 66 6...|tweets|        0| 45741|2021-03-17 12:57:...|            0|
|null|[4D 79 20 72 65 6...|tweets|        0| 45742|2021-03-17 12:57:...|            0|
|null|[52 54 20 40 6F 6...|tweets|        0| 45743|2021-03-17 12:57:...|            0|
+----+--------------------+------+---------+------+--------------------+-------------+
only showing top 20 rows
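
For reference, the bracketed cells are the raw message bytes printed in hex; decoding them as UTF-8 recovers the tweet text. A minimal sketch, using the first few bytes of the first row above:

# Each value cell is the raw Kafka message rendered as hex bytes.
# Decoding those bytes as UTF-8 recovers the original tweet text.
raw = bytes.fromhex("52 54 20 40 70".replace(" ", ""))
print(raw.decode("utf-8"))  # prints "RT @p"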

Any help in understanding the situation would be appreciated.


Solution

  • Kafka stores the key and value of each record as raw bytes; the console consumer simply deserializes them as strings by default, which is why its output looks correct.

    Looking at the Structured Streaming + Kafka Integration Guide you will see that the key and value columns of the source DataFrame have type binary:

    Column        | Type
    --------------|----------
    key           | binary
    value         | binary
    topic         | string
    partition     | int
    offset        | long
    timestamp     | timestamp
    timestampType | int

    The Guide also explains how to deal with this: cast the two columns to string, as shown below:

    # Cast the binary key and value columns to strings before printing
    query = source_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
        .writeStream \
        .outputMode("append") \
        .format("console") \
        .start()

    query.awaitTermination()
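
    Equivalently, the cast can be written with the DataFrame API instead of a SQL expression. A minimal sketch; the truncate option is only there so full tweets are visible in the console:

    from pyspark.sql.functions import col

    # Same cast expressed with column objects rather than selectExpr
    decoded_df = source_df.select(
        col("key").cast("string"),
        col("value").cast("string"),
    )

    query = decoded_df.writeStream \
        .outputMode("append") \
        .format("console") \
        .option("truncate", "false") \
        .start()

    query.awaitTermination()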