Search code examples
javaapache-kafkaapache-kafka-streams

Using Kafka Streams with Serdes that rely on schema references in the Headers


I'm trying to use Kafka Streams to perform KTable-KTable foreign key joins on CDC data. The data I will be reading is in Avro format, however it is serialized in a manner that wouldn't be compatible with other industry serializer/deserializers (ex. Confluent schema registry) because the schema identifiers are stored in the headers.

When I setup my KTables' Serdes, my Kafka Streams app runs initially, but ultimately fails because it internally invokes the Serializer method with byte[] serialize(String topic, T data); and not a method with headers (ie. byte[] serialize(String topic, Headers headers, T data) in the wrapping serializer ValueAndTimestampSerializer. The Serdes I'm working with cannot handle this and throw an exception.

First question is, does anyone know a way to implore Kafka Streams to call the method with the right method signature internally?

I'm exploring approaches to get around this, including writing new Serdes that re-serialize with the schema identifiers in the message itself. This may involve recopying the data to a new topic or using interceptors.

However, I understand ValueTransformer has access to headers in the ProcessorContext and I'm wondering if there might there be a faster way using transformValues(). The idea is to first read the value as a byte[] and then deserialize the value to my Avro class in the transformer (see example below). When I do this however, I'm getting an exception.

StreamsBuilder builder = new StreamsBuilder();
 final KTable<Long, MySpecificClass> myTable = builder.table(
      "my-topic",
       Consumed.with(Serdes.Long(), Serdes.ByteArray())
    )
    .transformValues(MyDeserializerTransformer::new);

 ...

 KTable<Long, JoinResult> joinResultTable = myTable.join(rightTable, MySpecificClass::getJoinKey, myValueJoiner);

 joinResultTable.toStream()...
public class MyDeserializerTransformer implements
    ValueTransformerWithKey<Long, byte[], MySpecificClass> {
  MyAvroDeserializer deserializer;
  ProcessorContext context;

  @Override
  public void init(ProcessorContext context) {
    deserializer = new MyAvroDeserializer();
    this.context = context;
  }

  @Override
  public MySpecificClass transform(Long key, byte[] value) {
    return deserializer.deserialize(context.topic(), context.headers(), value);
  }

  @Override
  public void close() {

  }
}

When I run this, I receive a ClassCastException. How can I fix this issue or find a work around? Do I need to use a secondary state store?

"class": "org.apache.kafka.streams.errors.StreamsException",
    "msg": "ClassCastException invoking Processor. Do the Processor's input types match the deserialized types? Check the Serde setup and change the default Serdes in StreamConfig or provide correct Serdes via method parameters. Make sure the Processor can accept the deserialized input of type key: java.lang.Long, and value: org.apache.kafka.streams.kstream.internals.Change.\nNote that although incorrect Serdes are a common cause of error, the cast exception might have another cause (in user code, for example). For example, if a processor wires in a store, but casts the generics incorrectly, a class cast exception could be raised during processing, but the cause would not be wrong Serdes.",
    "stack": [
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:185)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)",
      "org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:117)",
      "org.apache.kafka.streams.kstream.internals.KTableTransformValues$KTableTransformValuesProcessor.process(KTableTransformValues.java:87)",
      "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
      "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
      "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",
...
"cause": {
 "class": "java.lang.ClassCastException",
      "msg": "class com.package.MySpecificClass cannot be cast to class [B (com.package.MySpecificClass is in unnamed module of loader 'app'; [B is in module java.base of loader 'bootstrap')",
      "stack": [
        "org.apache.kafka.common.serialization.ByteArraySerializer.serialize(ByteArraySerializer.java:19)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:102)",
        "org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignJoinSubscriptionSendProcessorSupplier$UnbindChangeProcessor.process(ForeignJoinSubscriptionSendProcessorSupplier.java:72)",
        "org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)",
        "org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)",
        "org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)",

Solution

  • I was able to solve this issue by first reading the input topic as a KStream and converting it to a KTable with different Serde as a second step, it seems State Stores are having the issue with not invoking serializer/deserializer method signatures with the headers.