apache-spark, events, apache-kafka, streaming, kudu

Spark Streaming: pick the latest event for every record per trigger processing interval


We have a Spark Streaming (Spark version 2.4.0) job which consumes one Kafka topic (4 partitions) containing business changes as JSON with an Id. These Kafka values also contain a RecordTime field and other fields inside the JSON object. This streaming job upserts a Kudu table according to the Id field.

After a while we noticed that some updates do not really reflect the latest state of the values for some Id fields. We assume 4 different executors process one partition each, and when one of them finishes earlier than the others it updates the target Kudu table. So if we have values like below:

(Id=1, val=A, RecordTime: 10:00:05 ) partition1
(Id=2, val=A, RecordTime: 10:00:04 ) partition1
(Id=1, val=B, RecordTime: 10:00:07 ) partition2
(Id=1, val=C, RecordTime: 10:00:06 ) partition3
(Id=2, val=D, RecordTime: 10:00:05 ) partition1
(Id=2, val=C, RecordTime: 10:00:06 ) partition4
(Id=1, val=E, RecordTime: 10:00:03 ) partition4

then the Kudu table should look like this:

Id  Value  RecordTime
1   B      10:00:07
2   C      10:00:06

But sometimes we saw the Kudu table like this:

Id  Value  RecordTime
1   A      10:00:05
2   C      10:00:06

The trigger interval is 1 minute.

So, how can we achieve an ordered update of the target Kudu table?

  1. Should we use a single partition for ordering? If we do, what are the pros/cons?
  2. In Spark Streaming, how can we pick the latest record and values per trigger interval (see the sketch after this list)?
  3. Should we upsert the Kudu table according to both Id and RecordTime? If so, how?
  4. Is there any other approach we can think about?
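
For option 2, the sketch below is roughly what we have in mind: deduplicate each micro-batch before the upsert. It assumes Spark 2.4 structured streaming with foreachBatch and the kudu-spark KuduContext; the broker address, topic name, JSON schema and Kudu table name are placeholders, not our real values.

    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.Trigger
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder.appName("latest-per-id").getOrCreate()
    import spark.implicits._

    // Assumed JSON layout of the Kafka value: {"Id":1,"val":"A","RecordTime":"..."}
    val schema = new StructType()
      .add("Id", LongType)
      .add("val", StringType)
      .add("RecordTime", TimestampType)

    val events = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
      .option("subscribe", "business-changes")             // placeholder topic
      .load()
      .select(from_json($"value".cast("string"), schema).as("e"))
      .select("e.*")

    val kuduContext = new KuduContext("kudu-master:7051", spark.sparkContext) // placeholder

    val query = events.writeStream
      .trigger(Trigger.ProcessingTime("1 minute"))
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // Rank the records of this micro-batch per Id by RecordTime (newest first)
        // and keep only the top row, so only the latest value per Id is upserted.
        val w = Window.partitionBy($"Id").orderBy($"RecordTime".desc)
        val latest = batch
          .withColumn("rn", row_number().over(w))
          .filter($"rn" === 1)
          .drop("rn")

        kuduContext.upsertRows(latest, "impala::default.business_changes") // placeholder table
      }
      .start()

    query.awaitTermination()

This only orders the records that land in the same micro-batch, though; a late record arriving in a later batch could still overwrite a newer row unless the upsert also compares RecordTime (option 3), which is why we are not sure it is enough.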

I hope I could explain my problem well enough. Briefly, how can we achieve event ordering per micro-batch interval in Spark Streaming?

Special thanks to anyone who can help me.


Solution

  • As you are sourcing the data from Kafka, it is useful to recall that Kafka provides ordering guarantees only within a topic partition.

    Therefore, you can solve your issue by having your Kafka producer write all the messages for the same Id into the same partition. This can be achieved either with a custom partitioner in your KafkaProducer, or by simply using the value of Id as the "key" part of the Kafka message.
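
    For example, here is a minimal sketch of the second approach, using the plain kafka-clients producer in Scala. The broker address, topic name and payload are placeholders; the key point is that keying the record by Id lets the default partitioner route every change for the same Id to the same partition.

      import java.util.Properties
      import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

      val props = new Properties()
      props.put("bootstrap.servers", "broker:9092") // placeholder
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)

      // Use the business Id as the record key; the default partitioner hashes the key,
      // so every message with the same Id lands in the same partition and keeps its order.
      val id = "1"
      val json = """{"Id":1,"val":"B","RecordTime":"10:00:07"}"""
      producer.send(new ProducerRecord[String, String]("business-changes", id, json))

      producer.close()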

    If you do not have control over the Kafka producer, you will need to make your Spark Streaming job stateful. Here, the challenging part is to define how long your job should wait for other messages with the same Id to arrive. Is it just a few seconds? Maybe a few hours? In my experience this can be difficult to answer, and sometimes the answer is "a few hours", which means you need to keep the state for a few hours, and that could make your job go OutOfMemory.
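
    If you do go stateful, one possible shape is sketched below, using structured streaming's mapGroupsWithState with a processing-time timeout. Treat it as a rough sketch, not a definitive implementation: the 10-minute wait, the Change case class, the schema and the Kafka details are assumptions.

      import org.apache.spark.sql.{Dataset, SparkSession}
      import org.apache.spark.sql.functions.from_json
      import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}
      import org.apache.spark.sql.types._

      // The "val" field from the JSON is renamed to "value" because `val` is a Scala keyword.
      case class Change(Id: Long, value: String, RecordTime: java.sql.Timestamp)

      val spark = SparkSession.builder.appName("stateful-latest").getOrCreate()
      import spark.implicits._

      val schema = new StructType()
        .add("Id", LongType).add("val", StringType).add("RecordTime", TimestampType)

      val events: Dataset[Change] = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")   // placeholder
        .option("subscribe", "business-changes")            // placeholder
        .load()
        .select(from_json($"value".cast("string"), schema).as("e"))
        .select($"e.Id".as("Id"), $"e.val".as("value"), $"e.RecordTime".as("RecordTime"))
        .as[Change]

      def keepLatest(id: Long,
                     incoming: Iterator[Change],
                     state: GroupState[Change]): Change = {
        if (state.hasTimedOut) {
          val last = state.get
          state.remove()                 // stop waiting for this Id and free its state
          last
        } else {
          // Merge the stored record (if any) with the new arrivals and keep the newest one.
          val newest = (state.getOption.iterator ++ incoming).maxBy(_.RecordTime.getTime)
          state.update(newest)
          state.setTimeoutDuration("10 minutes")   // the assumed "how long to wait" window
          newest
        }
      }

      val latestPerId = events
        .groupByKey(_.Id)
        .mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout)(keepLatest)

      // latestPerId is then written out in update output mode, e.g. with a
      // foreachBatch sink that upserts the rows into Kudu.

    The state held here is just one record per Id, but it lives until the timeout fires, so the duration you choose directly determines how much memory the job holds on to.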