I would like to use PureConfig with Apache Flink.
How can I pass additional Java system properties when starting the job?
I tried passing it via the -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" argument, but it is not accepted:
https://github.com/geoHeil/streaming-reference/blob/5-basic-flink-setup/Makefile#L21
flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" \
  -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'"
The main class then fails when it tries to instantiate the configuration from the configuration file.
Note, a full reference is available at https://github.com/geoHeil/streaming-reference. You can reproduce the above error by:
git clone git@github.com:geoHeil/streaming-reference.git
cd streaming-reference
git checkout 5-basic-flink-setup
make run-local-Tweets
You should then see this exception:
ConfigurationException: Failed to start. There is a problem with the configuration: ConfigReaderFailures(ConvertFailure(KeyNotFound(foo,Set()),None,),List())
In Spark, this property is called extraJavaOptions.
I tried the approach from "Flink: How to pass extra JVM options to TaskManager and JobManager", but so far it does not work with the current version of Flink (1.10.1).
This property would be equivalent to spark.driver.extraJavaOptions in Apache Spark, and I believe it needs to be passed to the job manager.
If I read the documentation correctly, -yD only works on YARN, but I also need something that works locally.
Copying the answer from the mailing list.
If you reuse the cluster for several jobs, they need to share the JVM_ARGS since it's the same process. [1] On Spark, new processes are spawned for each stage afaik.
However, the current recommendation is to use only one ad-hoc cluster per job/application (which is closer to how Spark works). So if you use YARN, every job/application spawns a new cluster that is just the right size for it. You can then supply new parameters with each new YARN submission:
flink run -m yarn-cluster -yD env.java.opts="-Dconfig.file='config/jobs/twitter-analysis.conf'" --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis \
  "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
However, make sure that the path is accessible from within your YARN cluster, since the driver is probably executed on the cluster (not 100% sure).
To add a file to the YARN deployment, use
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
If you want per-job configuration on a shared cluster, I'd recommend using normal program arguments and initializing PureConfig manually (I haven't used it, so I'm not sure how). You would then probably invoke your program as follows:
flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar" config.file='config/jobs/twitter-analysis.conf'
For local execution, I had some trouble configuring it as well (tried it with your code). The issue is that all the parameters we tried previously are only passed to newly spawned processes, while your code is executed directly in the CLI process. Setting FLINK_ENV_JAVA_OPTS in the environment of the CLI gets the property into that process:
FLINK_ENV_JAVA_OPTS=-Dconfig.file="`pwd`/config/jobs/twitter-analysis.conf" flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "usecases/tweets/build/libs/tweets_${SCALA_VERSION}-${VERSION}-all.jar"
FLINK_ENV_JAVA_OPTS is usually populated from the env.java.opts entry in flink-conf.yaml, but it does not respect -Denv.java.opts. I'm not sure if this is intentional.
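The command above relies on `pwd` to make the config path absolute. A minimal sketch of the same idea as a reusable shell function follows; config_file_opt is a hypothetical helper name, not something Flink provides, and the usage lines assume that flink is on the PATH and that JAR points at the fat jar.

```shell
# Hypothetical helper (not part of Flink): print a -Dconfig.file option
# whose path is absolute, so it does not depend on the working directory
# of the JVM that eventually reads it.
config_file_opt() {
  case "$1" in
    /*) printf '%s\n' "-Dconfig.file=$1" ;;        # already absolute
    *)  printf '%s\n' "-Dconfig.file=$PWD/$1" ;;   # resolve against the current directory
  esac
}

# Usage (assumption: `flink` is on the PATH, JAR is the path to the fat jar):
#   FLINK_ENV_JAVA_OPTS="$(config_file_opt config/jobs/twitter-analysis.conf)" \
#     flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "$JAR"
```

The function only builds the option string; it does not check that the file exists.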
If you could put env.java.opts in flink-conf.yaml, it would most likely work for both YARN and local execution. With FLINK_CONF_DIR you can set a different conf dir per job. Alternatively, you could specify both FLINK_ENV_JAVA_OPTS and -yD to inject the property.
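The per-job conf dir idea could be sketched roughly as below. This is an untested assumption about how one might wire it up, not a confirmed recipe: make_job_conf_dir is a hypothetical helper, and the layout of the base conf dir (for example "$FLINK_HOME/conf") is assumed. It copies an existing conf dir, sets env.java.opts for one job, and prints the new directory so it can be used as FLINK_CONF_DIR.

```shell
# Hypothetical helper (not part of Flink): create a per-job copy of a Flink
# conf dir whose flink-conf.yaml sets env.java.opts to point at one config file.
make_job_conf_dir() {
  local base_conf="$1"    # existing conf dir, e.g. "$FLINK_HOME/conf" (assumed layout)
  local job_config="$2"   # absolute path to the job's configuration file
  local job_conf
  job_conf="$(mktemp -d)" || return 1
  # Copy everything (log settings, masters, ...) so other settings are kept.
  cp -R "$base_conf"/. "$job_conf"/ || return 1
  # Drop any existing env.java.opts entry so the key appears exactly once.
  grep -v '^env\.java\.opts:' "$base_conf/flink-conf.yaml" > "$job_conf/flink-conf.yaml" || true
  printf '%s\n' "env.java.opts: -Dconfig.file=$job_config" >> "$job_conf/flink-conf.yaml"
  printf '%s\n' "$job_conf"
}

# Usage (assumption: FLINK_HOME is set, JAR is the path to the fat jar):
#   FLINK_CONF_DIR="$(make_job_conf_dir "$FLINK_HOME/conf" "$PWD/config/jobs/twitter-analysis.conf")" \
#     flink run --class com.github.geoheil.streamingreference.tweets.TweetsAnalysis "$JAR"
```

Replacing the existing entry drops any JVM options that were already configured there, so merge them by hand if the base configuration sets some.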