I am using a Producer to send messages to a Kafka topic.
When JUnit testing, I have found that the producer in my application code (but not in my JUnit test class) is sending a null key, despite me providing a String key for it to use.
Code as follows:
Main application class
final Producer<String, HashSet<String>> actualApplicationProducer;
ApplicationInstance(String bootstrapServers) // constructor
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "ActualClient");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));
actualApplicationProducer = new KafkaProducer<>(props);
}
public void doStuff()
{
HashSet<String> values = new HashSet<String>();
String key = "applicationKey";
// THIS LINE IS SENDING A NULL KEY
actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values));
}
But, in my junit classes:
@EmbeddedKafka
@ExtendWith(SpringExtension.class)
@SuppressWarnings("static-method")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class CIFFileProcessorTests
{
/** An Embedded Kafka Broker that can be used for unit testing purposes. */
@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;
@BeforeAll
public void setUpBeforeClass(@TempDir File globalTablesDir, @TempDir File rootDir) throws Exception
{
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "JUnitClient");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CustomSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, lingerBatchMS);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, Math.min(maxBatchSizeBytes,1000000));
try(Producer<String, HashSet<String>> junitProducer = new Producer<>(props))
{
HashSet<String> values = new HashSet<>();
// Here, I'm sending a record, just like in my main application code, but it's sending the key correctly and not null
junitProducer.send(new ProducerRecord<>(topicName,"junitKey",values));
}
@Test
public void test()
{
ApplicationInstance sut = new ApplicationInstance(embeddedKafkaBroker.getBrokersAsString());
sut.doStuff();
// "records" is a LinkedBlockingQueue, populated by a KafkaMessageListenerContainer which is monitoring the topic for records using a MessageListener
ConsumerRecord<String, HashSet<String>> record = records.poll(1,TimeUnit.SECONDS);
assertEquals("junitKey", record.key()); // TEST FAILS - expected "junitKey" but returned null
}
Custom serializer:
try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos))
{
oos.writeObject(object);
return baos.toByteArray();
}
Does anyone know why the KafkaProducer
would send a null key when I explicitly specify a String?
--- Update ---
I have tried inspecting the metadata, and the Producer is indeed sending the key, and not null:
RecordMetadata info = actualApplicationProducer.send(new ProducerRecord<>(topicName, key, values)).get();
System.out.println("INFO - partition: " + info.partition() + ", topic: " + info.topic() + ", offset: " + info.offset() + ", timestamp: "+ info.timestamp() + ", keysize: " + info.serializedKeySize() + ", valuesize: " + info.serializedValueSize());
output:
INFO - partition: 0, topic: topicName, offset: 2, timestamp: 1656060840304, keysize: 14, valuesize: 6258
The keysize being > 0 shows that null is not passed to the topic.
So, the issue must be with the reading of the topic, perhaps?
Turns out, I was using a different Deserializer class for my KafkaMessageListenerContainer
, which didn't know what to do with the String as provided