I have a Dataset[Row] where each row is a JSON string. I want to just print the JSON stream, or count the JSON stream per batch.
Here is my code so far
val ds = sparkSession.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", bootstrapServers)
  .option("subscribe", topicName)
  .option("checkpointLocation", hdfsCheckPointDir)
  .load()
val ds1 = ds.select(from_json(col("value").cast("string"), schema) as 'payload)
val ds2 = ds1.select($"payload.info")
val query = ds2.writeStream.outputMode("append").queryName("table").format("memory").start()
query.awaitTermination()
select * from table; -- I don't see anything and there are no errors. However, when I run my Kafka consumer separately (independent of Spark) I can see the data.
My question really is: what do I need to do to just print the data I am receiving from Kafka using Structured Streaming? The messages in Kafka are JSON-encoded strings, so I am converting the JSON-encoded strings to some struct and eventually to a Dataset. I am using Spark 2.1.0.
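For reference, a minimal sketch of the most direct way to print each micro-batch, reusing ds2 from above, is the built-in console sink (the truncate option is an assumption on my part, to keep long JSON fields from being cut off):

val consoleQuery = ds2.writeStream
  .outputMode("append")
  .format("console")            // print each micro-batch to the driver's stdout
  .option("truncate", "false")  // assumed option: don't truncate long fields
  .start()
consoleQuery.awaitTermination()

Alternatively, keep the memory sink but poll the in-memory table while the query is active instead of blocking on awaitTermination() first (awaitTermination() blocks the calling thread, so a select issued after it never runs):

while (query.isActive) {
  sparkSession.sql("select * from table").show(false)
  Thread.sleep(5000)            // poll every 5 seconds; the interval is arbitrary
}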
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame as SparkDataFrame
from pyspark.sql.functions import col, from_json

# Sample input line: {"name":"jimyag","age":12,"ip":"111.11.1.1"}
# Start a socket server first, e.g.: nc -lk 9999

spark: SparkSession = SparkSession.builder.appName("readJSONData").getOrCreate()

# Each line from the socket arrives as a single string column named "value"
lines: SparkDataFrame = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).load()

# Parse the JSON string with an inline DDL schema, then flatten the struct into columns
jsons = lines.select(from_json(col("value"), "name STRING, age INT, ip STRING").alias("data")).select("data.*")

jsons.writeStream.format("console").start().awaitTermination()
# Console output:
# +------+---+----------+
# | name|age| ip|
# +------+---+----------+
# |jimyag| 12|111.11.1.1|
# +------+---+----------+
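As for the other half of the original question (counting the stream per batch): in the Scala/Spark 2.1.0 setup from the question, one way is a StreamingQueryListener, which is invoked after every completed micro-batch with progress information that includes the input row count. A minimal sketch (the println format is illustrative):

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

sparkSession.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = {
    // numInputRows is the number of rows processed in the batch that just finished
    println(s"batch ${event.progress.batchId}: ${event.progress.numInputRows} rows")
  }
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = ()
})

Register the listener before calling start(); it applies to every streaming query started from that session.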