tl;dr: How can I implement a Kafka Connect Converter that uses headers (when using Confluent Replicator)?
I have written a custom Kafka Connect Converter. As I understand it, the toConnectData method is used when deserializing messages.
The interface declares two overloads of it. The second one takes Headers and is documented as the one called by the Connect system, while the first exists for backwards compatibility.
The two overloads (shown here for fromConnectData; toConnectData has the same pair, with and without Headers):
byte[] fromConnectData(String topic, Schema schema, Object value);
byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value);
In reality I am finding the first to be used instead — and for my use-case, I need the headers to perform the function.
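To make the dispatch question concrete, here is a minimal, self-contained sketch. It does not use the real Kafka classes: the nested Converter interface is a simplified stand-in for org.apache.kafka.connect.storage.Converter (headers are a plain Map, the return type is a String), and the two caller methods are hypothetical. It only mirrors the fact that the header-aware overload is a default method that delegates to the header-less one, so which override runs depends entirely on which overload the caller invokes.

```java
import java.util.Map;

final class HeaderOverloadDemo {

    // Simplified stand-in for org.apache.kafka.connect.storage.Converter (NOT the real interface).
    interface Converter {
        // Older overload without headers.
        String toConnectData(String topic, byte[] value);

        // Header-aware overload; as in Kafka, the default just delegates to the older one.
        default String toConnectData(String topic, Map<String, String> headers, byte[] value) {
            return toConnectData(topic, value);
        }
    }

    // Overrides both overloads, like the ExampleConverter in the question.
    static final class ExampleConverter implements Converter {
        @Override
        public String toConnectData(String topic, byte[] value) {
            return "no-headers";
        }

        @Override
        public String toConnectData(String topic, Map<String, String> headers, byte[] value) {
            return "with-headers:" + headers.keySet();
        }
    }

    // Hypothetical caller that passes headers through.
    static String headerAwareCaller(Converter converter) {
        return converter.toConnectData("example-topic", Map.of("key-id", "abc"), new byte[0]);
    }

    // Hypothetical caller that invokes the header-less overload directly.
    static String legacyCaller(Converter converter) {
        return converter.toConnectData("example-topic", new byte[0]);
    }

    public static void main(String[] args) {
        Converter converter = new ExampleConverter();
        System.out.println(headerAwareCaller(converter)); // prints with-headers:[key-id]
        System.out.println(legacyCaller(converter));      // prints no-headers
    }
}
```

In this sketch, overriding the header-aware overload has no effect when the caller uses the header-less one, because Java resolves the overload at the call site.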
Example Converter implementation
package com.example;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.Converter;

public class ExampleConverter implements Converter {
    ...
    @Override
    public SchemaAndValue toConnectData(String topic, byte[] value) {
        throw new RuntimeException("headers not supplied, these are required in order to decrypt");
    }

    @Override
    public SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return new SchemaAndValue(Schema.BYTES_SCHEMA, null);
    }
}
I run this converter using Confluent's Connect container image confluentinc/cp-enterprise-replicator:7.7.0.
I get the following error, which clearly indicates it is calling the older (deprecated?) function, without headers:
java.lang.RuntimeException: headers not supplied, these are required in order to decrypt
at com.example.ExampleConverter.toConnectData(ExampleConverter.java:50)
at io.confluent.connect.replicator.ReplicatorSourceTask.convertKeyValue(ReplicatorSourceTask.java:637)
at io.confluent.connect.replicator.ReplicatorSourceTask.poll(ReplicatorSourceTask.java:536)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.poll(AbstractWorkerSourceTask.java:488)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:360)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:80)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Seeking advice — am I doing something wrong?
Response from Confluent — this is not supported by Confluent Replicator
Thank you for contacting Confluent. You are right, Converter interface has 2 implementations for both fromConnectData() or toConnectData():
byte[] fromConnectData(String topic, Schema schema, Object value);
byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value)
SchemaAndValue toConnectData(String topic, byte[] value)
SchemaAndValue toConnectData(String topic, Headers headers, byte[] value)
Replicator calls fromConnectData() and then toConnectData() without using Headers; this is why toConnectData(String topic, Headers headers, byte[] value) is not called. If you want to perform custom conversion on headers, you can set
header.converter=com.example.ExampleConverter
instead.
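For context on that suggestion: in Kafka Connect, the class named in header.converter is expected to implement org.apache.kafka.connect.storage.HeaderConverter, which is called once per header with the header key and that header's bytes. The sketch below is a self-contained illustration of that shape only. The nested HeaderConverter interface is a simplified stand-in (the real method returns a SchemaAndValue, and the real interface has further methods), and the Utf8HeaderConverter class and the "key-id" header name are hypothetical.

```java
import java.nio.charset.StandardCharsets;

final class HeaderConverterShapeDemo {

    // Simplified stand-in mirroring the parameters of HeaderConverter.toConnectHeader (NOT the real interface).
    interface HeaderConverter {
        String toConnectHeader(String topic, String headerKey, byte[] value);
    }

    // Hypothetical converter: decodes a single header value as UTF-8 text.
    static final class Utf8HeaderConverter implements HeaderConverter {
        @Override
        public String toConnectHeader(String topic, String headerKey, byte[] value) {
            // Only this header's key and bytes are available here, not the record's value.
            return value == null ? null : new String(value, StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        HeaderConverter converter = new Utf8HeaderConverter();
        byte[] raw = "abc".getBytes(StandardCharsets.UTF_8);
        System.out.println(converter.toConnectHeader("example-topic", "key-id", raw)); // prints abc
    }
}
```

Because a header converter in this shape receives one header at a time and never the record value, it converts the headers themselves; it does not by itself make headers available while converting the value.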