
How to get and modify the JobGraph in Flink?


I want to modify the JobGraph, e.g. with getVertices() or ColocationGroup(), so I tried the following:

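    import org.apache.flink.streaming.api.scala._

    val s = StreamExecutionEnvironment.getExecutionEnvironment
    val p = new Param()  // Param, MySource, MyMap and MySink are my own classes
    s.addSource(new MySource(p))
      .map(new MyMap(p))
      .addSink(new MySink(p))
    val g = s.getStreamGraph.getJobGraph()  // build the JobGraph before submitting
    val v = g.getVertices()                 // the vertices I want to inspect/modify
    s.executeAsync()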

But in debug mode the vertices are not my MySource operator. I am also confused that getJobGraph can accept a JobID: how can a job have an ID before it starts? So I thought maybe I should get the graph after execution, and I changed the code to the following:

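    import org.apache.flink.streaming.api.scala._

    val s = StreamExecutionEnvironment.getExecutionEnvironment
    val p = new Param()  // Param, MySource, MyMap and MySink are my own classes
    s.addSource(new MySource(p))
      .map(new MyMap(p))
      .addSink(new MySink(p))
    s.executeAsync()                        // submit the job first...
    val g = s.getStreamGraph.getJobGraph()  // ...then build the JobGraph (gets stuck here)
    val v = g.getVertices()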

But the code gets stuck in s.getStreamGraph.getJobGraph().

How do I actually get the JobGraph?


Solution

  • It's not possible to modify the JobGraph. The various APIs construct the JobGraph and submit it to the JobManager, which turns it into an execution graph; that graph is then scheduled and run in task slots provided by the task managers. There's no API that lets you modify the job's topology once it has been established. (It's not inconceivable that this could someday be supported, but it isn't possible now.)

    If you just want to see a representation of it, System.out.println(env.getExecutionPlan()) is useful: getExecutionPlan() returns a JSON description of the dataflow plan without submitting the job.

    If you are looking for more dynamism than you can get from a static DAG, the Stateful Functions API is much more flexible.
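Inspecting the graph read-only, as opposed to modifying it, is possible on the client before submission. Below is a minimal sketch, assuming the Flink 1.x Scala DataStream API that the question uses. The pipeline `fromElements(1, 2, 3).map(_ * 2).print()` is a hypothetical stand-in for MySource -> MyMap -> MySink, and `InspectPlan`, `buildPipeline` and `jobVertexNames` are illustrative names. It prints the JSON execution plan and then the names of the JobGraph's vertices; nothing here changes the topology. Note that `getStreamGraph` is an internal API, and in the Flink versions assumed here calling it clears the environment's transformations, so the environment is used only for inspection and is not executed afterwards.

```scala
import org.apache.flink.streaming.api.scala._
import scala.collection.JavaConverters._

object InspectPlan {
  // Hypothetical stand-in for MySource -> MyMap -> MySink.
  def buildPipeline(env: StreamExecutionEnvironment): Unit =
    env.fromElements(1, 2, 3).map(_ * 2).print()

  // Translates the pipeline into a JobGraph and returns its vertex names.
  // Assumption: getStreamGraph (internal API) clears the env's transformations
  // in the versions assumed here, so this env should not be executed afterwards.
  def jobVertexNames(env: StreamExecutionEnvironment): Seq[String] =
    env.getStreamGraph.getJobGraph().getVertices().asScala.map(_.getName).toSeq

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    buildPipeline(env)
    // JSON description of the dataflow plan; this does not submit the job.
    println(env.getExecutionPlan)
    // One line per JobGraph vertex (chained operators share a single vertex).
    jobVertexNames(env).foreach(println)
  }
}
```

Because chained operators are merged into a single JobVertex, a vertex typically represents a whole chain rather than just the source operator, which may explain why the vertices do not look like MySource alone.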