Tags: apache-kafka, apache-flink, flink-streaming, flink-sql

Flink: DataStream to Table


Use case: Read protobuf messages from Kafka, deserialize them, apply some transformations (flatten out some columns), and write to DynamoDB.

Unfortunately, the Flink Kafka connector only supports the CSV, JSON, and Avro formats, so I had to use the lower-level DataStream API.
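For context, going through the DataStream API means plugging a custom DeserializationSchema into the Kafka consumer. A minimal sketch, assuming a protobuf-generated MyEvent class and placeholder topic and broker settings:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.DeserializationSchema
    import org.apache.flink.api.common.typeinfo.TypeInformation
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

    // MyEvent stands in for a protobuf-generated class.
    class MyEventDeserializer extends DeserializationSchema[MyEvent] {
      // Parse the raw Kafka record bytes with the protoc-generated parser.
      override def deserialize(message: Array[Byte]): MyEvent =
        MyEvent.parseFrom(message)

      // Kafka topics are unbounded, so the stream never ends.
      override def isEndOfStream(next: MyEvent): Boolean = false

      override def getProducedType: TypeInformation[MyEvent] =
        TypeInformation.of(classOf[MyEvent])
    }

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092") // placeholder broker
    props.setProperty("group.id", "protobuf-reader")         // placeholder group id

    val myStream: DataStream[MyEvent] =
      env.addSource(new FlinkKafkaConsumer[MyEvent]("my-topic", new MyEventDeserializer, props))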

Problem: If I can create a table out of the DataStream, then I can accept a SQL query to run on that table, which would make the transformation part seamless and generic. Is it possible to run a SQL query over a DataStream?


Solution

  • If you have a DataStream of objects, you can simply register the given DataStream as a table using a StreamTableEnvironment.

    This would look more or less like the following:

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api.scala._

    val myStream = ...
    val env: StreamExecutionEnvironment = configureFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment)
    val tEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
    // Field expressions name the table's columns; 'id and 'payload are illustrative.
    tEnv.registerDataStream("myTable", myStream, 'id, 'payload)
    

    Then you should be able to query the dynamic table created from your DataStream, for example:
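    A minimal sketch of querying the table and converting the result back to a stream for the sink; the column names, the Row conversion, and the MyDynamoDbSink SinkFunction are illustrative assumptions, not part of the original answer:

    import org.apache.flink.types.Row

    // Run SQL against the registered table; column names match the field expressions above.
    val resultTable = tEnv.sqlQuery("SELECT id, payload FROM myTable")

    // Convert the dynamic table back to a DataStream so a sink can be attached.
    val resultStream: DataStream[Row] = resultTable.toAppendStream[Row]

    // MyDynamoDbSink is a hypothetical custom SinkFunction writing to DynamoDB.
    resultStream.addSink(new MyDynamoDbSink)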