I have a Kafka topic to which I am sending data through a Kafka producer. On the consumer side I have two options.
1. Using KafkaConsumer - the KafkaConsumer code is below; it reads the data from the topic and works fine.
@EnableKafka
@Configuration
@PropertySource("kaafka.properties")
public class RawEventKafkaConsumer {

    private static final Logger logger = LoggerFactory.getLogger(RawEventKafkaConsumer.class);

    @Autowired
    private DataModelServiceImpl dataModelServiceImpl;

    private PolicyExecutor policyExecutor;

    public RawEventKafkaConsumer() {
        policyExecutor = new PolicyExecutor();
    }

    @Value("${spring.kafka.topic}")
    private String rawEventTopicName;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    @Value("${spring.kafka.consumer.bootstrap-servers}")
    private String bootStrapServer;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Bean
    public DefaultKafkaConsumerFactory<String, BaseDataModel> rawEventConsumer() {
        Map<String, Object> consumerProperties = new HashMap<>();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServer);
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        return new DefaultKafkaConsumerFactory<>(consumerProperties);
    }

    @Bean(name = "kafkaListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> kafkaListenerContainerFactory() {
        logger.info("kafkaListenerContainerFactory called..");
        ConcurrentKafkaListenerContainerFactory<String, BaseDataModel> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(rawEventConsumer());
        return factory;
    }

    @KafkaListener(topics = "rawEventTopic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(String baseDataModel) {
        ObjectMapper mapper = new ObjectMapper();
        BaseDataModel csvDataModel;
        try {
            csvDataModel = mapper.readValue(baseDataModel, BaseDataModel.class);
            // saving the data model in Elasticsearch
            // dataModelServiceImpl.save(csvDataModel);
            System.out.println("Message received " + csvDataModel.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
2. Consuming the Kafka topic data using Spark Streaming - code is below:
@Service
public class RawEventSparkStreamConsumer {

    private final Logger logger = LoggerFactory.getLogger(RawEventSparkStreamConsumer.class);

    @Autowired
    private DataModelServiceImpl dataModelServiceImpl;

    @Autowired
    private JavaStreamingContext streamingContext;

    @Autowired
    private JavaInputDStream<ConsumerRecord<String, String>> messages;

    @PostConstruct
    private void sparkRawEventConsumer() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            messages.foreachRDD(rdd -> {
                System.out.println("RDD coming.. " + rdd.count());
                rdd.foreach(record -> {
                    System.out.println("Data is coming.... " + record);
                });
            });
            streamingContext.start();
            try {
                streamingContext.awaitTermination();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}
Both the Kafka consumer and the Spark Streaming consumer read the data from the topic successfully. Now I have a question: if both are doing the same thing (reading data from the topic), then what is the difference between them, and which one should I use?
Thanks.
The short answer is that you need a Spark cluster to run the Spark code in a distributed fashion, whereas the Kafka consumer just runs in a single JVM, and you scale it out by manually running multiple instances of the same application.
In other words, you'd run them differently: spark-submit vs java -jar. I don't believe using Spring changes that.
Another difference is that the "plain consumer" gives you more control over the Kafka configuration, and you get one record at a time. A Spark RDD can contain many events, and they must all be of the same "schema" unless you want complex parsing logic, which is harder to write against RDD objects than against ConsumerRecord values, whose payloads are extracted for you.
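To make the "one record at a time" point concrete, here is a minimal sketch (not from your code) of a listener that receives the full ConsumerRecord and commits offsets manually. The class name is made up; BaseDataModel and the topic name come from your question; and it assumes you switch your container factory to AckMode.MANUAL with enable.auto.commit=false, and that values arrive as JSON strings.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;

import com.fasterxml.jackson.databind.ObjectMapper;

public class ManualAckRawEventListener {

    private final ObjectMapper mapper = new ObjectMapper();

    // Hypothetical listener; assumes AckMode.MANUAL and enable.auto.commit=false on the factory.
    @KafkaListener(topics = "rawEventTopic", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            // Per-record metadata is available: partition, offset, timestamp, headers.
            BaseDataModel model = mapper.readValue(record.value(), BaseDataModel.class);
            // ... process the single record, e.g. dataModelServiceImpl.save(model) ...
            ack.acknowledge(); // commit the offset only after successful processing
        } catch (Exception e) {
            // decide per record: skip, retry, or send to a dead-letter topic
        }
    }
}

With Spark you would instead handle a whole RDD of records per batch, as in your foreachRDD code, and deal with failures at the batch level.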
In general, I don't think it's a good idea to combine them.
And if they're both reading from the same topic with the same group id, then the Kafka consumer protocol can only assign one consumer per partition... It's not clear how many partitions your topic has, but that could explain why one would work but not the other.
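If you want to check, something like this with the Kafka AdminClient would print the partition count (a rough sketch; the bootstrap address is an assumption, adjust it to your broker):

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;

public class TopicPartitionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption: your broker address
        try (AdminClient admin = AdminClient.create(props)) {
            // Describe the topic and count its partitions
            TopicDescription topic = admin.describeTopics(Collections.singletonList("rawEventTopic"))
                    .all().get().get("rawEventTopic");
            System.out.println("rawEventTopic has " + topic.partitions().size() + " partition(s)");
        }
    }
}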