
Confluent Replicator not passing Headers into Converter#fromConnectData


TL;DR: How can I implement a Kafka Connect Converter that uses headers when running under Confluent Replicator?

I have written a custom Kafka Connect Converter. As I understand it, toConnectData is the method used when deserializing messages.

Each conversion method in the interface has two overloads. The second takes Headers and is documented as the one the Connect runtime calls, while the first exists for backwards compatibility.

The two overloads of fromConnectData (toConnectData has a matching pair):

    byte[] fromConnectData(String topic, Schema schema, Object value);
    byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value);

Interface ref: https://github.com/apache/kafka/blob/1eb7644349cb07139d6a3c1ad1986979647cac99/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L52-L68

In practice, I find that the first overload (without headers) is the one being called, and my use case needs the headers to perform the conversion.

Example Converter implementation

package com.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class ExampleConverter implements Converter {

    ...

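    // Legacy overload without headers; fails loudly so any call to it is visible.
    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        throw new RuntimeException("headers not supplied, these are required in order to decrypt");
    }

    // Header-aware overload (placeholder body); this is the one I expect to be called.
    @Override
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
    }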

}

I run this converter using Confluent's Connect container image confluentinc/cp-enterprise-replicator:7.7.0.

I get the following error, which shows that the older overload (the one without headers) is being called:

java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
    at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
    at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
    at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)

Seeking advice: am I doing something wrong?


Solution

  • Response from Confluent: this is not supported by Confluent Replicator

    Thank you for contacting Confluent. You are right, the Converter interface has two overloads of both fromConnectData() and toConnectData():

        byte[] fromConnectData(String topic, Schema schema, Object value)
        byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value)

        toConnectData(String topic, byte[] value)
        toConnectData(String topic, Headers headers, byte[] value)

    Replicator calls fromConnectData() and then toConnectData() without using Headers, which is why toConnectData(String topic, Headers headers, byte[] value) is not called. If you want to perform custom conversion on headers, you can set header.converter=com.example.ExampleConverter instead.
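
To illustrate why the header-aware override is never reached, here is a minimal sketch using simplified stand-in types. DemoConverter, DemoHeaders, HeaderAwareConverter and OverloadDemo are hypothetical names, not the real Kafka Connect or Replicator classes. The only thing borrowed from the real Converter interface is that the headers overload is a default method falling back to the legacy overload. A caller that invokes the legacy overload directly, as the stack trace suggests Replicator does, lands in the method that throws.

```java
// Simplified stand-ins for illustration only; NOT the real Kafka Connect types.
final class DemoHeaders {
    private final String keyId;

    DemoHeaders(String keyId) {
        this.keyId = keyId;
    }

    String keyId() {
        return keyId;
    }
}

interface DemoConverter {
    // Legacy overload without headers.
    String toConnectData(String topic, byte[] value);

    // Header-aware overload; by default it falls back to the legacy overload,
    // mirroring the backwards-compatibility arrangement in the real interface.
    default String toConnectData(String topic, DemoHeaders headers, byte[] value) {
        return toConnectData(topic, value);
    }
}

// Same shape as the ExampleConverter in the question.
final class HeaderAwareConverter implements DemoConverter {
    @Override
    public String toConnectData(String topic, byte[] value) {
        throw new IllegalStateException("headers not supplied");
    }

    @Override
    public String toConnectData(String topic, DemoHeaders headers, byte[] value) {
        return "decoded with key " + headers.keyId();
    }
}

final class OverloadDemo {
    // A caller that passes headers reaches the header-aware override.
    static String callWithHeaders(DemoConverter converter) {
        return converter.toConnectData("topic", new DemoHeaders("k1"), new byte[0]);
    }

    // A caller that uses the legacy overload never reaches the override.
    static String callWithoutHeaders(DemoConverter converter) {
        try {
            return converter.toConnectData("topic", new byte[0]);
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        DemoConverter converter = new HeaderAwareConverter();
        System.out.println(callWithHeaders(converter));    // decoded with key k1
        System.out.println(callWithoutHeaders(converter)); // failed: headers not supplied
    }
}
```

In other words, overriding the headers overload only helps when the caller actually passes headers. Nothing in the converter itself can force that.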