I have a structured Streaming program, which read data from Kafka topic A, and does some processing, and finally puts data into target Kafka Topic.
Note : the processing is done in function - convertToDictForEachBatch(), which called using - foreachBatch(convertToDictForEachBatch)
As part of the processing, it reads another Kafka Topic (events_topic), and if there is a New record(s) after the last read, it does some additional processing - reloads data from BigQuery table, and persists it.
df_stream = spark.readStream.format('kafka') \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location) \
.option("kafka.ssl.keystore.password", ssl_keystore_password) \
.option("kafka.bootstrap.servers",kafkaBrokers)\
.option("subscribe", topic) \
.option("kafka.group.id", consumerGroupId)\
.option("startingOffsets", "latest") \
.option("failOnDataLoss", "false") \
.option("maxOffsetsPerTrigger", 10000) \
.load()
print(" df_stream -> ", df_stream)
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
.outputMode("append") \
.trigger(processingTime='4 minutes') \
.option("numRows",10000)\
.option("truncate", "false") \
.option("checkpointLocation", checkpoint) \
.foreachBatch(convertToDictForEachBatch) \
.start()
query.awaitTermination()
# called from - foreachbatch
def convertToDictForEachBatch(df, batchId):
# Uses the dataframe to do processing of data, the code is not added, since it is not relevant to this question
# Additional processing i.e. reloading of prediction data from Big query, into Data Frame - based on event in Kafka topic
# checks for event in topic - topic_reloadpred and further processing takes place if there is new data in the topic
events = spark.read.format('kafka') \
.option("kafka.bootstrap.servers", kafkaBrokers) \
.option("kafka.security.protocol", "SSL") \
.option("kafka.ssl.truststore.location", ssl_truststore_location) \
.option("kafka.ssl.truststore.password", ssl_truststore_password) \
.option("kafka.ssl.keystore.location", ssl_keystore_location_reloadpred) \
.option("kafka.ssl.keystore.password", ssl_keystore_password_reloadpred) \
.option("subscribe", topic_reloadpred) \
.option("kafka.group.id", consumerGroupId_reloadpred) \
.load()
# events is passed to a function, and processing is done if new events are generated
What is the best way to achieve this ? The current code is reading the entire data in the kafka topic, i need it to read only the new data.
As suggested by @Rishabh Sharma, i'm storing the offset in a separate kafka topic which has a single partition(i could store it in separate partition in the same topic as well). During processing, i'm checking the last updated offset against the current offset added. If the current offset is more than the last updated offset, i do further processing (i.e. reload the table from BigQuery)