Search code examples
hadoopapache-kafkaapache-kafka-connectorc

Kafka HDFS Sink Connector error.[Top level type must be STRUCT..]


I'm testing the 2.7 version of Kafka with Kafka connect, and i'm facing with problem that i don't understand.

I started distributed connector first with configuration like below.

bootstrap.servers=..:9092,...:9092, ...
group.id=kafka-connect-test
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter

... some internal topic configuration

plugin.path=<plugin path>

This connector serviced with 8083 port.

And I want to write ORC format data with snappy codec at HDFS.
so i made new Kafka HDFS connector with REST API with json data like below. and i don't use schema-registry.

curl -X POST <connector url:8083> \
-H Accept: application/json \
-H Content-Type: application/json \
-d
{
    "name": "hdfs-sinkconnect-test",
    "config": {
        "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
        "store.url": "hdfs:~",
        "hadoop.conf.dir": "<my hadoop.conf dir>",
        "hadoop.home": "<hadoop home dir>",
        "tasks.max": "5",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
        "format.class": "io.confluent.connect.hdfs.orc.OrcFormat",
        "flush.size": 1000,
        "avro.codec": "snappy",
        "topics": "<topic name>",
        "topics.dir": "/tmp/connect-logs",
        "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
        "locale": "ko_KR",
        "timezone": "Asia/Seoul",
        "partition.duration.ms": "3600000",
        "path.format": "'hour'=YYYYMMddHH/"
    }
}

Then i have error message like this.

# connectDistributed.out

[2021-06-28 17:14:11,596] ERROR Exception on topic partition <topic name>-<partition number>:  (io.confluent.connect.hdfs.TopicPartitionWriter:409)
org.apache.kafka.connect.errors.ConnectException: Top level type must be STRUCT but was bytes
        at io.confluent.connect.hdfs.orc.OrcRecordWriterProvider$1.write(OrcRecordWriterProvider.java:98)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:742)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:385)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:333)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:126)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

I think this error message have something to do with Schema Information. Is Schema Registry essential for Kafka Connector?
Any ideas or solutions to solve this error messsage? Thanks.


Solution

  • Writing ORC files requires a Struct type.

    The options provided by Confluent include plain JSON, JSONSchema, Avro, or Protobuf. The only option that doesn't require the Registry is the plain JsonConverter

    Note that key.deserializer and value.deserializer are not valid Connect properties. You need to refer to your key.converter and value.converter properties instead

    If you're not willing to modify the converter, you can attempt to use a HoistField transformer to create a Struct, and this will create an ORC file with a schema of only one field