apache-flink, flink-streaming, flink-sql

Flink query submission in Application Mode


I am relatively new to Flink and want to check whether the following is a good way to submit Flink jobs without shipping a JAR for each one. Note: this works locally for me, and I am in the process of trying it out on AWS EMR, but I want to know if there is anything I am missing here or should be careful about:

In the main method, I set up the necessary data sources and then poll a remote HTTP endpoint for jobs (queries), something like this:

public static void main(String[] args) throws Exception {
   // Create the execution and table environments
   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
   // Define sources and register source tables
   Job myJob;
   while ((myJob = getNextJob(/* ..call endpoint.. */)) != null) {
      // Run the query and stream the result rows to a custom HTTP sink
      Table resultTable = tableEnv.sqlQuery(myJob.getQuery());
      DataStream<Row> resultStream = tableEnv.toDataStream(resultTable);
      SinkFunction<Row> httpSink = new MyHTTPSinkFunction(remoteJobId);
      resultStream.addSink(httpSink);
      JobClient jobRef = env.executeAsync();
      // Send ack (including the JobID from jobRef) to the remote endpoint
   }
}
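MyHTTPSinkFunction above is my own class; for context, a minimal sketch of what such a sink might look like (the endpoint URL and payload format are placeholders, and the class name mirrors the one in the loop):

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.types.Row;

// Sketch: POSTs each result row to a remote endpoint, tagged with the
// remote job id it belongs to. Endpoint and payload are assumptions.
public class MyHTTPSinkFunction extends RichSinkFunction<Row> {
    private final String remoteJobId;
    private transient HttpClient client; // not serializable; create in open()

    public MyHTTPSinkFunction(String remoteJobId) {
        this.remoteJobId = remoteJobId;
    }

    @Override
    public void open(Configuration parameters) {
        client = HttpClient.newHttpClient();
    }

    @Override
    public void invoke(Row value, Context context) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://example.com/results/" + remoteJobId))
                .header("Content-Type", "text/plain")
                .POST(HttpRequest.BodyPublishers.ofString(value.toString()))
                .build();
        client.send(request, HttpResponse.BodyHandlers.discarding());
    }
}
```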

Of course, there is additional tracking using listeners and so on, plus edge-condition handling (e.g. job failures).

The JAR is put under the lib folder of the Flink cluster, and then I start the cluster in Application Mode following the instructions here.
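For reference, on a standalone cluster this roughly corresponds to the following commands (the JAR name and main-class name are placeholders for my own):

```shell
# Make the job JAR visible to the cluster's classpath
cp ./my-flink-job.jar "$FLINK_HOME"/lib/

# Start a JobManager in Application Mode for that job's main class,
# then start a TaskManager to execute it
"$FLINK_HOME"/bin/standalone-job.sh start --job-classname com.example.MyMainClass
"$FLINK_HOME"/bin/taskmanager.sh start
```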

With this, I am able to submit jobs quickly and get back results with very low latency (currently for batch jobs, though we may use this approach in streaming mode as well). I assume that if the JobManager crashes, a standby JobManager will continue polling and submitting jobs, and with the JobID I can track jobs as well.

Does this appear to be a reasonable approach? Any feedback will be much appreciated.


Solution

  • Unfortunately, Application Mode currently supports running only a single job in a highly available setup. This is mostly because the main() method is executed in the Dispatcher (part of the JobManager process).

    See the Application Mode section of the deployment overview for more details: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/overview/#application-mode

    A session cluster is best suited (it is actually the only option right now) for running multiple jobs on a single Flink cluster in a highly available setup.
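    With a session cluster, the cluster is started once and jobs are submitted to it independently, roughly like this (JAR names are placeholders):

```shell
# Start a long-running session cluster once
"$FLINK_HOME"/bin/start-cluster.sh

# Submit as many jobs as needed to the same cluster, detached (-d)
"$FLINK_HOME"/bin/flink run -d ./job-a.jar
"$FLINK_HOME"/bin/flink run -d ./job-b.jar
```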