We have a Flink streaming job with 20 separate pipelines. Each pipeline consumes from one or more Kafka topic sources; some pipelines use a windowed processor and others a non-windowed processor.
We are noticing data loss in the windowed pipelines whenever the job goes down and takes some time to recover, or when the job needs to be restarted.
I have set a UID on all of the operators, and I can see in the logs that the Kafka consumer operator restores its offsets from the savepoint.
We are using BoundedOutOfOrdernessTimestampExtractor to assign event-time timestamps and watermarks:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

@Slf4j
public class KafkaEventTimestampExtractor extends BoundedOutOfOrdernessTimestampExtractor<Event> {

    // Reuse one mapper; creating a new ObjectMapper per event is expensive.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    public KafkaEventTimestampExtractor(Time maxOutOfOrderness) {
        super(maxOutOfOrderness);
    }

    @Override
    public long extractTimestamp(Event element) {
        try {
            // Per-event debug logging of the full payload.
            log.info("event to be processed, event:{}", MAPPER.writeValueAsString(element));
        } catch (JsonProcessingException e) {
            log.warn("could not serialize event for logging", e);
        }
        long ts = (long) Double.parseDouble(element.getTs());
        // Timestamps shorter than 13 digits are in seconds; normalize to millis.
        return String.valueOf(ts).length() < 13 ? ts * 1000 : ts;
    }
}
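As an aside, BoundedOutOfOrdernessTimestampExtractor is deprecated in newer Flink versions. On Flink 1.11+ the same assignment can be expressed with the WatermarkStrategy API; a minimal sketch, assuming Event#getTs returns an epoch timestamp (seconds or millis) as a string, as above:

import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

// Sketch: the same 4s bounded out-of-orderness via the newer WatermarkStrategy API.
WatermarkStrategy<Event> strategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(4))
        .withTimestampAssigner((event, previousTimestamp) -> {
            long ts = (long) Double.parseDouble(event.getTs());
            // Normalize second-precision timestamps to milliseconds.
            return String.valueOf(ts).length() < 13 ? ts * 1000 : ts;
        });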
The pipeline config looks something like this:

// Non-windowed pipeline
SourceUtil
    .getEventDataStream(env, kafkaSourceSet)
    .process(new S3EventProcessor()).uid("…..**")
    .addSink();

// Windowed pipeline
SourceUtil
    .getEventDataStream(env, kafkaSourceSet)
    .assignTimestampsAndWatermarks(
        new KafkaEventTimestampExtractor(Time.seconds(4)))
    .windowAll(TumblingEventTimeWindows.of(
        Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
    .process(new S3EventProcessor()).uid("…..**")
    .addSink();
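Since the loss shows up only in the windowed pipeline, one way to make the drops observable rather than silent is to route window-late records to a side output. A sketch with the same pipeline shape; OutType stands in for whatever S3EventProcessor emits, and the uid and sink are placeholders:

import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.util.OutputTag;

// Anonymous subclass so Flink can capture the element type.
final OutputTag<Event> lateTag = new OutputTag<Event>("late-events") {};

SingleOutputStreamOperator<OutType> windowed = SourceUtil
    .getEventDataStream(env, kafkaSourceSet)
    .assignTimestampsAndWatermarks(new KafkaEventTimestampExtractor(Time.seconds(4)))
    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
    .sideOutputLateData(lateTag)          // late events land here instead of being dropped
    .process(new S3EventProcessor())
    .uid("windowed-s3-processor");        // placeholder uid

// Inspect or persist the late events, e.g. print them while debugging.
windowed.getSideOutput(lateTag).addSink(new PrintSinkFunction<>());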
Let's say the job is down for 30 minutes. In that case, the pipelines without a window processor miss no data, but the windowed pipelines lose part of the data from those 30 minutes.
When we increase the out-of-orderness delay on the time windows, e.g. from 4 seconds to 30 minutes, no events are missed as long as the application is back up within 30 minutes. But this gets us nowhere near a solution: a delay of more than 1 minute is infeasible for us, and it would also keep too many windows live, which would mean a huge infra change for us.
The only scenario I can imagine that might explain this is if the event timestamps are affected by the outage. Then a 30-minute outage would cause a 30-minute gap in the timestamps, and with out-of-order ingestion, a 4-second bounded-out-of-orderness strategy will yield some late events that will be dropped by the window.
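If that diagnosis is right, allowedLateness may fit better than a 30-minute out-of-orderness bound: the watermark keeps its 4-second delay and windows still fire on time, but late arrivals within the lateness horizon trigger updated firings instead of being dropped, at the cost of keeping window state alive longer. A sketch against the same pipeline; the uid and sink are placeholders:

// Sketch: keep the 4s watermark delay, but tolerate up to 30 min of lateness.
// The window fires on time; each late event inside the horizon re-fires it.
SourceUtil
    .getEventDataStream(env, kafkaSourceSet)
    .assignTimestampsAndWatermarks(new KafkaEventTimestampExtractor(Time.seconds(4)))
    .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(kafkaSourceSet.bufferWindowSize)))
    .allowedLateness(Time.minutes(30))    // window state is kept 30 min past the watermark
    .process(new S3EventProcessor())
    .uid("windowed-s3-processor")         // placeholder uid
    .addSink(new PrintSinkFunction<>());  // placeholder sink

Note that updated firings mean the sink can see the same window's result more than once, so the downstream S3 write would need to be idempotent or upsert-style.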