I have a requirement where I have to consume messages from 3 Kafka topics as streaming data and then generate results based on joins between the data from these 3 topics. Please suggest a good approach using Direct Stream in Scala. Thanks
If the data in the different topics has the same format and the processing logic is the same for all of them, you can consume all topics in a single stream and do the aggregation there. If the processing logic differs per topic, create a separate stream for each topic and then join or aggregate across those streams (see the sketch after the snippet below). The Spark Structured Streaming + Kafka integration documentation covers consuming from multiple topics.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("KafkaMultiTopicStream").getOrCreate()
import spark.implicits._ // needed for .as[...]

// Subscribe to all three topics in a single stream
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1,topic2,topic3")
  .load()

// Keep the topic name so records can be told apart downstream
val messages = df
  .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
  .as[(String, String, String)]
// <--- your aggregation logic here --->
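
If the processing logic differs per topic, a minimal sketch of the separate-streams approach is below. It assumes the same brokers and placeholder topic names as above, joins the three streams on the Kafka message key, and writes to the console sink purely for illustration; adapt the parsing and join keys to your actual message schema.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("ThreeTopicJoin").getOrCreate()

// Read a single topic and expose its value under a topic-specific column name
def readTopic(topic: String) = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", topic)
  .load()
  .selectExpr("CAST(key AS STRING) AS key", s"CAST(value AS STRING) AS value_$topic")

val t1 = readTopic("topic1")
val t2 = readTopic("topic2")
val t3 = readTopic("topic3")

// Inner equi-join on the Kafka message key; in production, add watermarks and
// event-time constraints so the join state does not grow without bound
val joined = t1.join(t2, "key").join(t3, "key")

joined.writeStream
  .format("console")
  .outputMode("append")
  .start()
  .awaitTermination()

Note that stream-stream inner joins are supported in Structured Streaming from Spark 2.3 onwards; without watermarks the engine must keep all past input as state, so define event-time watermarks on each side for long-running jobs.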