Search code examples
apache-flinkflink-sqlflink-batch

Does the flink 1.7.2 dataset not support kafka sink?


Does the flink 1.7.2 dataset not support kafka sink ?

After doing the batch operation I need to publish the message to kafka, meaning source is my postgres and sink is my kafka.

Is it possible ?


Solution

  • You can create your own output format and use Kafka Producer to produce to Kafka. Check the code below.

    ...
    data.output(new KafkaOPFormat());
    env.execute();
    
    import java.io.IOException;
    import java.util.Properties;
    import org.apache.flink.api.common.io.RichOutputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class KafkaOPFormat extends RichOutputFormat<Tuple2<String, String>> {
    
      private final Properties properties = new Properties();
      private KafkaProducer<String, String> producer;
    
      @Override
      public void configure(Configuration configuration) {
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("acks", "all");
        properties.put("retries", 0);
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
      }
    
      @Override
      public void open(int i, int i1) throws IOException {
        producer = new KafkaProducer<String, String>(properties);
      }
    
      @Override
      public void writeRecord(Tuple2<String, String> record) throws IOException {
        producer.send(new ProducerRecord<>(record.f0, record.f1));
      }
    
      @Override
      public void close() throws IOException {
        producer.close();
      }
    }
    

    PS: I do not remember all the configs, do check for your configuration and alter accordingly.