Flink throws "java.lang.IllegalStateException: Buffer pool is destroyed" when I use "SlidingEventTimeWindows", but everything works fine when I switch to "SlidingProcessingTimeWindows".
The stack trace is as follows:
18:37:53,728 WARN org.apache.flink.streaming.api.operators.AbstractStreamOperator - Error while emitting latency marker.
java.lang.RuntimeException: Buffer pool is destroyed.
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:147)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:683)
at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:151)
at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:230)
at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:125)
at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:93)
at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:144)
... 10 more
I finally solved it with the following steps.
First, replace "collect" with "collectWithTimestamp" in my DataMockSource, which generates the stream data. After doing so, "Error while emitting latency marker" disappears from the console.
Second, replace the BoundedOutOfOrdernessTimestampExtractor with AscendingTimestampExtractor, which assigns timestamps and watermarks for event-time processing. In my DataMockSource, I generate each record and emit it at the same moment, so the timestamps are ascending and AscendingTimestampExtractor is the right way to generate watermarks.
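To make the difference between the two extractors concrete, here is a simplified plain-Java model of how each one derives a watermark from the largest timestamp seen so far. It does not use Flink. The class and method names (WatermarkSketch, ascendingWatermark, boundedWatermark) are made up for illustration, and the formulas reflect my understanding of the two extractors, not a copy of their source.

```java
final class WatermarkSketch {

    // Ascending model (assumption): timestamps never go backwards, so
    // everything up to (max timestamp seen - 1) can be treated as complete.
    static long ascendingWatermark(long maxTimestampSeen) {
        if (maxTimestampSeen == Long.MIN_VALUE) {
            return Long.MIN_VALUE; // nothing seen yet, avoid overflow
        }
        return maxTimestampSeen - 1;
    }

    // Bounded out-of-orderness model (assumption): the watermark is held
    // back by a fixed delay so that late elements within that bound still count.
    static long boundedWatermark(long maxTimestampSeen, long maxOutOfOrdernessMillis) {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps, 3 seconds apart like the mock source.
        long[] timestamps = {1_000L, 4_000L, 7_000L, 10_000L};
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
            System.out.printf("ts=%d ascending=%d bounded(5s)=%d%n",
                    ts, ascendingWatermark(max), boundedWatermark(max, 5_000L));
        }
    }
}
```

With strictly ascending input the first model advances the watermark almost in lockstep with the data, while the second always trails by the configured bound, so windows fire later.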
I am posting the main code here, and the full project is on GitHub. I hope it is helpful.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(10000); // checkpoint every 10 seconds
DataStreamSource<MockData> mockDataDataStreamSource = env.addSource(new DataMockSource());
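// assignTimestampsAndWatermarks returns a new stream; build the window on that
// stream, otherwise the generated watermarks never reach the window operator.
DataStream<MockData> timestampedStream = mockDataDataStreamSource.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<MockData>() {
        @Override
        public long extractAscendingTimestamp(MockData element) {
            return element.getTimestamp();
        }
    });
SingleOutputStreamOperator<Tuple2<String, Long>> countStream = timestampedStream
    .keyBy("country")
    // size and slide are both 10 seconds, so these windows do not overlap
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
    // .allowedLateness(Time.seconds(5))
    .process(new FlinkEventTimeCountFunction())
    .name("count elements");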
countStream.addSink(new SinkFunction<Tuple2<String, Long>>() {
    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        System.out.println(value);
    }
});
env.execute("count test ");
My DataMockSource is here:
private volatile boolean running = true;

@Override
public void run(SourceContext<MockData> sourceContext) throws Exception {
    while (running) {
        MockData mockData = new MockData();
        mockData.setAge(ThreadLocalRandom.current().nextInt(1, 99));
        mockData.setCountry("country " + ThreadLocalRandom.current().nextInt(2, 5));
        mockData.setId(ThreadLocalRandom.current().nextLong());
        // read the clock once so the record and its event timestamp agree
        long now = Instant.now().toEpochMilli();
        mockData.setTimestamp(now);
        // emit record with timestamp
        sourceContext.collectWithTimestamp(mockData, now);
        // sourceContext.collect(mockData);
        TimeUnit.SECONDS.sleep(3);
    }
}

@Override
public void cancel() {
    running = false;
}
When working in event time you need to arrange for timestamp extraction and watermarking to occur, either in your source or by using assignTimestampsAndWatermarks. It looks like you aren't doing that, which would explain why you won't get any output (the event-time window will never be triggered).
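To illustrate why a missing watermark means no output, here is a small plain-Java sketch that does not depend on Flink. It assumes the rule that an event-time window covering [start, start + size) fires once the watermark reaches the last timestamp inside it. The class and method names (WindowFiringSketch, windowStart, shouldFire) are hypothetical.

```java
final class WindowFiringSketch {

    // Start of the epoch-aligned window of the given size that contains the timestamp.
    static long windowStart(long timestamp, long windowSizeMillis) {
        return timestamp - Math.floorMod(timestamp, windowSizeMillis);
    }

    // Assumed firing rule: the window [start, start + size) fires once the
    // watermark has reached the largest timestamp that belongs to it.
    static boolean shouldFire(long windowStart, long windowSizeMillis, long watermark) {
        long maxTimestamp = windowStart + windowSizeMillis - 1;
        return watermark >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 10_000L;                   // 10-second window
        long start = windowStart(12_345L, size);

        // No watermarks were ever generated: the watermark stays at its minimum.
        System.out.println("no watermark: " + shouldFire(start, size, Long.MIN_VALUE));

        // Watermarks are generated and eventually pass the end of the window.
        System.out.println("watermark passed: " + shouldFire(start, size, 20_000L));
    }
}
```

In this model, a job that never produces watermarks keeps every window open indefinitely, no matter how many elements arrive.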
Also, your source should have a cancel method. Something like this:
private volatile boolean running = true;

@Override
public void run(SourceContext ctx) throws Exception {
    while (running) {
        ...
    }
}

@Override
public void cancel() {
    running = false;
}
I think this might explain the exception you are seeing. Perhaps the source is continuing to run and send latency markers after the job has started to shut itself down.
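The cancel pattern above can be tried out without a Flink cluster. The following is a minimal plain-Java sketch, with hypothetical names (CancellableLoopSketch, stopsAfterCancel), of a loop that checks a volatile flag and stops shortly after cancel() is called. It only models the cooperative shutdown; it says nothing about how Flink itself schedules latency markers.

```java
import java.util.concurrent.atomic.AtomicLong;

final class CancellableLoopSketch implements Runnable {

    // volatile so that the write in cancel() is visible to the running thread
    private volatile boolean running = true;
    private final AtomicLong emitted = new AtomicLong();

    @Override
    public void run() {
        while (running) {
            emitted.incrementAndGet();        // stands in for emitting a record
            try {
                Thread.sleep(10);             // short pause between records
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;                       // treat interruption as a stop request
            }
        }
    }

    public void cancel() {
        running = false;
    }

    long emittedCount() {
        return emitted.get();
    }

    // Starts the loop, cancels it, and reports whether the thread terminated.
    static boolean stopsAfterCancel() {
        CancellableLoopSketch source = new CancellableLoopSketch();
        Thread worker = new Thread(source, "mock-source");
        worker.start();
        try {
            Thread.sleep(50);
            source.cancel();
            worker.join(1_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("stopped after cancel: " + stopsAfterCancel());
    }
}
```

Without the flag check, the loop would keep emitting after shutdown has begun, which is the situation suspected of causing the exception.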