apache-spark, apache-spark-sql, spark-structured-streaming

How to print Json encoded messages using Spark Structured Streaming


I have a Dataset[Row] where each row is a JSON string. I want to just print the JSON stream, or count the JSON stream per batch.

Here is my code so far:

val ds = sparkSession.readStream
               .format("kafka")
               .option("kafka.bootstrap.servers", bootstrapServers)
               .option("subscribe", topicName)
               .option("checkpointLocation", hdfsCheckPointDir)
               .load()

val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start()
query.awaitTermination()
select * from table; -- I don't see anything and there are no errors. However, when I run my Kafka consumer separately (independent of Spark) I can see the data.

My question really is: what do I need to do to just print the data I am receiving from Kafka using Structured Streaming? The messages in Kafka are JSON-encoded strings, so I am converting them to a struct and eventually to a Dataset. I am using Spark 2.1.0.
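Conceptually, `from_json` parses each Kafka value against the supplied schema and yields a struct (or null on malformed input). The per-row behavior can be sketched in plain Python; the helper name and field list below are illustrative, not part of any Spark API:

```python
import json

def parse_row(value, fields):
    """Illustrative sketch of what from_json does to one Kafka value:
    parse the JSON string and keep only the schema's fields, returning
    None on malformed input (mirroring from_json's null result)."""
    try:
        obj = json.loads(value)
    except (json.JSONDecodeError, TypeError):
        return None
    return {f: obj.get(f) for f in fields}

row = parse_row('{"name":"jimyag","age":12,"ip":"111.11.1.1"}',
                ["name", "age", "ip"])
print(row)  # {'name': 'jimyag', 'age': 12, 'ip': '111.11.1.1'}

bad = parse_row("not json", ["name"])
print(bad)  # None
```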


Solution

  • from pyspark.sql import SparkSession
    from pyspark.sql import DataFrame as SparkDataFrame
    from pyspark.sql.functions import from_json, col
    
    
    # Feed test lines into the socket, e.g.:
    # {"name":"jimyag","age":12,"ip":"111.11.1.1"}
    # nc -lk 9999
    
    spark: SparkSession = SparkSession.builder.appName("readJSONData").getOrCreate()
    
    # Read raw lines from the socket source
    lines: SparkDataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()
    
    # Parse each line with an inline DDL schema and flatten the struct
    jsons = lines.select(from_json(col("value"), "name STRING, age INT, ip STRING").alias("data")).select("data.*")
    
    # Print each micro-batch to the console
    jsons.writeStream.format("console").start().awaitTermination()
    
    # print
    # +------+---+----------+
    # |  name|age|        ip|
    # +------+---+----------+
    # |jimyag| 12|111.11.1.1|
    # +------+---+----------+