I'm fairly new to Flink and need some help with my technical use-case.
Current Scenario:
I have a Flink application that runs on GKE and writes records from a Kafka source to BigQuery through a custom sink. Writing records works fine, but they are currently written one by one, so each Kafka message results in its own insert API call to BigQuery. That is not ideal: we need bulk inserts, because inserting each record individually is very expensive. I'm using the BigQuery Storage Write API for this.
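A simplified sketch of this kind of per-record sink (the table name, element type, and JSON conversion are placeholders, not the actual code):

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.json.JSONArray;
import org.json.JSONObject;

public class PerRecordBigQuerySink extends RichSinkFunction<String> {
    private transient BigQueryWriteClient client;
    private transient JsonStreamWriter writer;

    @Override
    public void open(Configuration parameters) throws Exception {
        client = BigQueryWriteClient.create();
        TableName table = TableName.of("my-project", "my_dataset", "my_table"); // placeholder
        // Writes to the table's default stream; the schema is fetched from the table.
        writer = JsonStreamWriter.newBuilder(table.toString(), client).build();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // One append call per record -- this is the expensive pattern described above.
        JSONArray rows = new JSONArray().put(new JSONObject(value));
        writer.append(rows).get(); // blocks until the single-row append is acknowledged
    }

    @Override
    public void close() throws Exception {
        if (writer != null) writer.close();
        if (client != null) client.close();
    }
}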
New Requirement:
Buffer records before writing them to BigQuery. Ideally I want to buffer records based on size and/or time before writing them to the sink.
I'm not sure how to implement this in Flink, so I'm looking for suggestions on how to get this functionality in place.
You can probably use a ProcessWindowFunction on your data stream.
For example, with a tumbling window (5 minutes in the snippet below), Flink will collect all the records that arrive within that time frame, and you can then write them to the sink as one batch:
input
    .keyBy(t -> t.f0)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .process(new MyProcessWindowFunction());
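A minimal sketch of what MyProcessWindowFunction could look like, assuming the stream elements are Tuple2<String, String> (key, JSON payload) -- that element type is an assumption for illustration. It buffers everything that fell into the window and emits the whole batch as one list:

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple2<String, String>, List<String>, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<Tuple2<String, String>> elements,
                        Collector<List<String>> out) {
        // Collect every record that arrived for this key within the window.
        List<String> batch = new ArrayList<>();
        for (Tuple2<String, String> element : elements) {
            batch.add(element.f1);
        }
        // Emit one batch per window; the downstream sink performs the bulk insert.
        if (!batch.isEmpty()) {
            out.collect(batch);
        }
    }
}

The result is a DataStream<List<String>> that you can pass to your existing custom sink, which can then build one JSONArray per list and issue a single Storage Write API append call per window instead of one per record. Note that TumblingEventTimeWindows requires timestamps and watermarks; if you just want wall-clock batching, TumblingProcessingTimeWindows works the same way without them.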