
"Buffer pool is destroyed" when I use Flink SlidingEventTimeWindows


Flink throws "java.lang.IllegalStateException: Buffer pool is destroyed" when I use "SlidingEventTimeWindows", but everything works fine when I switch to "SlidingProcessingTimeWindows".

The stack trace is as follows:

18:37:53,728 WARN  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while emitting latency marker.
java.lang.RuntimeException: Buffer pool is destroyed.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:147)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:683)
	at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:151)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:230)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:125)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:93)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:144)
	... 10 more

I finally solved it with the following steps.

First, I replaced "collect" with "collectWithTimestamp" in my DataMockSource, which generates the stream data. After that change, "Error while emitting latency marker" no longer appears in the console.

Second, I replaced the BoundedOutOfOrdernessTimestampExtractor with an AscendingTimestampExtractor, which is used for event-time processing. My DataMockSource generates each record and emits it at the same moment, so the timestamps are ascending and AscendingTimestampExtractor is the right way to generate watermarks.
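The difference between the two extractors comes down to how far the watermark trails the newest timestamp. Below is a minimal plain-Java model of that, not Flink code: the class and method names (WatermarkModel, ascendingWatermark, boundedWatermark) are made up for illustration, and the formulas reflect my understanding of the built-in extractors (newest timestamp minus 1 for the ascending one, newest timestamp minus the configured bound for the other).

```java
class WatermarkModel {

    /** Ascending-timestamp model: the watermark sits just behind the newest timestamp. */
    public static long ascendingWatermark(long maxTimestampSeen) {
        return maxTimestampSeen - 1;
    }

    /** Bounded-out-of-orderness model: the watermark lags the newest timestamp by a fixed bound. */
    public static long boundedWatermark(long maxTimestampSeen, long maxOutOfOrdernessMillis) {
        return maxTimestampSeen - maxOutOfOrdernessMillis;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps: one record every 3 s, like DataMockSource.
        long[] timestamps = {1_000L, 4_000L, 7_000L, 10_000L};
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
            System.out.printf("ts=%d ascending=%d bounded(5s)=%d%n",
                    ts, ascendingWatermark(max), boundedWatermark(max, 5_000L));
        }
    }
}
```

With timestamps that never go backwards, the ascending model lets windows close almost as soon as their end time is reached, while the bounded model delays every window by the bound.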

I am posting the main code here; the full project is on GitHub. I hope it is helpful.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(10000); // checkpoint every 10 seconds (interval is in milliseconds)

DataStreamSource<MockData> mockDataDataStreamSource = env.addSource(new DataMockSource());
// assignTimestampsAndWatermarks returns a new stream; keep the result and build the
// window on it, otherwise the window operator never receives these watermarks.
DataStream<MockData> timestampedStream = mockDataDataStreamSource.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<MockData>() {
      @Override
      public long extractAscendingTimestamp(MockData element) {
        return element.getTimestamp();
      }
    });

SingleOutputStreamOperator<Tuple2<String, Long>> countStream = timestampedStream
    .keyBy("country")
    // size == slide (10 s), so these sliding windows do not overlap
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
//        .allowedLateness(Time.seconds(5))
    .process(new FlinkEventTimeCountFunction())
    .name("count elements");

countStream.addSink(new SinkFunction<Tuple2<String, Long>>() {
  @Override
  public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
    System.out.println(value);
  }
});

env.execute("count test ");

My DataMockSource is here:

private volatile boolean running = true;

@Override
public void run(SourceContext<MockData> sourceContext) throws Exception {
  while (running) {
    // read the clock once so the record field and the emitted timestamp agree
    long now = Instant.now().toEpochMilli();
    MockData mockData = new MockData();
    mockData.setAge(ThreadLocalRandom.current().nextInt(1, 99));
    mockData.setCountry("country " + ThreadLocalRandom.current().nextInt(2, 5));
    mockData.setId(ThreadLocalRandom.current().nextLong());
    mockData.setTimestamp(now);
    // emit record with timestamp
    sourceContext.collectWithTimestamp(mockData, now);
//    sourceContext.collect(mockData);

    TimeUnit.SECONDS.sleep(3);
  }
}

@Override
public void cancel() {
  running = false;
}


Solution

  • When working in event time you need to arrange for timestamp extraction and watermarking to occur, either in your source or by using assignTimestampsAndWatermarks. It looks like you aren't doing that, which would explain why you won't get any output (the event-time window will never be triggered).

    Also, your source should have a cancel method. Something like this:

    private volatile boolean running = true;
    
    @Override
    public void run(SourceContext ctx) throws Exception {
        while (running) {
            ...
        }
    }
    
    @Override
    public void cancel() {
        running = false;
    }
    

    I think this might explain the exception you are seeing. Perhaps the source is continuing to run and send latency markers after the job has started to shut itself down.
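To illustrate the first point, here is a small plain-Java model of when an event-time window becomes eligible to fire. It is a sketch, not Flink code: the names EventTimeWindowModel, windowStart and canFire are hypothetical, window offsets are assumed to be zero, and Long.MIN_VALUE stands for "no watermark has arrived yet".

```java
class EventTimeWindowModel {

    /** Start of the window of the given size (offset 0) that contains the timestamp. */
    public static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - Math.floorMod(timestamp, sizeMillis);
    }

    /** A window [start, start + size) may fire once the watermark reaches its last millisecond. */
    public static boolean canFire(long windowStart, long sizeMillis, long watermark) {
        return watermark >= windowStart + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 s window with a 10 s slide, as in the question
        long start = windowStart(12_345L, size);
        System.out.println(start);                                // 10000
        System.out.println(canFire(start, size, Long.MIN_VALUE)); // false: no watermark yet
        System.out.println(canFire(start, size, 19_999L));        // true
    }
}
```

If the watermark never advances, canFire stays false for every window, which matches the symptom of a job that runs but produces no output.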