I use Flink 1.11.3 with the SQL API and Blink planner. I work in streaming mode and consume a CSV file with the filesystem connector and CSV format. For a time column I generate watermarks and want to do window aggregations based on this time. Like fast-forward the past based on event-time.
val settings = EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build()
Does the time column have to be sorted for this, because row by row is consumed and if it is not sorted, late events could occur, which leads to dropping of the rows?
I am also interested in this issue with the CDC connector of Ververica. Maybe I can reach someone who knows about it. At the beginning it takes a snapshot of the tables and then emits these rows as a change event. What is the correct processing regarding the event time? In which order are they emitted?
Yes, when running in streaming mode you run the risk of having late events, which will be dropped by the SQL API when doing event time windowing.
Since the input is a file, why not run the job in batch mode, and avoid this problem altogether? Otherwise your options include sorting the input (by time), or making sure that the watermarking is configured so as to avoid late events.
As for the ordering of events produced by the CDC connector, I don't know.