java, apache-kafka, junit, kafka-producer-api

Kafka producer sending null key instead of string


I am using a Producer to send messages to a Kafka topic.

When JUnit testing, I have found that the producer in my application code (but not the one in my JUnit test class) sends a null key, despite my providing a String key for it to use.

Code as follows:

Main application class

final Producer<String, HashSet<String>> actualApplicationProducer;

ApplicationInstance(String bootstrapServers) // constructor
{
    Properties props = new Properties();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ProducerConfig.CLIENT_ID_CONFIG, "ActualClient");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
    props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));

    actualApplicationProducer = new KafkaProducer<>(props);
}

public void doStuff()
{
    HashSet<String> values = new HashSet<String>();
    String key = "applicationKey";
    // THIS LINE IS SENDING A NULL KEY
    actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values));
}

But in my JUnit test class:

@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@SuppressWarnings("static-method")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CIFFileProcessorTests 
{
    /** An Embedded Kafka Broker that can be used for unit testing purposes. */
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;

    @BeforeAll
    public void setUpBeforeClass(@TempDir File globalTablesDir, @TempDir File rootDir) throws Exception 
    {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "JUnitClient");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
        props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes, 1000000));

        try (Producer<String, HashSet<String>> junitProducer = new KafkaProducer<>(props))
        {
            HashSet<String> values = new HashSet<>();
            // Here, I'm sending a record, just like in my main application code, but it's sending the key correctly and not null
            junitProducer.send(new ProducerRecord<>(topicName, "junitKey", values));
        }
    }

    @Test
    public void test()
    {
        ApplicationInstance sut = new ApplicationInstance(embeddedKafkaBroker.getBrokersAsString());
        sut.doStuff();
        
        // "records" is a LinkedBlockingQueue, populated by a KafkaMessageListenerContainer which is monitoring the topic for records using a MessageListener
        ConsumerRecord<String, HashSet<String>> record = records.poll(1,TimeUnit.SECONDS);
        assertEquals("junitKey", record.key()); // TEST FAILS - expected "junitKey" but returned null
    }
}

Custom serializer:

try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
     ObjectOutputStream oos = new ObjectOutputStream(baos)) 
{
    oos.writeObject(object);
    return baos.toByteArray();
}
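
For context, the value has to be read back on the consumer side with a matching Deserializer. A minimal sketch of such a class (CustomDeserializer is a placeholder name, not necessarily the exact class my test wiring uses):

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.HashSet;

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class CustomDeserializer implements Deserializer<HashSet<String>>
{
    @Override
    @SuppressWarnings("unchecked")
    public HashSet<String> deserialize(String topic, byte[] data)
    {
        if (data == null)
        {
            return null;
        }
        // Reverse of the ObjectOutputStream-based serializer above
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(data)))
        {
            return (HashSet<String>) ois.readObject();
        }
        catch (Exception e)
        {
            throw new SerializationException("Failed to deserialize HashSet<String>", e);
        }
    }
}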

Does anyone know why the KafkaProducer would send a null key when I explicitly specify a String?

--- Update ---

I have tried inspecting the returned RecordMetadata, and the producer is indeed sending the key, not null:

RecordMetadata info = actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values)).get(); 
System.out.println("INFO - partition: " + info.partition()
        + ", topic: " + info.topic()
        + ", offset: " + info.offset()
        + ", timestamp: " + info.timestamp()
        + ", keysize: " + info.serializedKeySize()
        + ", valuesize: " + info.serializedValueSize());

output:

INFO - partition: 0, topic: topicName, offset: 2, timestamp: 1656060840304, keysize: 14, valuesize: 6258

The keysize being > 0 shows that a null key is not being passed to the topic.

So perhaps the issue is with how the topic is being read back?
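
One way to double-check that (a throwaway sketch rather than my actual test code, reusing the placeholder CustomDeserializer from above for the value) is to read the topic back with a plain KafkaConsumer that deserializes the key as a String:

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "debug-group");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class.getName());

try (Consumer<String, HashSet<String>> debugConsumer = new KafkaConsumer<>(consumerProps))
{
    debugConsumer.subscribe(Collections.singletonList(topicName));
    // If the key prints correctly here, the producer side is fine and the problem is in how the test consumes the topic
    ConsumerRecords<String, HashSet<String>> polled = debugConsumer.poll(Duration.ofSeconds(5));
    polled.forEach(r -> System.out.println("key: " + r.key() + ", keysize: " + r.serializedKeySize()));
}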


Solution

  • Turns out, I was using a different Deserializer class for my KafkaMessageListenerContainer, one which didn't know what to do with the String key the producer had provided.
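
For anyone else hitting this: the consumer factory behind the KafkaMessageListenerContainer needs a key deserializer that matches the producer's StringSerializer. A rough sketch of the relevant wiring using the spring-kafka-test helpers (not my exact test setup; CustomDeserializer is the placeholder value deserializer from above):

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "false", embeddedKafkaBroker);
// The key was written with StringSerializer, so it must be read back with StringDeserializer
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CustomDeserializer.class);

DefaultKafkaConsumerFactory<String, HashSet<String>> consumerFactory =
        new DefaultKafkaConsumerFactory<>(consumerProps);
KafkaMessageListenerContainer<String, HashSet<String>> container =
        new KafkaMessageListenerContainer<>(consumerFactory, new ContainerProperties(topicName));
// "records" is the LinkedBlockingQueue the test polls from
container.setupMessageListener((MessageListener<String, HashSet<String>>) records::add);
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());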