I have three streaming jobs in Spark Structured Streaming, on ds1, ds2 and ds3; each job sinks its stream to a database:
val ds1 = ...
val ds2 = ...
val ds3 = ...
ds1
  .writeStream
  .queryName("stream1")
  .outputMode(OutputMode.Append())
  .foreachBatch((ds, _) => {
    MyUtils.save2Redis(ds)
  })
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

ds2
  .writeStream
  .queryName("stream2")
  .outputMode(OutputMode.Update())
  .foreachBatch((ds, _) => {
    MyUtils.save2Hbase(ds)
  })
  .trigger(Trigger.ProcessingTime("1 seconds"))
  .start()

ds3
  .writeStream
  .queryName("stream3")
  .outputMode(OutputMode.Update())
  .foreachBatch((ds, _) => {
    MyUtils.save2Hbase(ds)
    MyUtils.save2Hbase(ds)
  })
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()
spark.streams.awaitAnyTermination()
Is it possible to rewrite the above Spark code in Apache Flink, as follows:
val ds1 = ...
val ds2 = ...
val ds3 = ...
ds1.addSink(...)
ds2.addSink(...)
ds3.addSink(...)
env.execute("multi jobs")
I have checked how-to-submit-multiple-flink-jobs-using-single-flink-application, flink-runs-multiple-jobs-from-single-jar and multiple-jobs-in-the-same-apache-flink-project, and I still don't understand how to do it.
Does Flink have a feature like awaitAnyTermination in Spark Structured Streaming?
You can do this in Flink. Create multiple sources and sink each one into its own table (or external store); the job DAG will then contain multiple independent streaming pipelines, as long as there is no join/union between them. A sketch is shown below.
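Here is a minimal sketch, assuming the Flink Scala DataStream API. MultiPipelineJob, RedisSink and HbaseSink are hypothetical names: the two sink classes are placeholders standing in for whatever MyUtils.save2Redis / MyUtils.save2Hbase do, and the fromElements sources stand in for your real sources (Kafka, files, etc.):

import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala._

// hypothetical sinks standing in for MyUtils.save2Redis / MyUtils.save2Hbase
class RedisSink extends SinkFunction[String] {
  override def invoke(value: String): Unit = {
    // write the record to Redis here
  }
}

class HbaseSink extends SinkFunction[String] {
  override def invoke(value: String): Unit = {
    // write the record to HBase here
  }
}

object MultiPipelineJob {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // placeholder sources; in practice these would be Kafka topics, files, etc.
    val ds1: DataStream[String] = env.fromElements("a", "b", "c")
    val ds2: DataStream[String] = env.fromElements("x", "y")
    val ds3: DataStream[String] = env.fromElements("p", "q")

    // three independent source -> sink pipelines in a single job graph
    ds1.addSink(new RedisSink)
    ds2.addSink(new HbaseSink)
    ds3.addSink(new HbaseSink)

    // a single execute() submits and runs all three pipelines together;
    // in attached mode it blocks until the job terminates, which roughly
    // plays the role of awaitAnyTermination in Spark
    env.execute("multi jobs")
  }
}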
One thing to note is checkpointing. In the Spark code there are actually three streaming queries, and each one checkpoints independently. In Flink, if you put all pipelines into one application, checkpointing is coordinated across them: if source1 -> sink1 is slow, checkpoints for source2 -> sink2 will also be delayed (see the sketch below). If that matters, it is better to separate them into different Flink jobs.
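For reference, checkpointing is configured once on the environment, so one setting applies to every pipeline in the job. In this sketch the 10-second interval, the HDFS path, and the setCheckpointStorage call (available since Flink 1.13) are assumptions, not part of the original question:

import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// a single checkpoint configuration covers every source -> sink pipeline in
// this application; a slow pipeline delays checkpoint completion for the rest
env.enableCheckpointing(10000L)                                      // assumed 10 s interval
env.getCheckpointConfig.setCheckpointStorage("hdfs:///flink/ckpts")  // assumed path, Flink 1.13+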