apache-flink flink-streaming

Unable to access global parameters within KafkaRecordDeserializationSchema


I have an Apache Flink job that initially stores certain configuration data as "global job parameters":

final var env = StreamExecutionEnvironment.getExecutionEnvironment();
final var executionConfig = env.getConfig();
...
executionConfig.setGlobalJobParameters(params);

The problem is that I have a custom KafkaRecordDeserializationSchema that needs access to those parameters, in the same way any rich user function can read them.

That class does a bit more than just deserialize messages: it has several fields/members that are initialized inside it, which is why I need the parameters there.

Is there a way to do something similar to getRuntimeContext().getExecutionConfig().getGlobalJobParameters() within a deserializer, or would it be better to split the flow and do those operations in more "specialized" function(s)?


Solution

  • The DeserializationSchema.InitializationContext that's passed to open() doesn't provide access to the execution configuration. What you can do instead is define a constructor for your custom KafkaRecordDeserializationSchema and pass the configuration to it.
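A minimal sketch of that constructor approach follows. To keep it compilable without Flink on the classpath, it uses a hypothetical stand-in interface (RecordSchema) rather than the real KafkaRecordDeserializationSchema; the class names, the "prefix" key, and the Map-based parameters are all assumptions made for illustration. The point it tries to show: the schema instance is Java-serialized when the job is shipped, so the configuration passed to the constructor must be Serializable, and any derived fields can be marked transient and rebuilt in open().

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class ConfiguredSchemaSketch {

    // Hypothetical stand-in for KafkaRecordDeserializationSchema, reduced to the
    // parts that matter here. The real interface's open() takes a
    // DeserializationSchema.InitializationContext, and deserialize() takes a
    // ConsumerRecord<byte[], byte[]> plus a Collector<T>.
    interface RecordSchema<T> extends Serializable {
        void open() throws Exception;
        T deserialize(byte[] value) throws IOException;
    }

    static class PrefixingSchema implements RecordSchema<String> {
        private static final long serialVersionUID = 1L;

        // Serializable configuration, captured when the job graph is built.
        private final HashMap<String, String> params;

        // Derived state: skipped by serialization, rebuilt in open().
        private transient String prefix;

        PrefixingSchema(Map<String, String> params) {
            this.params = new HashMap<>(params);
        }

        @Override
        public void open() {
            // "prefix" is an assumed parameter name for this sketch.
            prefix = params.getOrDefault("prefix", "");
        }

        @Override
        public String deserialize(byte[] value) {
            return prefix + new String(value, StandardCharsets.UTF_8);
        }
    }

    // Java-serialization round trip, imitating how the instance travels to a task.
    static <T extends Serializable> T roundTrip(T obj)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> params = new HashMap<>();
        params.put("prefix", "eu-");

        // Client side: construct the schema with the configuration.
        PrefixingSchema shipped = roundTrip(new PrefixingSchema(params));

        // Task side: open() runs before any record is deserialized.
        shipped.open();
        System.out.println(shipped.deserialize("42".getBytes(StandardCharsets.UTF_8)));
    }
}
```

In the actual job, the same `params` object that goes into setGlobalJobParameters(params) could also be handed to the schema's constructor, provided its type is Serializable.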