I want to implement a custom S3 partitioner class that uses some Avro message fields, plus some extra logic, to generate the output S3 path prefix.
The project is in Kotlin; this is my class:
package co.kafkaProcessor.connect

import io.confluent.connect.storage.errors.PartitionException
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner
import org.apache.kafka.connect.errors.ConnectException
import org.apache.kafka.connect.sink.SinkRecord
import org.joda.time.DateTime
import org.joda.time.DateTimeZone
import org.slf4j.Logger
import org.slf4j.LoggerFactory

class MachineAwareHourlyPartitioner<T> : TimeBasedPartitioner<T>() {
    private val log: Logger = LoggerFactory.getLogger(MachineAwareHourlyPartitioner::class.java)
    private lateinit var environmentName: String

    override fun configure(config: MutableMap<String, Any>?) {
        super.configure(config)
        environmentName = config!!["environment.prefix"] as String
    }

    private fun encodedPartitionForTimestamp(sinkRecord: SinkRecord, timestamp: Long?): String? {
        // Our custom logic goes here
    }
}
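For context, the prefix-building logic itself can be sketched independently of the Connect classes. This is a minimal, self-contained illustration; the field names (`environmentName`, `machineId`) and the path layout are assumptions, not the actual implementation:

```kotlin
import java.time.Instant
import java.time.ZoneOffset
import java.time.format.DateTimeFormatter

// Hourly UTC bucket formatter, e.g. "2024-01-31-17" (the pattern is an assumption).
val hourFormatter: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd-HH").withZone(ZoneOffset.UTC)

// Hypothetical prefix builder: combines the configured environment prefix,
// a machine id taken from the Avro message, and the record timestamp.
fun encodedPartitionFor(environmentName: String, machineId: String, timestampMillis: Long): String {
    val hourBucket = hourFormatter.format(Instant.ofEpochMilli(timestampMillis))
    return "$environmentName/machine=$machineId/hour=$hourBucket"
}
```

Inside the real partitioner, the encode method would extract these values from the `SinkRecord` and delegate to a helper like this.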
At first I tried creating a custom shadowJar task to generate the jar file:
tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for Kafka Connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        include("co/kafkaProcessor/connect/**")
        include("co/kafkaProcessor/serializer/**")
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}
but running jar -tvf filename.jar showed that it only included my own code, and Kafka Connect failed with java.lang.ClassNotFoundException: io.confluent.connect.storage.partitioner.TimeBasedPartitioner.
I thought that you're not supposed to include Kafka Connect code in your custom jar, also because if I configure the task with TimeBasedPartitioner directly
it works, so the class is available.
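If the goal is to keep the Connect and storage-partitioner classes out of the jar entirely, one option (an assumption about the build, not something from my original setup) is to declare those artifacts as compileOnly, so they are available at compile time but never land on the runtime classpath that Shadow copies:

```kotlin
dependencies {
    // Provided by the Connect worker / plugin classloader at run time;
    // the versions shown here are illustrative.
    compileOnly("org.apache.kafka:connect-api:3.4.0")
    compileOnly("io.confluent:kafka-connect-storage-partitioner:10.2.4")
}
```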
I then tried to include the storage partitioner too by changing the custom jar definition to:
tasks {
    withType<ShadowJar> {
        mergeServiceFiles()
        append("META-INF/spring.handlers")
        append("META-INF/spring.schemas")
        append("META-INF/spring.tooling")
        transform(PropertiesFileTransformer::class.java) {
            paths = listOf("META-INF/spring.factories")
            mergeStrategy = "append"
        }
    }

    // Custom jars for Kafka Connect
    create<ShadowJar>("kafkaConnectUtilsJar") {
        archiveClassifier.set("connect-utils")
        dependencies {
            include(dependency("io.confluent:kafka-connect-storage-partitioner:10.2.4"))
        }
        from(project.sourceSets.main.get().output)
        configurations = listOf(project.configurations.runtimeClasspath.get())
    }
}
Unfortunately this still includes all my application code, but I can see the partitioner being included in the jar file.
Kafka Connect now fails with this error:
java.lang.ClassCastException: class co.kafkaProcessor.connect.MachineAwareHourlyPartitioner cannot be cast to class io.confluent.connect.storage.partitioner.Partitioner (co.kafkaProcessor.connect.MachineAwareHourlyPartitioner is in unnamed module of loader 'app'; io.confluent.connect.storage.partitioner.Partitioner is in unnamed module of loader org.apache.kafka.connect.runtime.isolation.PluginClassLoader @63a6dffd)
    at io.confluent.connect.s3.S3SinkTask.newPartitioner(S3SinkTask.java:196)
    at io.confluent.connect.s3.S3SinkTask.start(S3SinkTask.java:117)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:312)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:186)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
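As I understand it, this ClassCastException is a classloader identity problem rather than a type mismatch: the worker's 'app' loader sees the Partitioner interface bundled in my jar, while the S3 plugin's PluginClassLoader has its own copy, and the JVM treats classes with the same name from different loaders as unrelated types. A self-contained sketch of that JVM rule, using kotlin.Pair as a stand-in class (this is an illustration, not the connector code):

```kotlin
// A loader that re-defines a class from its own bytecode instead of
// delegating to the parent, mimicking what happens when the same class
// exists both in the plugin and on the worker classpath.
class CopyingLoader : ClassLoader(CopyingLoader::class.java.classLoader) {
    fun redefine(name: String): Class<*> {
        val bytes = getResourceAsStream(name.replace('.', '/') + ".class")!!.readBytes()
        return defineClass(name, bytes, 0, bytes.size)
    }
}

fun main() {
    val reloaded = CopyingLoader().redefine("kotlin.Pair")
    println(reloaded.name == Pair::class.java.name)      // same fully qualified name
    println(Pair::class.java.isAssignableFrom(reloaded)) // yet not cast-compatible
}
```

This is also why bundling kafka-connect-storage-partitioner into the custom jar made things worse: it created a second copy of Partitioner under a different loader.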
Update: I've also tried changing how the function is overridden, overriding the public method encodePartition instead; however, that didn't change anything.
I've also tried adding a test like this (which should hopefully exercise the cast to Partitioner):

    val partitioner = MachineAwareHourlyPartitioner<String>()
    val implementedPartitioner = partitioner as Partitioner<String>

which didn't fail.
I was able to get the partitioner working by adding my jar file (without any bundled dependencies) into the S3 connector directory:

    /usr/share/confluent-hub-components/confluentinc-kafka-connect-s3/lib/

I'm not sure if this is related to the plugin isolation that prevents libraries from different plugins from interfering with each other, but in my original attempt I had the jar on the main classpath, /usr/share/java/kafka/, which I thought would make it available to every plugin.
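For reference, the sink configuration that exercises the class once the jar is in place looks something like this (values are illustrative; environment.prefix is the custom key read in configure()):

```properties
partitioner.class=co.kafkaProcessor.connect.MachineAwareHourlyPartitioner
environment.prefix=production
```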
Just as an extra detail, we also use another custom class to override the Avro TopicNameStrategy, which didn't work from the S3 connector folder; I had to copy that jar into the /usr/share/java/kafka/ directory as well to fix it. I'm not sure why one works in the global folder and the other doesn't.