Error serializing a message when sending to a Kafka topic

I need to test a message that contains headers, so I need to use MessageBuilder, but I cannot serialize it.

I tried adding the serialization settings to the producer props, but it did not work.

Can someone help me?

This is the error:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

My test class:

public class TransactionMastercardAdapterTest extends AbstractTest{

private KafkaTemplate<String, Message<String>> template;

public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("", embeddedKafka.getZookeeperConnectionString());
}

public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, Message<String>> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, Message<String>>("notification_topic", MessageBuilder.withPayload(payload)
            .setHeader("status", "RECEIVED")
            .setHeader("service", "MASTERCARD")
            .build()));

    Map<String, Object> configs = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);

    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);

    Consumer<byte[], byte[]> consumer = cf.createConsumer();
    ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
}
}

  • I'd say the error is obvious:

    Can't convert value of class to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

    Your value is a GenericMessage, but StringSerializer works only with strings.

    What you need is a JavaSerializer, which does not exist out of the box but is not difficult to write:

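    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.util.Map;
    import org.apache.kafka.common.serialization.Serializer;

    public class JavaSerializer implements Serializer<Object> {
        @Override
        public byte[] serialize(String topic, Object data) {
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
                objectStream.writeObject(data); // Java serialization: data must implement java.io.Serializable
            } catch (IOException e) {
                throw new IllegalStateException("Can't serialize object: " + data, e);
            }
            return byteStream.toByteArray(); // read only after the object stream is flushed and closed
        }
        @Override public void configure(Map<String, ?> configs, boolean isKey) { }
        @Override public void close() { }
    }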

    Then configure it as the value.serializer property.
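Since the test consumer reads the value back as byte[], it may help to see both directions of Java serialization side by side. The following is only a minimal sketch using the JDK alone: the class name JavaSerializationRoundTrip and its toBytes/fromBytes helpers are hypothetical (they are not part of Kafka or Spring), and a HashMap stands in for the message so the example runs without any Kafka dependency.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;

// Hypothetical helper for illustration only; not a Kafka or Spring class.
public class JavaSerializationRoundTrip {

    // Producer side: the same steps a Java-serialization based Serializer would perform.
    public static byte[] toBytes(Object data) {
        ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
        try (ObjectOutputStream objectStream = new ObjectOutputStream(byteStream)) {
            objectStream.writeObject(data); // data must implement java.io.Serializable
        } catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
        return byteStream.toByteArray();
    }

    // Consumer side: turn the raw byte[] record value back into an object.
    public static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream objectStream =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return objectStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Can't deserialize object", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for the real message: headers and payload in a serializable map.
        HashMap<String, String> message = new HashMap<>();
        message.put("status", "RECEIVED");
        message.put("service", "MASTERCARD");
        message.put("payload", "{\"cardId\" : \"11\"}");

        byte[] wire = toBytes(message);
        Object restored = fromBytes(wire);
        System.out.println(message.equals(restored)); // prints true
    }
}
```

In the test from the question, the bytes returned by consumer.poll(...) could be passed through something like fromBytes to get the message and its headers back, assuming the producer was set up with props.put("value.serializer", JavaSerializer.class.getName()).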