apache-flink, flink-streaming, flink-sql, pyflink

How to have multiple INSERT statements within single PyFlink SQL job?


Is it possible to have more than one INSERT INTO ... SELECT ... statement within a single PyFlink job (on Flink 1.13.6)?

I create a number of output tables and am trying to write to all of them within a single job. The example Python and SQL looks like this (assume there is an input table called 'input'):

sql1 = "INSERT INTO out1 (col1, col2) SELECT col1, col2 FROM input"
sql2 = "INSERT INTO out2 (col3, col4) SELECT col3, col4 FROM input"

env.execute_sql(sql1)
env.execute_sql(sql2)

When this is run in a Flink cluster on Kinesis Data Analytics on AWS, I get a failure:

Cannot have more than one execute() or executeAsync() call in a single environment.

When I look at the Flink web UI, I can see a single job called insert-into_default_catalog.default_database.out1. Does Flink turn each INSERT statement into a separate job? It looks like it creates one job for the first query and then fails when trying to create a second job for the second query.

Is there any way of getting it to run as a single job using SQL, without having to move away from SQL and the Table API?


Solution

  • If you want to do multiple INSERTs, you need to wrap them in a statement set:

    stmt_set = table_env.create_statement_set()
    
    # each call to add_insert_sql accepts only a single INSERT statement
    stmt_set.add_insert_sql(sql1)
    stmt_set.add_insert_sql(sql2)
    
    # execute all statements together
    table_result = stmt_set.execute()
    
    # get job status through TableResult
    print(table_result.get_job_client().get_job_status())
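
    # If you need the call to block until the job finishes (e.g. in a
    # local test), TableResult.wait() can be used; a long-running
    # streaming job would normally not wait:
    # table_result.wait()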
    

    See the Flink documentation on statement sets for more info.
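
  • For context, a complete end-to-end sketch of such a job might look like the following. The schemas and the datagen/blackhole connectors here are placeholder assumptions for illustration; in the setup from the question these would be the real Kinesis (or other) sources and sinks.

    from pyflink.table import EnvironmentSettings, TableEnvironment

    # Create a streaming TableEnvironment (this builder style works on 1.13)
    env_settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    table_env = TableEnvironment.create(env_settings)

    # DDL statements do not submit a job, so separate execute_sql calls
    # are fine here
    table_env.execute_sql(
        "CREATE TABLE input (col1 STRING, col2 STRING, col3 STRING, col4 STRING) "
        "WITH ('connector' = 'datagen')")
    table_env.execute_sql(
        "CREATE TABLE out1 (col1 STRING, col2 STRING) WITH ('connector' = 'blackhole')")
    table_env.execute_sql(
        "CREATE TABLE out2 (col3 STRING, col4 STRING) WITH ('connector' = 'blackhole')")

    # both INSERTs go into one statement set, so Flink plans and submits
    # them together as a single job
    stmt_set = table_env.create_statement_set()
    stmt_set.add_insert_sql("INSERT INTO out1 SELECT col1, col2 FROM input")
    stmt_set.add_insert_sql("INSERT INTO out2 SELECT col3, col4 FROM input")
    table_result = stmt_set.execute()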