I'm consuming data from a Kafka topic, then exploding the array and producing multiple events using flatMap.
Incoming event format:
Event(eventId: Long, time: Long)
IncomingEvent(customerId: Long, events: List[Event])
Event format after exploding the incoming event:
EventAfterExploding(customerId: Long, eventId: Long, time: Long)
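For reference, the explode step can be sketched in plain Scala. The `explode` helper below is a stand-in for the body of `ExplodingFunction`, which is not shown in the question:

```scala
// Incoming event format, as described above.
case class Event(eventId: Long, time: Long)
case class IncomingEvent(customerId: Long, events: List[Event])

// Format after exploding.
case class EventAfterExploding(customerId: Long, eventId: Long, time: Long)

// Stand-in for ExplodingFunction: emit one output record per inner event,
// copying the customerId onto each of them.
def explode(in: IncomingEvent): List[EventAfterExploding] =
  in.events.map(e => EventAfterExploding(in.customerId, e.eventId, e.time))
```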
These events will be written to MySQL using the JDBC sink provided by Flink.
All data stored in the same Kafka partition has the same customer id, so I don't have an ordering issue there. But there can be lots of eventIds in an incoming event, which means there can be lots of events in the same Flink partition after the flatMap operation. This can cause latency or even OOM issues, because one operator has to process much more data. To prevent this, I can repartition or increase the parallelism. But there is one more concern: every (customerId, eventId) pair has to be sent to the same sink operator, because there can be race conditions if different writers operate on the same pair. For example:
event1 => EventAfterExploding(1, 1, 1)
event2 => EventAfterExploding(1, 1, 2)
In this scenario, the database has to contain event2, which has the latest time, but if these two records go to different sink partitions, event1 can end up in the database instead of event2.
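Independent of how records are routed, the "latest time wins" rule itself can be captured in a small merge function, so the stored row is the same regardless of arrival order. A minimal sketch (the `latest` helper is hypothetical, not part of the question's code):

```scala
case class EventAfterExploding(customerId: Long, eventId: Long, time: Long)

// For two records of the same (customerId, eventId) pair, keep whichever
// carries the later time; ties go to the newer arrival.
def latest(a: EventAfterExploding, b: EventAfterExploding): EventAfterExploding =
  if (b.time >= a.time) b else a
```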
How can I solve the race condition problem and the scaling problem caused by lots of data landing in the same partition? Does the code block given below solve both problems? I came up with it because after the keyBy operation the data will be redistributed, and keyBy should also guarantee that records with the same key are sent to the same partition, but I just want to make sure. Thanks!
incomingEvents
.flatMap(new ExplodingFunction())
.keyBy(event => (event.customerId, event.eventId))
.addSink(JdbcSink.sink(...))
Yes, that code will have the effect you are looking for. All events for the same customerId and eventId will go to the same instance of the sink.
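To see why, here is a simplified model of how keyBy routes records. Flink actually murmur-hashes the key into key groups rather than taking a plain modulo, but the consequence is the same: equal keys always map to the same subtask. The parallelism value in the test is arbitrary:

```scala
case class EventAfterExploding(customerId: Long, eventId: Long, time: Long)

// Simplified stand-in for Flink's key-based routing. Flink really hashes
// into key groups, but the guarantee illustrated here is identical:
// records with an equal (customerId, eventId) key get the same subtask index.
def subtaskFor(e: EventAfterExploding, parallelism: Int): Int =
  Math.floorMod((e.customerId, e.eventId).hashCode, parallelism)
```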