Tags: kotlin, apache-kafka, apache-kafka-streams, avro

How do I store Records/ConsumerRecords in a Kafka StateStore?


I have a Kafka Streams application written in Kotlin. In its current form, the Kafka StateStore in the application is designed to store key-value entries of type <String, CustomAVROSchema>. The StateStore, being a PersistentTimestampedKeyValueStore, is set up to use a SpecificAvroSerde for the value part of the key-value entries, which works like a charm.
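
For reference, the setup described above might look roughly like the sketch below. This is not my actual code: the topic name, store name, schema-registry URL, and the generated CustomAVROSchema class are placeholders.

```kotlin
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.state.Stores

// Value Serde backed by the schema registry (URL is a placeholder)
val valueSerde = SpecificAvroSerde<CustomAVROSchema>().apply {
    configure(mapOf("schema.registry.url" to "http://localhost:8081"), false)
}

val builder = StreamsBuilder()
builder.table(
    "input-topic", // placeholder topic name
    Materialized.`as`<String, CustomAVROSchema>(
        Stores.persistentTimestampedKeyValueStore("my-store") // placeholder store name
    )
        .withKeySerde(Serdes.String())
        .withValueSerde(valueSerde)
)
```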

The problem with this approach is that I want to keep the header and timestamp values of the incoming record, but the current solution only stores the value. I've done some research, but I cannot find any decent examples of how I can keep the header and timestamp fields.

So, how can I keep the header and timestamp fields through a StateStore operation?

My main approach to this problem was to rewrite the application to store the record itself instead of only the value, meaning that each entry in the StateStore would be of type <String, Record<String, CustomAVROSchema>> (or <String, ConsumerRecord<String, CustomAVROSchema>>, I don't fully know the difference).

These application changes worked out ok, except when I needed to change the Serde options for the StateStore. I could not find any good replacement for the SpecificAvroSerde used earlier.

As the concept of records is so integral to Kafka, I was surprised to come up short when looking for examples of what such a SerDe configuration could look like. Does Kafka not support this feature out of the box? When reading the documentation, I learned that it's possible to create custom serialization and deserialization classes, but I had problems implementing this. Is that approach the only way to resolve my problem?

All your help would be appreciated 🙂


Solution

  • After going back and forth on my problem, I decided on trying to make my own SerDe.

    I made a StatestoreSerializer class that implements Kafka's Serializer interface.

    override fun serialize(topic: String, data: Record<String, CustomAVROSchema>): ByteArray? {
        return try {
            // Avro-encode the record value (serde construction elided here)
            val serializedAvro = ...serializer().serialize(data.key(), data.value()).asList()
            // Flatten the record headers into a string with Base64-encoded values
            val byteHeaders = headersToString(data.headers())
            // Wrap the pieces in a data class and write it with Jackson
            objectMapper.writeValueAsBytes(StateStoreObject(...))
        } catch (e: Exception) {
            throw SerializationException(
                "Error when serializing record", e
            )
        }
    }
    
    private fun headersToString(headers: Headers): String {
        val map = headers
            .associate { it.key() to it.value() }
            .mapValues { Base64.getEncoder().encodeToString(it.value) }
        return map.toString()
    }
    

    Things to note include the following:

    • Using ObjectMapper().registerKotlinModule() to enable the use of Kotlin data classes.
    • Encoding the avro object and the record headers independently, placing them into a data class, and encoding the data class.
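
    Concretely, the intermediate data class might look something like the sketch below. The field names are my guesses, and the Jackson round trip is omitted so the snippet runs with only the standard library; in the real code the whole object is written with ObjectMapper().registerKotlinModule().

```kotlin
import java.util.Base64

// Assumed shape of the envelope the serializer writes; the real class
// may have more or differently named fields.
data class StateStoreObject(
    val avroBytes: List<Byte>, // the Avro-encoded record value
    val headers: String,       // header map with Base64-encoded values
    val timestamp: Long        // the original record timestamp
)

// Mirrors headersToString() above, but over a plain map so it runs
// without Kafka on the classpath.
fun encodeHeaderValues(headers: Map<String, ByteArray>): String =
    headers.mapValues { Base64.getEncoder().encodeToString(it.value) }.toString()
```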

    The deserializer essentially does the same thing, only in reverse.
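
    The header string produced by headersToString() has to be parsed back on the way out. A sketch of that step (with a caveat: map.toString() is a fragile wire format that breaks if a header key contains '=' or ', '; a proper JSON map would be safer):

```kotlin
import java.util.Base64

// Reverses headersToString(): parses "{key=base64, key2=base64}" back
// into raw header bytes. Assumes no header key contains '=' or ", ".
fun stringToHeaders(s: String): Map<String, ByteArray> {
    val body = s.removePrefix("{").removeSuffix("}")
    if (body.isBlank()) return emptyMap()
    return body.split(", ").associate { entry ->
        val (key, value) = entry.split("=", limit = 2)
        key to Base64.getDecoder().decode(value)
    }
}
```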

    Although unoptimized, the custom SerDe configuration did not add any significant delay compared to using only the Avro SerDe.
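
    To wire everything up, the serializer/deserializer pair can be combined with Serdes.serdeFrom and handed to the store. In this sketch, StatestoreDeserializer is the assumed reverse counterpart of the serializer above, and the store name is a placeholder:

```kotlin
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.processor.api.Record
import org.apache.kafka.streams.state.Stores

// Combine the custom serializer with its deserializer counterpart
val recordSerde = Serdes.serdeFrom(
    StatestoreSerializer(),   // the class shown above
    StatestoreDeserializer()  // assumed reverse implementation
)

// Hand the Serde to the timestamped store, as with SpecificAvroSerde before
val materialized = Materialized.`as`<String, Record<String, CustomAVROSchema>>(
    Stores.persistentTimestampedKeyValueStore("my-store") // placeholder store name
)
    .withKeySerde(Serdes.String())
    .withValueSerde(recordSerde)
```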