
When does Flink initiate the RocksDBKeyedStateBackend directory delete?


My Flink job runs for a few days without any issues, but after some days it kills the TaskManager and restarts the entire job. In the log I found this:

org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend - Deleting existing instance base directory /tmp/flink-io-4b455efa-bcde-4ef2-aed3-c66ca9d8933e/job_152b986e7e5a6f411780849f13ce4bc8_op_KeyedProcessOperator_a1c286a47e97622aa92a8f6cd4115854__1_4__uuid_4b53ff24-e240-48d6-b438-3ab2d05cbdb8

After it deletes the state store files, the job throws the following error, because I am fetching data from the state store at that point:

java.lang.NullPointerException
    at c.c.w.d.s.b.aggregator.StateProcessFunction.addEvent(StateProcessFunction.java:81)
    at c.cs.w.d.s.b.a.StateProcessFunction.processElement(StateProcessFunction.java:113)
    at c.c.w.d.s.b.a.StateProcessFunction.processElement(ContactStateProcessFunction.java:26)
    at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:164)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

My question is: when and why does Flink initiate deletion of the state backend files? Is there something that is not printed in the log?


Solution

  • This is happening because of concurrency: mapState.get() sometimes throws an exception. When the RocksDB backend's asynchronous thread is deleting expired data, it locks the file; if you call get() at the same time, the call throws an exception, which causes the TaskManager to restart.
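Separately from the cause described above, the trace shows the NullPointerException being raised inside the user's own addEvent method. Flink's MapState.get(key) returns null when no value is stored for the key (for example, on the first event for a key, or after an entry has expired under state TTL), so dereferencing the result without a check is another way to hit this error. The sketch below is a hypothetical reconstruction, not the asker's code: it uses a plain java.util.Map as a stand-in for MapState so that it runs without Flink, and the class name, keys, and event values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the addEvent logic in StateProcessFunction.
// A java.util.Map plays the role of Flink's MapState<String, List<String>>,
// whose get(key) likewise returns null when nothing is stored for the key.
class NullSafeStateAccess {

    // Appends an event to the list stored under `key`, creating the list on
    // first use instead of dereferencing a possibly-null lookup result.
    static void addEvent(Map<String, List<String>> state, String key, String event) {
        List<String> events = state.get(key); // null on first event or after expiry
        if (events == null) {
            events = new ArrayList<>();
        }
        events.add(event);
        // With the RocksDB backend, get() returns a deserialized copy, so the
        // updated value has to be written back explicitly with put().
        state.put(key, events);
    }

    public static void main(String[] args) {
        Map<String, List<String>> state = new HashMap<>();
        addEvent(state, "contact-1", "opened");  // no entry yet: would NPE without the check
        addEvent(state, "contact-1", "clicked");
        System.out.println(state.get("contact-1"));
    }
}
```

Running main prints [opened, clicked]; without the null check, the first addEvent call would throw a NullPointerException when it tries to add to the missing list.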