Search code examples
spring-bootapache-kafkaspring-kafkakafka-producer-apijava-21

Failed to construct kafka producer when sending list of strings


I am new with Kafka and I am trying to read a text file and create a list of strings that I want to send for the consumers. I am using Java 21 and Spring Boot 3.2.0 (SNAPSHOT).

This is the Kafka producer configuration:

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    public Map<String, Object> producerConfig(){
        Map<String,Object> properties = new HashMap<>();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ListSerializer.class.getName());

        return properties;
    }

    @Bean
    public ProducerFactory<String, List<String>> producerFactory(){
        return new DefaultKafkaProducerFactory<>(producerConfig());
    }

    @Bean
    public KafkaTemplate<String, List<String>> kafkaTemplate(ProducerFactory<String,List<String>> producerFactory){
        return new KafkaTemplate<>(producerFactory);
    }
}

This is the Topic configuration:

@Configuration
public class KafkaTopicConfig {
    
    @Bean
    public NewTopic alexTopic(){
        return TopicBuilder.name("securityTopic")
                .build();
    }
}

This is the "application.properties":

spring.servlet.multipart.enabled=true
spring.servlet.multipart.max-file-size=10MB
spring.servlet.multipart.max-request-size=10MB
spring.kafka.bootstrap-servers=192.168.241.133:9092

This is how I am receiving the file:

public Optional uploadFile(MultipartFile file){
    if(file.isEmpty()){
        return Optional.of(new EmptyFileException("File is empty please double check"));
    }
    
    return Optional.of(this.upload.uploadFile(file));
}

This is how I am sending the "List<String>" to kafka:

public UploadFileService(KafkaTemplate<String, List<String>> kafkaTemplate) {
    this.kafkaTemplate = kafkaTemplate;
}

@Override
public String uploadFile(MultipartFile file) {
    try(BufferedReader reader = new BufferedReader(
                                    new InputStreamReader(file.getInputStream()))){

        String line;
        int i=0;
        List<String> list = new ArrayList<>();
        while ((line=reader.readLine())!=null) {
            if(i==10){
                this.kafkaTemplate.send("securityTopic",list);

If I am sending pure Strings and I am using:

StringSerializer.class

instead of:

ListSerializer.class.getName()

in the Kafka producer configuration, and also change the the other generics to String instead of List<String> is working. But how can I send List<String> then? Again I am using Kafka for 2 or 3 days and I don't know if I have to create my own serializer or use something that is built in?

The error that gives me is as in the title: "Failed to construct kafka producer".

I have tried also to use "ListSerializer.class" also but is not working.

I also tried to use "JsonSerialize.class" and is still the same error.

Thank you for your time to read this!


Solution

  • It was the wrong import. the correct import is:

    "import org.springframework.kafka.support.serializer.JsonSerializer;"