apache-flink, flink-streaming, flink-cep, flink-table-api

Integrating the DataStream API and the Table API


In addition to this question, I've created this example to integrate the DataStream API and the Table API. This time I get no errors, but I have two jobs instead of one: one is created for the DataStream API, which runs perfectly, and the other is created for the Table API, which also runs perfectly. The only issue is that the Table API job never receives any values from the DataStream API. Example:

        /*FILTERING NULL IDs*/
        final SingleOutputStreamOperator<Event> stream_filtered = eventsStream
                .filter(new NullidEventsFilterFunction())
                .uid("id_filter_operator")
                .name("Event Filter");

        final StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(TableEnvironmentConfiguration.getEnv(), fsSettings);
        SingleOutputStreamOperator<String> toTable = stream_filtered.map(x -> x.id).name("Map for table");
        Table source = fsTableEnv.fromDataStream(toTable);
        source.execute(); /*without this line the TableAPI job is not started, but nothing happens if it is not there either*/
        DataStream<String> finalRes = fsTableEnv.toAppendStream(source, String.class);
        finalRes.map((MapFunction<String, String>) value -> value)
                .name("Mapping after table")
                .addSink(new SinkFunction<String>() {
            @Override
            public void invoke(String value) {
                LOG.info("Record from table: " + value);
            }
        }).name("Sink after map from table");
        
        /*STARTING TRANSFORMATIONS*/
        Init.init(stream_filtered);
        env.execute(job_name);

By doing that, I can see this line in the log:

INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Event Mapper -> Watermarks Added -> Event Filter -> Map for table -> SourceConversion(table=[Unregistered_DataStream_5], fields=[f0]) -> SinkConversionToRow -> Sink: Select table sink (1/1) (0d3cd78d35480c44f09603786bf775e7) switched from DEPLOYING to RUNNING.

but no record is received or sent out.

See the image for the DataStream job: [image: DataStream job graph]

and see the image for the Table API job: [image: Table API job graph]

Any idea? Thanks in advance. Kind regards!


Solution

  • If you want to write one job that starts and ends with the DataStream API, and uses the Table API in the middle, then here's a simple example you can build upon.

    Note that the details involved have changed somewhat from release to release, and this particular example works as written with Flink 1.11. FLIP-136: Improve interoperability between DataStream and Table API is being worked on to make this even easier (a sketch of that newer API appears at the end of this answer).

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    import static org.apache.flink.table.api.Expressions.$;
    
    public class BackAndForth {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            SingleOutputStreamOperator<Tuple2<String, Long>> rawInput = env.fromElements(
                    new Tuple2<>("u2", 0L),
                    new Tuple2<>("u1", 5L),
                    new Tuple2<>("u2", 1L),
                    new Tuple2<>("u3", 1L),
                    new Tuple2<>("u1", 0L),
                    new Tuple2<>("u1", 3L),
                    new Tuple2<>("u2", 2L));
    
            // name the tuple's fields so they can be referenced as columns
            Table events = tableEnv.fromDataStream(rawInput, $("userId"), $("value"));
    
            Table results = events
                    .select($("userId"), $("value"))
                    .where($("value").isGreater(0));
    
            // convert the Table back to an append-only DataStream and print it
            tableEnv
                    .toAppendStream(results, Row.class)
                    .print();
    
            env.execute();
        }
    }
    

    You may be concerned that the web UI shows "Records Sent: 0" and "Records Received: 0". This is very misleading. These Flink metrics only measure records and bytes flowing within Flink, and do not report any I/O with external systems. They also do not count records and bytes flowing between operators that are chained together. Everything in those two jobs is chained, so records/bytes sent/received will always be zero in this case.
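
    If you want to see non-zero numbers in the web UI while experimenting, one option is to disable operator chaining, which forces every operator into its own task, so that records crossing between tasks get counted. A minimal sketch; note that this is only for debugging, since chaining exists for performance reasons:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // for debugging only: with chaining disabled, every operator runs in its
    // own task, so the records sent/received between tasks show up in the UI
    env.disableOperatorChaining();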
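
    For reference, here is a minimal sketch of what the post-FLIP-136 round trip looks like, assuming Flink 1.13 or later (where the new fromDataStream/toDataStream bridge landed); the class and variable names are just placeholders:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class BackAndForthNewApi {

        public static void main(String[] args) throws Exception {

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            DataStream<String> input = env.fromElements("u1", "u2", "u3");

            // the table's schema is derived from the stream's type; an atomic
            // type like String becomes a single column named f0
            Table table = tableEnv.fromDataStream(input);

            // toDataStream is the post-FLIP-136 counterpart of toAppendStream
            DataStream<Row> result = tableEnv.toDataStream(table);

            result.print();
            env.execute();
        }
    }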