
Sink a Flink DataStream to MySQL using the JDBC connector, overwriting existing rows


My use case is:

  1. Read data from an AWS Kinesis data stream and filter/map it using the Flink DataStream API
  2. Use a StreamTableEnvironment to group and aggregate the data
  3. Use Flink SQL on that table environment to write the result to MySQL using the JDBC connector

I am able to write my DataStream results into the MySQL table, but because the job is streaming, every updated result is appended as a new row, while I want it to overwrite the existing row.

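    // Kinesis consumer settings: region and where to start reading
    Properties consumerConfig = new Properties();
    consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-central-1");
    consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");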


    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, bsSettings);

    // Parse Message
    DataStream<Event> events = env.addSource(
            new FlinkKinesisConsumer<>(
                    Config.INPUT_STREAM,
                    new KinesisEventDeserializationSchema(),
                    consumerConfig
            )
    )
            .uid("kinesisEventSource");
      ....    
      ....
      ....

    SingleOutputStreamOperator<ArticleView> filteredDetailsViewEvents = articleViews
            .filter(new FilterFunction<ArticleView>() {
                @Override
                public boolean filter(ArticleView event) throws Exception {
                    return StringUtils.isNotBlank(event.getArticleNumber());
                }
            })
            .uid("filteredDetailsViewFilter");

    Table t = tEnv.fromDataStream(filteredDetailsViewEvents);

  
    // Sink table definition that maps onto the MySQL table categorySliderItems
    tEnv.executeSql("CREATE TABLE eventsSlider1 (\n" +
            "  articleNumber STRING,\n" +
            "  mandant STRING,\n" +
            "  category STRING,\n" +
            "  cnt BIGINT NOT NULL,\n" +
            "  CONSTRAINT pk_event PRIMARY KEY (articleNumber, mandant, category) NOT ENFORCED\n" +
            ") WITH (\n" +
            "  'connector.type' = 'jdbc',\n" +
            "  'connector.url' = 'jdbc:mysql://localhost:3306/events',\n" +
            "  'connector.table' = 'categorySliderItems',\n" +
            "  'connector.username' = 'root',\n" +
            "  'connector.password' = '123456'\n" +
            ")");

    // Top 3 articles per (mandant, category), ranked by view count
    tEnv.executeSql("INSERT INTO eventsSlider1 (SELECT articleNumber, mandant, category, cnt " +
            "FROM (" +
            " SELECT articleNumber, mandant, category, COUNT(articleNumber) AS cnt," +
            " ROW_NUMBER() OVER (PARTITION BY mandant, category ORDER BY COUNT(articleNumber) DESC) AS row_num" +
            " FROM " + t + " GROUP BY articleNumber, category, mandant)" +
            " WHERE row_num <= 3)");

Solution

  • The problem was that I had not set the proper primary key on the table. The primary key is the only thing Flink can check in upsert mode to decide between an update and an insert.
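
To make the role of the key more concrete, here is a minimal sketch in plain Java (no Flink dependency) of the kind of key-based upsert statement MySQL supports, `INSERT ... ON DUPLICATE KEY UPDATE`. As far as I know this is roughly the shape of statement the JDBC sink issues for MySQL when a primary key is declared, but the class `UpsertSketch` and the method `mysqlUpsert` are made up for illustration and are not Flink's actual code. Note that MySQL only takes the update branch when the target table itself has a matching PRIMARY KEY or UNIQUE index. Without one, every write is a plain insert, which would match the appending behaviour described in the question.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Illustration only: builds a MySQL upsert statement for a table and its columns. */
final class UpsertSketch {

    /**
     * Returns "INSERT ... ON DUPLICATE KEY UPDATE ..." for the given columns.
     * MySQL updates the existing row only if the insert collides with a
     * PRIMARY KEY or UNIQUE index defined on the target table.
     */
    public static String mysqlUpsert(String table, List<String> columns) {
        String cols = String.join(", ", columns);
        // One JDBC placeholder per column
        String placeholders = columns.stream()
                .map(c -> "?")
                .collect(Collectors.joining(", "));
        // On a key collision, overwrite each column with the newly supplied value
        String updates = columns.stream()
                .map(c -> c + " = VALUES(" + c + ")")
                .collect(Collectors.joining(", "));
        return "INSERT INTO " + table + " (" + cols + ") VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updates;
    }

    public static void main(String[] args) {
        // Table and column names taken from the question
        System.out.println(mysqlUpsert(
                "categorySliderItems",
                Arrays.asList("articleNumber", "mandant", "category", "cnt")));
    }
}
```

So it is worth checking that the key declared in the Flink DDL (here `articleNumber, mandant, category`) matches a primary or unique key on `categorySliderItems` in MySQL.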