Tags: apache-flink, flink-streaming

How to dynamically serialize a parameter in Flink streaming


I have a parameter object params that is serialized into my Flink streaming job:

class P(...) extends Serializable { ... }

val params = new P(...)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource(new MySource(params))
   .map(new MyMap(params))
   .addSink(new MySink(params))
env.setParallelism(1)
env.execute("My Job")

But params changes on the driver node, and I need to push the updated params to the executors while the job is running. Is this possible without stopping the Flink streaming job?


Solution

  • In short, the answer is no. Your UDF would need to [de]serialize the parameters every time a new record arrives, and this would slow down execution.

    However, you can implement your own stream operator by extending AbstractUdfStreamOperator and invoke it with a transform operation. I did one example here: "Implementing my own stream operator in Flink to deal with data skew".

    Then you decide in the operator when to read the new parameters. For instance, create a new thread that is scheduled to run every 10 minutes. The parameter file has to be placed on all nodes where the operator will be running.
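    The periodic-reload idea above can be sketched in plain Scala, independent of Flink: an AtomicReference holds the current parameters and a scheduled background thread refreshes them, so the per-record path only does a cheap volatile read. Inside a custom operator you would start this in open() and shut it down in close(). The names Params, ParamsHolder, and loadParams are hypothetical; loadParams stands in for reading the parameter file that must exist on every task-manager node.

    ```scala
    import java.util.concurrent.{Executors, TimeUnit}
    import java.util.concurrent.atomic.AtomicReference

    // Hypothetical stand-in for the question's class P.
    case class Params(threshold: Int)

    // Holds the latest parameters and refreshes them on a schedule.
    class ParamsHolder(loadParams: () => Params, periodMillis: Long) {
      private val current   = new AtomicReference[Params](loadParams())
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      // Start the background refresh; call from the operator's open().
      def start(): Unit =
        scheduler.scheduleAtFixedRate(
          new Runnable { def run(): Unit = current.set(loadParams()) },
          periodMillis, periodMillis, TimeUnit.MILLISECONDS)

      // Cheap read on the hot path: one volatile load per record.
      def get: Params = current.get()

      // Stop the background thread; call from the operator's close().
      def stop(): Unit = { scheduler.shutdownNow(); () }
    }
    ```

    The record-processing method then calls holder.get for each element instead of capturing params at job-submission time, which is what makes the update visible without restarting the job.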