Tags: python, apache-spark, pyspark, apache-kafka, spark-structured-streaming

Is there a way to replace older messages with the latest one when consuming from Kafka (avoiding duplicates in the final DataFrame)?


I am consuming data from a Kafka topic in real time, and the same element can arrive more than once. How can I replace an older message with the latest one?

I am using the following sample code to consume from the topic:

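from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Schema of the JSON payload carried in the Kafka message value
schema = StructType(
    [
        StructField("Id", StringType(), True),
        StructField("cTime", StringType(), True),
        StructField("latestTime", StringType(), False),
        StructField("service", StringType(), True),
    ]
)
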
topic = "topic1"
bootstrap_servers = "mrdc.it.com:9093,mrdc.it.com:9093,mrdc.it.com:9093"

options = {
    "kafka.sasl.jaas.config": 'org.apache.kafka.common.security.plain.PlainLoginModule required username="xxxxx.aud.com" password="xxxxxxxx";',
    "kafka.ssl.ca.location": "/tmp/cert.crt",
    "kafka.sasl.mechanism": "PLAIN",
    "kafka.security.protocol": "SASL_SSL",
    "kafka.bootstrap.servers": bootstrap_servers,
    "failOnDataLoss": "false",
    "subscribe": topic,
    "startingOffsets": "latest",
    # The settings below are plain consumer properties, not options of Spark's
    # Kafka source (which manages offsets and deserialization itself), so they
    # are most likely ignored.
    "enable.auto.commit": "false",
    "auto.offset.reset": "false",
    "enable.partition.eof": "true",
    "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
    "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
}
kafka_df = spark.readStream.format("kafka").options(**options).load()

kafka_mobile_apps_df = kafka_df.select(from_json(col("value").cast("string"), schema).alias("apps"))

# avertack_kafka_eventhub_connections and kafka_config are not shown in this snippet
df = avertack_kafka_eventhub_connections(source="KAFKA", kafka_config=kafka_config)

sql_features = [
    "apps.Id",
    "apps.cTime",
    "apps.latestTime",
    "apps.service",
]

kafka_df_features = df.selectExpr(sql_features)
display(kafka_df_features)

The output looks like this:

Id    cTime                     latestTime                service
3178  2022-03-03T20:39:52.889Z  2022-03-03T20:39:58.601Z  mobile
3178  2022-03-03T20:39:52.889Z  2022-03-03T20:39:59.012Z  mobile
3240  2022-03-03T20:39:59.140Z  2022-03-03T20:39:59.220Z  mobile
3246  2022-03-03T20:40:00.615Z  2022-03-03T20:40:00.648Z  mobile
...

How can we overwrite row 1 with row 2, using ["Id"] as the key and the "latestTime" column to decide which message is newer, so that only the latest message per Id is kept?

Is there any way to do this in real time? If not, how can we at least run a check once an hour that replaces the old messages with the new ones?

Expected final output:

Id    cTime                     latestTime                service
3178  2022-03-03T20:39:52.889Z  2022-03-03T20:39:59.012Z  mobile
3240  2022-03-03T20:39:59.140Z  2022-03-03T20:39:59.220Z  mobile
3246  2022-03-03T20:40:00.615Z  2022-03-03T20:40:00.648Z  mobile
...

Solution

  • Spark is intended more for batch and micro-batch processing: each trigger gives you a collection of records, which you can order by time to take the latest one, or group by Id and do the same.

    However, you will have to combine this with a database or other persistent storage to maintain a view of the most recent record by Id. I've seen this done with HBase, Couchbase, MongoDB, etc., all of which have some level of Spark integration, if that is a requirement.

    I don't think Spark provides this easily out of the box (though there is a RocksDB state store you can look at).
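
To make the two steps above concrete, here is a minimal sketch in plain Python, not a Spark job: it reduces each micro-batch to the newest row per Id, then upserts the result into a keyed table so that an older message never overwrites a newer one. SQLite stands in for whichever store you pick (HBase, MongoDB, ...), and the names `latest_per_id`, `upsert_batch` and `latest_apps` are made up for illustration. In Structured Streaming, the same logic would probably live in a function passed to `foreachBatch`.

```python
import sqlite3

FIELDS = ("Id", "cTime", "latestTime", "service")


def as_rows(tuples):
    """Turn plain tuples into dict rows keyed by column name."""
    return [dict(zip(FIELDS, t)) for t in tuples]


def latest_per_id(rows):
    """Reduce one micro-batch to the newest row per Id.

    The timestamps are fixed-width UTC ISO-8601 strings, so comparing them
    as strings gives chronological order.
    """
    newest = {}
    for row in rows:
        seen = newest.get(row["Id"])
        if seen is None or row["latestTime"] > seen["latestTime"]:
            newest[row["Id"]] = row
    return list(newest.values())


def upsert_batch(conn, rows):
    """Insert new Ids; overwrite an existing Id only with a newer latestTime."""
    conn.executemany(
        """
        INSERT INTO latest_apps (Id, cTime, latestTime, service)
        VALUES (:Id, :cTime, :latestTime, :service)
        ON CONFLICT(Id) DO UPDATE SET
            cTime = excluded.cTime,
            latestTime = excluded.latestTime,
            service = excluded.service
        WHERE excluded.latestTime > latest_apps.latestTime
        """,
        latest_per_id(rows),
    )
    conn.commit()


# In-memory database used only for this demonstration
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE latest_apps "
    "(Id TEXT PRIMARY KEY, cTime TEXT, latestTime TEXT, service TEXT)"
)

# Two micro-batches built from the rows shown in the question
batch1 = as_rows([
    ("3178", "2022-03-03T20:39:52.889Z", "2022-03-03T20:39:58.601Z", "mobile"),
    ("3178", "2022-03-03T20:39:52.889Z", "2022-03-03T20:39:59.012Z", "mobile"),
])
batch2 = as_rows([
    ("3240", "2022-03-03T20:39:59.140Z", "2022-03-03T20:39:59.220Z", "mobile"),
    ("3246", "2022-03-03T20:40:00.615Z", "2022-03-03T20:40:00.648Z", "mobile"),
])

upsert_batch(conn, batch1)
upsert_batch(conn, batch2)

result = conn.execute(
    "SELECT Id, cTime, latestTime, service FROM latest_apps ORDER BY Id"
).fetchall()
for line in result:
    print(*line)
```

The `WHERE` clause on the upsert is what protects the table from late, out-of-order messages; without it, the last message to arrive would win regardless of its `latestTime`.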


    Alternatively, and native to Kafka, there is Kafka Streams, which provides exactly what you're looking for, although it is a Java library.

    If you must use Python, ksqlDB can be set up and its API can be used from any language; it also lets you define stream processing more easily, as SQL statements.
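
As a rough sketch of the ksqlDB route, the snippet below builds (but does not send) an HTTP request that would submit two statements to a ksqlDB server: one declaring a stream over the topic and one materializing a table with the latest value per Id. The server URL, the stream and table names, and the assumption that the topic carries plain JSON are all mine, not from the question. Note also that `LATEST_BY_OFFSET` keeps the most recently arrived message, which matches "latest by latestTime" only if messages arrive in order.

```python
import json
import urllib.request

# Assumption: a ksqlDB server reachable locally on its default port
KSQLDB_URL = "http://localhost:8088/ksql"

# Hypothetical stream/table names; columns follow the schema in the question
STATEMENTS = """
CREATE STREAM apps (Id VARCHAR, cTime VARCHAR, latestTime VARCHAR, service VARCHAR)
  WITH (KAFKA_TOPIC='topic1', VALUE_FORMAT='JSON');
CREATE TABLE latest_apps AS
  SELECT Id,
         LATEST_BY_OFFSET(cTime) AS cTime,
         LATEST_BY_OFFSET(latestTime) AS latestTime,
         LATEST_BY_OFFSET(service) AS service
  FROM apps
  GROUP BY Id
  EMIT CHANGES;
"""


def build_request(statements, url=KSQLDB_URL):
    """Build the POST request for ksqlDB's statement endpoint."""
    body = json.dumps({"ksql": statements, "streamsProperties": {}})
    return urllib.request.Request(
        url,
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/vnd.ksql.v1+json; charset=utf-8"},
        method="POST",
    )


request = build_request(STATEMENTS)

# To actually submit it (requires a running ksqlDB server):
# with urllib.request.urlopen(request) as response:
#     print(json.load(response))
```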


    Kafka Connect is another alternative. For example, if you have a standard relational database and the Id in your data is the primary key of a table, then records with a matching key will be written as UPDATE queries that overwrite the existing row, while new Ids are inserted.
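
For the Kafka Connect option, a connector definition along these lines is one way it might look, assuming the Confluent JDBC sink connector is installed. It is written as a Python dict so it can be serialized and posted to the Connect REST API (`/connectors`). The connector name and database URL are placeholders I made up, and the JDBC sink needs records that carry a schema (for example Avro, or JSON with schemas enabled), which the question does not confirm.

```python
import json

connector = {
    "name": "apps-latest-jdbc-sink",  # hypothetical connector name
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "topics": "topic1",
        # Hypothetical database location; replace with your own
        "connection.url": "jdbc:postgresql://db.example.com:5432/appsdb",
        # Upsert on the primary key instead of appending rows
        "insert.mode": "upsert",
        # Take the primary key from the "Id" field of the record value
        "pk.mode": "record_value",
        "pk.fields": "Id",
        "auto.create": "true",
    },
}

# JSON body to POST to the Kafka Connect REST API
print(json.dumps(connector, indent=2))
```

Unlike a timestamp-guarded upsert, this overwrites a row with whichever message is processed last, so it relies on messages for a given Id arriving in order.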