Dynamic SQL Query in Flink


I have a SQL query like this:

String ipdetailsSql = "select sid, _zpsbd6 as ip_address, ssresp, reason,  " +
        "SUM(CASE WHEN botcode='r1' THEN 1 ELSE 0 END ) as icf_count, " +
        "SUM(CASE WHEN botcode='r2' THEN 1 ELSE 0 END ) as dc_count, " +
        "SUM(CASE WHEN botcode='r5' THEN 1 ELSE 0 END ) as badua_count, " +
        "COUNT(*) as hits, TUMBLE_START(ts, INTERVAL '1' MINUTE) AS fseen " +
        "from sourceTopic   " +
        "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6, ssresp, reason";

Based on user input, I want to change botcode='r1' to a given value, say botcode='r10', without restarting the job. Is there a way to do this? I am on Flink 1.7 using the stream environment. I tried using a config stream to read the inputs, but I am stuck on how to change the query on the fly. Can anyone help me with this? Thanks in advance.


Solution

  • A stream SQL query isn't something that is executed once and is done, but rather is a declarative expression of a continuous computation. It's not possible to make arbitrary changes to that computation without starting a new job with a new query.

    In simple cases, though, there are things you can do. You might consider whether it could work to join your sourceTopic with another stream that effectively provides the query parameters (here, the botcode values to count). Or you might find it affordable to compute all conceivably desired results, for example one count per botcode, and then select the results you actually want downstream.
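
The second suggestion (compute everything, select downstream) could look roughly like the sketch below. It is only an illustration under stated assumptions, not a tested Flink job: `PER_BOTCODE_SQL` is an assumed variant of the query from the question that adds botcode to the SELECT and GROUP BY clauses so that each botcode gets its own count, and the class `BotcodeSelection` with its helper methods is hypothetical plain Java that stands in for whatever downstream operator or consumer picks the wanted botcode at runtime.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Flink: illustrates "compute all, select later".
class BotcodeSelection {

    // Assumed variant of the original query: grouping by botcode as well, so the
    // job emits one row per (window, sid, ip, ssresp, reason, botcode) and never
    // needs to know which botcode the user is currently interested in.
    static final String PER_BOTCODE_SQL =
            "select sid, _zpsbd6 as ip_address, ssresp, reason, botcode, " +
            "COUNT(*) as hits, TUMBLE_START(ts, INTERVAL '1' MINUTE) AS fseen " +
            "from sourceTopic " +
            "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6, ssresp, reason, botcode";

    // Stand-in for the per-botcode aggregation: counts occurrences of each botcode
    // within one group (one window and one sid/ip/ssresp/reason combination).
    public static Map<String, Long> countByBotcode(List<String> botcodes) {
        Map<String, Long> counts = new HashMap<>();
        for (String code : botcodes) {
            counts.merge(code, 1L, Long::sum);
        }
        return counts;
    }

    // Downstream selection: the wanted botcode is ordinary runtime input, so it
    // can change at any time without touching the query. Unknown codes count as 0.
    public static long select(Map<String, Long> counts, String wantedBotcode) {
        return counts.getOrDefault(wantedBotcode, 0L);
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countByBotcode(Arrays.asList("r1", "r10", "r1", "r5"));
        System.out.println(select(counts, "r1"));   // prints 2
        System.out.println(select(counts, "r10"));  // prints 1
        System.out.println(select(counts, "r2"));   // prints 0
    }
}
```

With this shape, the per-window total that the original query reported as hits becomes the sum of the per-botcode counts, so it also has to be computed downstream. The trade-off is more output rows per window in exchange for not having to restart the job when the botcode of interest changes.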