java, apache-kafka, apache-flink

Accessing a Kafka topic from two processes


I have a Kafka producer class that works fine and fills the Kafka topic. Its code is as follows:

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class kafka_test {
    private final static String TOPIC = "flinkTopic";
    private final static String BOOTSTRAP_SERVERS = "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092";

    // Builds a Flink source that reads String records from the given topic.
    public FlinkKafkaConsumer<String> createStringConsumerForTopic(
            String topic, String kafkaAddress, String kafkaGroup) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", kafkaAddress);
        props.setProperty("group.id", kafkaGroup);
        FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
                topic, new SimpleStringSchema(), props);
        myconsumer.setStartFromLatest();
        return myconsumer;
    }

    // Builds a Kafka producer with Long keys and String values.
    private static Producer<Long, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "MyKafkaProducer");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    // Sends one message synchronously and prints where it was stored.
    public void runProducer(String msg) throws Exception {
        final Producer<Long, String> producer = createProducer();

        try {
            final ProducerRecord<Long, String> record = new ProducerRecord<>(TOPIC, msg);
            // get() blocks until the broker acknowledges the record.
            RecordMetadata metadata = producer.send(record).get();
            System.out.printf("sent record(key=%s value='%s')" + " metadata(partition=%d, offset=%d)\n",
                    record.key(), record.value(), metadata.partition(), metadata.offset());
        } finally {
            producer.flush();
            producer.close();
        }
    }
}

import java.io.File;
import java.util.Scanner;
import java.util.concurrent.TimeUnit;

public class producerTest {
    public static void main(String[] args) throws Exception {
        kafka_test objKafka = new kafka_test();
        String pathFile = "/home/cfms11/IdeaProjects/pooyaflink2/KafkaTest/quickstart/lastDay4.csv";
        String delimiter = "\n";
        Scanner scanner = new Scanner(new File(pathFile));
        scanner.useDelimiter(delimiter);
        int i = 0;
        while (scanner.hasNext()) {
            // Wait one minute before sending the first record.
            if (i == 0)
                TimeUnit.MINUTES.sleep(1);
            // Send each line of the CSV file as one record.
            objKafka.runProducer(scanner.next());
            i++;
        }
        scanner.close();
    }
}

I use Kafka to provide data to my Flink program. This is the part of the code that consumes data from the Kafka topic:

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", 
    "10.32.0.2:9092,10.32.0.3:9092,10.32.0.4:9092");
    props.setProperty("group.id", kafkaGroup);
    FlinkKafkaConsumer<String> myconsumer = new FlinkKafkaConsumer<>(
            "flinkTopic", new SimpleStringSchema(), props);
    myconsumer.setStartFromEarliest();
    DataStream<String> text = env.addSource(myconsumer);

I want to run the producer code while my Flink program is running. My goal is for the producer to send one record to the topic and for the consumer to read that record from the topic at the same time.

Could you please tell me whether this is possible and how to set it up?


Solution

  • I think you need to create two class files, one for the producer and one for the consumer. Create the topic first, then run the consumer, or run the producer directly.
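
Why the start order can matter may be easier to see with a toy model. The sketch below is not the Kafka or Flink API: `TopicModel` and its methods are hypothetical names, and it assumes a single partition kept in memory. It treats the topic as an append-only log, so the producer can write whether or not a reader is running, and a reader only decides at which offset it begins. The "latest" case assumes the reader ignores any committed offsets, as Flink's `setStartFromLatest()` is documented to do.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy in-memory stand-in for a single-partition topic (hypothetical, not the Kafka API). */
final class TopicModel {
    private final List<String> log = new ArrayList<>();

    /** Producer side: append a record and return its offset. */
    long send(String value) {
        log.add(value);
        return log.size() - 1;
    }

    /** Start offset for a reader that begins at the earliest record. */
    long earliestOffset() {
        return 0;
    }

    /** Start offset for a reader that begins at the current end of the log. */
    long latestOffset() {
        return log.size();
    }

    /** Consumer side: return every record at or after the given offset. */
    List<String> readFrom(long offset) {
        return new ArrayList<>(log.subList((int) offset, log.size()));
    }

    public static void main(String[] args) {
        TopicModel topic = new TopicModel();

        topic.send("row-1");                        // produced before any reader starts
        long fromEarliest = topic.earliestOffset(); // reader A starts from earliest
        long fromLatest = topic.latestOffset();     // reader B starts from latest
        topic.send("row-2");                        // produced while both readers run

        System.out.println(topic.readFrom(fromEarliest)); // prints [row-1, row-2]
        System.out.println(topic.readFrom(fromLatest));   // prints [row-2]
    }
}
```

In this model, a reader that starts from the earliest offset sees every record regardless of which program was launched first, while a reader that starts from the latest offset only sees records sent after it started. If the real consumer is configured to start from the latest position, it is probably safest to start it before the producer.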