amazon-s3 · apache-kafka · apache-kafka-connect · confluent-schema-registry · s3-kafka-connector

How to extract a nested field from an Envelope-type schema in the S3 sink connector


Avro schema:

{
  "type": "record",
  "name": "Envelope",
  "namespace": "test",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "id",
              "type": {
                "type": "long",
                "connect.default": 0
              },
              "default": 0
            },
            {
              "name": "createdAt",
              "type": [
                "null",
                {
                  "type": "string",
                  "connect.version": 1,
                  "connect.name": "io.debezium.time.ZonedTimestamp"
                }
              ],
              "default": null
            }
          ],
          "connect.name": "test.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    }
  ],
  "connect.name": "test.Envelope"
}

S3 sink connector configuration:

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "behavior.on.null.values": "ignore",
  "s3.region": "us-west-2",
  "flush.size": "1",
  "tasks.max": "3",
  "timezone": "UTC",
  "locale": "US",
  "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
  "aws.access.key.id": "---",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter",
  "s3.bucket.name": "test-s3-sink-created-at-partition",
  "partition.duration.ms": "1000",
  "topics.regex": "test_topic",
  "aws.secret.access.key": "---",
  "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "value.converter.schemas.enable": "false",
  "name": "s3-sink-created-at-partition",
  "errors.tolerance": "all",
  "storage.class": "io.confluent.connect.s3.storage.S3Storage",
  "path.format": "YYYY/MM/dd",
  "timestamp.extractor": "RecordField",
  "timestamp.field": "createdAt"
}

Error:

    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:591)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:239)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: The field 'createdAt' does not exist in
    Caused by: org.apache.kafka.connect.errors.DataException: Unable to find nested field 'createdAt'

Problem I'm facing:

I am trying to sink data from test_topic to an S3 bucket with the above connector, partitioning by the createdAt field, but it continuously throws the error above for the createdAt field, and the S3 bucket is not created with this configuration. Please share your suggestions.


Solution

  • You should be able to use after.Value.createdAt as the timestamp.field - See my PR
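
    For example, keeping the rest of your sink config the same, the relevant keys would change to something like this (a sketch based on that nested-path syntax, not verified against your Connect version):

        "timestamp.extractor": "RecordField",
        "timestamp.field": "after.Value.createdAt"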

    But the better option is to unwrap the Debezium envelope before the record reaches the sink, so createdAt becomes a top-level field.
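
    With Debezium, that is typically done with the ExtractNewRecordState single message transform on the sink connector (a sketch; it assumes the Debezium jars are available on the Connect worker's plugin path):

        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "timestamp.extractor": "RecordField",
        "timestamp.field": "createdAt"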

    s3-bucket is not created using above configuration

    You need to create the bucket ahead of time.
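
    For example, with the AWS CLI (the bucket name and region are taken from your config; outside us-east-1 the LocationConstraint is required):

        aws s3api create-bucket \
          --bucket test-s3-sink-created-at-partition \
          --region us-west-2 \
          --create-bucket-configuration LocationConstraint=us-west-2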