We have a Spark Streaming (Spark 2.4.0) job that consumes one Kafka topic (4 partitions) containing business changes as JSON, each identified by an Id field. The Kafka values also contain a RecordTime field and other fields inside the JSON object. The streaming job upserts a Kudu table according to the Id field.
After a while we noticed that some updates do not reflect the latest state of the values for some Ids. We assume four different executors are processing, one per partition, and when one of them finishes earlier than the others it updates the target Kudu table. So if we have values like these:
(Id=1, val=A, RecordTime: 10:00:05) partition1
(Id=2, val=A, RecordTime: 10:00:04) partition1
(Id=1, val=B, RecordTime: 10:00:07) partition2
(Id=1, val=C, RecordTime: 10:00:06) partition3
(Id=2, val=D, RecordTime: 10:00:05) partition1
(Id=2, val=C, RecordTime: 10:00:06) partition4
(Id=1, val=E, RecordTime: 10:00:03) partition4
then the Kudu table should look like this:
| Id | Value | RecordTime |
|----|-------|------------|
| 1  | B     | 10:00:07   |
| 2  | C     | 10:00:06   |
But sometimes we see the Kudu table like this:
| Id | Value | RecordTime |
|----|-------|------------|
| 1  | A     | 10:00:05   |
| 2  | C     | 10:00:06   |
The trigger interval is 1 minute.
So, how can we achieve ordered updates of the target Kudu table? I hope I have explained my problem well enough. Briefly: how can we achieve event ordering per micro-batch interval in Spark Streaming?
Special thanks to anyone who can help me.
As you are sourcing the data from Kafka, it is useful to recall that Kafka provides ordering guarantees only within a topic partition.
Therefore, you can solve your issue by having your Kafka producer write all messages for the same Id into the same partition. This can be achieved either with a custom partitioner in your KafkaProducer or, more simply, by using the value of Id as the "key" part of the Kafka message.
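A minimal sketch of the keyed-producer variant; the topic name "business-changes" and the broker address are assumptions, and your payload serialization may differ. With a non-null key, Kafka's default partitioner hashes the key, so every message for a given Id lands in the same partition:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Sketch only: topic name and bootstrap address are placeholders.
val props = new Properties()
props.put("bootstrap.servers", "broker:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

// Using the Id as the message key: the default partitioner hashes the key,
// so all messages for the same Id go to the same partition and stay ordered.
def sendChange(id: String, json: String): Unit =
  producer.send(new ProducerRecord[String, String]("business-changes", id, json))
```

With this in place, every consumer (including Spark) sees the changes for a given Id in the order they were produced.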
If you do not have control over the Kafka producer, you will need to make your Spark Streaming job stateful. Here, the challenging part is to define how long your job should wait for other messages with the same Id to arrive. Is it just a few seconds? Maybe a few hours? In my experience this can be difficult to answer, and sometimes the answer is "a few hours", which means you need to keep the state for a few hours, and that can make your job run out of memory.
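For illustration, here is a minimal sketch of the stateful approach, assuming you use Structured Streaming with flatMapGroupsWithState; the Change case class, its field names, and the two-hour timeout are all assumptions for the example:

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Hypothetical shape of the parsed Kafka JSON values.
case class Change(id: Long, value: String, recordTime: Timestamp)

// Keep only the latest record per Id; state is evicted via a processing-time
// timeout, which is exactly the "how long do we wait" trade-off described above.
def latestPerId(
    id: Long,
    updates: Iterator[Change],
    state: GroupState[Change]): Iterator[Change] = {
  if (state.hasTimedOut) {
    val winner = state.get
    state.remove()
    Iterator(winner)                 // emit the final value once the window closes
  } else {
    val latest = (updates ++ state.getOption.iterator)
      .maxBy(_.recordTime.getTime)   // newest RecordTime wins
    state.update(latest)
    state.setTimeoutDuration("2 hours") // assumption: tune to your arrival delays
    Iterator.empty
  }
}

// Wiring (parsedStream: Dataset[Change] from the Kafka source; needs spark.implicits._):
// val ordered = parsedStream
//   .groupByKey(_.id)
//   .flatMapGroupsWithState(OutputMode.Append, GroupStateTimeout.ProcessingTimeTimeout)(latestPerId)
```

Note that records are only emitted to the Kudu sink when the timeout fires, so the state size and the end-to-end latency both grow with the waiting window you choose.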