I am consuming data from a Kafka topic in real time, and I see repeated elements: the same Id arrives more than once. How can I replace the old message with the latest one?
I am using the following code to consume from the topic:
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("cTime", StringType(), True),
        StructField("latestTime", StringType(), False),
        StructField("service", StringType(), True),
    ]
)
topic = "topic1"
bootstrap_servers = "mrdc.it.com:9093,mrdc.it.com:9093,mrdc.it.com:9093"
options = {
"kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx.aud.com" password="xxxxxxxx";',\
"kafka.ssl.ca.location": "/tmp/cert.crt",\
"kafka.sasl.mechanism": "PLAIN",\
"kafka.security.protocol" : "SASL_SSL",\
"kafka.bootstrap.servers": bootstrap_servers,\
"failOnDataLoss": "false",\
"subscribe": topic,\
"startingOffsets": "latest",\
"enable.auto.commit": "false",\
"auto.offset.reset": "false",\
"enable.partition.eof": "true",\
"key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",\
"value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
}
kafka_df = spark.readStream.format("kafka").options(**options).load()
kafka_mobile_apps_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("apps"))
df = avertack_kafka_eventhub_connections(source= "KAFKA", kafka_config=kafka_config)
sql_features = ["apps.Id",
"apps.cTime",
"apps.latesTime",
"apps.service"
]
kafka_df_features = df.selectExpr(sql_features)
display(kafka_df_features)
The output is as shown
Id cTime latestTime service
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:58.601Z mobile
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:59.012Z mobile
3240 2022-03-03T20:39:59.140Z 2022-03-03T20:39:59.220Z mobile
3246 2022-03-03T20:40:00.615Z 2022-03-03T20:40:00.648Z mobile
.
.
.
How can I overwrite row 1 with row 2, using ["Id"] as the key, so that only the message with the latest "latestTime" is kept per Id?
Is there any real-time approach? If not, how can I at least run something once an hour to replace the old messages with the new ones?
Final output:
Id cTime latestTime service
3178 2022-03-03T20:39:52.889Z 2022-03-03T20:39:59.012Z mobile
3240 2022-03-03T20:39:59.140Z 2022-03-03T20:39:59.220Z mobile
3246 2022-03-03T20:40:00.615Z 2022-03-03T20:40:00.648Z mobile
.
.
.
.
Spark is intended more for batch/micro-batch processing: each (micro-)batch is a collection of records which you can order by time and take the "latest" of, or group by Id and do the same...
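As a minimal sketch (column names taken from the question; the helper name latest_per_id is mine), keeping only the newest record per Id within one such collection could look like this:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

def latest_per_id(df):
    # Rank rows per Id by latestTime (newest first) and keep only the top row.
    w = Window.partitionBy("Id").orderBy(F.col("latestTime").desc())
    return (df.withColumn("rn", F.row_number().over(w))
              .filter("rn = 1")
              .drop("rn"))

Note that a plain window function like this isn't supported directly on a streaming DataFrame, so it would run on each micro-batch inside foreachBatch (see the sketch further below).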
But you will have to combine this with some database / persistent storage to maintain a view of "most recent, by Id" across batches. I've seen this done with HBase, Couchbase, MongoDB, etc., which all have some level of Spark integration, if that is a requirement.
Out of the box, Spark doesn't provide this easily (there is a RocksDB-backed state store you can look at).
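One way to build that "most recent, by Id" view with Spark itself is foreachBatch plus an upsert into a table. The sketch below is assumption-heavy: it uses Delta Lake (not mentioned in the question, though display() suggests Databricks), the hypothetical latest_per_id helper above, and made-up paths:

from delta.tables import DeltaTable

target_path = "/tmp/latest_by_id"      # assumed Delta table location
checkpoint = "/tmp/latest_by_id_chk"   # assumed checkpoint location

def upsert_latest(batch_df, batch_id):
    # Deduplicate within the micro-batch, then merge into the target,
    # replacing a row only when the incoming latestTime is newer.
    deduped = latest_per_id(batch_df)
    if DeltaTable.isDeltaTable(spark, target_path):
        (DeltaTable.forPath(spark, target_path).alias("t")
            .merge(deduped.alias("s"), "t.Id = s.Id")
            .whenMatchedUpdateAll(condition="s.latestTime > t.latestTime")
            .whenNotMatchedInsertAll()
            .execute())
    else:
        deduped.write.format("delta").save(target_path)

(kafka_df_features.writeStream
    .foreachBatch(upsert_latest)
    .option("checkpointLocation", checkpoint)
    .trigger(processingTime="1 hour")  # hourly refresh; shorten for near real time
    .start())

Querying the target table then returns one row per Id with the latest latestTime, which is the "final output" shape you describe.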
Alternatively, and natively to Kafka, there is Kafka Streams, which provides exactly what you're looking for (a KTable is a latest-value-per-key view of a topic), although it is written in Java.
If you must use Python, ksqlDB can be set up instead; its REST API can be used from any language and lets you define the same stream processing as SQL statements.
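For example, a ksqlDB TABLE keyed by Id keeps only the latest value per key. A rough sketch of creating one through the REST API from Python, assuming the topic is keyed by Id and a ksqlDB server at localhost:8088 (both assumptions):

import requests

KSQLDB_URL = "http://localhost:8088/ksql"  # assumed ksqlDB endpoint

# A TABLE (as opposed to a STREAM) keeps only the latest value per primary key.
statement = """
    CREATE TABLE latest_by_id (
        Id STRING PRIMARY KEY,
        cTime STRING,
        latestTime STRING,
        service STRING
    ) WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='JSON');
"""

resp = requests.post(KSQLDB_URL, json={"ksql": statement, "streamsProperties": {}})
resp.raise_for_status()
print(resp.json())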
Kafka Connect is another alternative: for example, with a standard relational database where the Id in your data is the primary key of a table, a JDBC sink connector in upsert mode will run UPDATE queries and overwrite existing records on matching keys, or INSERT rows for new Ids.
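Registering such a JDBC sink connector via the Connect REST API could look roughly like this; the Connect host, database URL, and credentials are all placeholders, and the JDBC sink also needs records with a schema (e.g. Avro, or JSON with schemas enabled), which is not shown here:

import requests

connect_url = "http://localhost:8083/connectors"  # assumed Kafka Connect endpoint

connector = {
    "name": "latest-by-id-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "topic1",
        "connection.url": "jdbc:postgresql://dbhost:5432/mydb",  # placeholder
        "connection.user": "user",
        "connection.password": "password",
        # Upsert on Id: matching keys are UPDATEd, new keys are INSERTed.
        "insert.mode": "upsert",
        "pk.mode": "record_value",
        "pk.fields": "Id",
        "auto.create": "true",
    },
}

resp = requests.post(connect_url, json=connector)
resp.raise_for_status()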