Tags: apache-kafka, avro, apache-kafka-streams

Specifying serdes on a map operation in Kafka Streams


I'm sorry if this by any chance is a duplicate post, but I have been searching for an answer and got nothing so far.

What I need is to specify the serdes during a map operation that changes the key type in Kafka Streams. The original KStream has a String key and an Avro (GenericRecord) value, but I need to remap it to an Avro key and an Avro value. Something along these lines:

KStream<String, GenericRecord> inputStream = builder.stream("someTopic");
KStream<GenericRecord, GenericRecord> rekeyedStream = inputStream.map((key, value) -> {
    // Build the new Avro key from the incoming record
    GenericRecord newKey = new GenericData.Record(someSchema);
    ...
    // Use the diamond operator instead of the raw KeyValue type
    return new KeyValue<>(newKey, newValue);
});

I believe I need to specify the serdes since the types are changing, but I found no way to do it on a map operator. When reading from a topic, grouping, or writing back to a topic, we can usually do something like the following to override the default serdes:

KStream<GenericRecord, GenericRecord> stream = builder.stream("someTopic", 
    Consumed.with(keySerde, valueSerde));

KGroupedStream<GenericRecord, GenericRecord> groupedStream =
    inputStream.groupBy((key, value) -> somethingThatChangesTheKey(),
        Grouped.with(newKeySerde, newValueSerde));

inputStream.to("someTopic", Produced.with(keySerde, valueSerde));

And yet I'm at a complete loss as to how to specify the serdes in a map when the types change, and in this particular case I can't use my app's default serdes.

The closest I got to a solution was this post right here, but as far as I can tell the accepted answer only told the OP that the serdes need to be specified, not how to do that during a map (I could be mistaken).

Any insight would be appreciated.


Solution

  • You cannot specify a new serde on map() because map() does not need the serde. The map() operator itself gets an input object and produces an output object, but it never serializes or deserializes any messages.

    Only operators that read from or write to a Kafka topic allow you to set a serde, as only those operators use a serde.

    It's not clear to me what you are trying to accomplish by setting a serde on the map() operator. Can you elaborate?
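
To make the point above concrete, here is a minimal sketch of where the serdes would go when a map() changes the key type. It is not the asker's actual code: the class name `RekeyExample` and the topic names `input-topic` and `output-topic` are made up for illustration, and a String-to-Integer re-keying stands in for the String-to-GenericRecord case so the example needs nothing beyond the `kafka-streams` dependency. With Avro, the same `Produced.with(...)` call would presumably take Avro serdes for GenericRecord (for example Confluent's `GenericAvroSerde`, assuming that library is in use).

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

// Hypothetical example class; topic names are placeholders.
public class RekeyExample {

    // Builds a topology that changes the key type from String to Integer.
    public static Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // Serdes for the input types are given where the topic is read.
        KStream<String, String> input =
            builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

        // map() only transforms in-memory objects, so it takes no serde.
        KStream<Integer, String> rekeyed =
            input.map((key, value) -> KeyValue.pair(key == null ? 0 : key.length(), value));

        // Serdes for the new types are given where the records are written.
        rekeyed.to("output-topic", Produced.with(Serdes.Integer(), Serdes.String()));

        return builder.build();
    }

    public static void main(String[] args) {
        // Describing the topology does not require a running broker.
        System.out.println(buildTopology().describe());
    }
}
```

If the re-keyed stream is grouped rather than written out, the equivalent place for the new serdes would be the grouping call, e.g. `rekeyed.groupByKey(Grouped.with(newKeySerde, newValueSerde))`, since that is where Kafka Streams writes to a repartition topic.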