apache-kafka, apache-kafka-connect, confluent-platform, s3-kafka-connector

Can Kafka sink connectors include the record timestamp in the payload written to the target storage?


I'm using both the S3 and JDBC sink connectors and I'm seeing some odd behavior in the data being stored. For reconciliation purposes I would really like to keep either the Kafka ingestion time or the record production time in the data that is stored in the sink system.

I looked through the documentation and did not find anything about this. I'm using the Confluent connectors, but I could also use other connectors such as Camel if that would allow me to do this.

Can someone give me some pointers on this?

UPDATE: Based on the good feedback from OneCricketeer, I understand I should be looking at this: https://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield

And I also saw this example: Kafka connect consumer referencing offset and storing in message

I will test it, but do I understand correctly that, in theory, I could do something like this:

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"

And this would add three new fields to the record, called recordOffset, recordPartition, and recordTimestamp, containing the values described.

And if I wanted to ensure that the values are always present, or else fail, would I need to do the following (I'm not sure I understood the suffix part)?

"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"!transforms.InsertField.offset.field": "recordOffset"
"!transforms.InsertField.partition.field": "recordPartition"
"!transforms.InsertField.timestamp.field": "recordTimestamp"

Solution

  • As @OneCricketeer says, the InsertField Single Message Transform does the job here. Here's a sample S3 sink configuration using it:

    {
        "connector.class"        : "io.confluent.connect.s3.S3SinkConnector",
        "storage.class"          : "io.confluent.connect.s3.storage.S3Storage",
        "s3.region"              : "us-west-2",
        "s3.bucket.name"         : "rmoff-smt-demo-01",
        "topics"                 : "customers,transactions",
        "tasks.max"              : "4",
        "flush.size"             : "16",
        "format.class"           : "io.confluent.connect.s3.format.json.JsonFormat",
        "schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
        "schema.compatibility"   : "NONE",
        "partitioner.class"      : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
        "transforms"                          : "insertTS,formatTS",
        "transforms.insertTS.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.insertTS.timestamp.field" : "messageTS",
        "transforms.formatTS.type"            : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.formatTS.format"          : "yyyy-MM-dd HH:mm:ss:SSS",
        "transforms.formatTS.field"           : "messageTS",
        "transforms.formatTS.target.type"     : "string"
    }
    

    Note that it also uses the TimestampConverter SMT to format the timestamp as a string - by default it's written as epoch milliseconds.
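
    With that in place, each JSON object written to S3 carries the extra field alongside the original payload. As a rough illustration (the customer fields here are made up; only messageTS comes from the transforms):

        {
          "id"         : 42,
          "first_name" : "Alice",
          "messageTS"  : "2020-05-12 14:03:27:891"
        }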

    Your question prompted me to write this up properly and record a little tutorial - you can see it here: https://youtu.be/3Gj_SoyuTYk
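
    Since Single Message Transforms are applied by the Connect framework itself, the same insertTS/formatTS pair should work unchanged with the JDBC sink you mentioned. A minimal sketch, assuming a local Postgres database (the connection details and topic name are placeholders):

        {
          "connector.class"                     : "io.confluent.connect.jdbc.JdbcSinkConnector",
          "connection.url"                      : "jdbc:postgresql://localhost:5432/demo",
          "connection.user"                     : "postgres",
          "connection.password"                 : "postgres",
          "topics"                              : "transactions",
          "auto.create"                         : "true",
          "insert.mode"                         : "insert",
          "transforms"                          : "insertTS,formatTS",
          "transforms.insertTS.type"            : "org.apache.kafka.connect.transforms.InsertField$Value",
          "transforms.insertTS.timestamp.field" : "messageTS",
          "transforms.formatTS.type"            : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
          "transforms.formatTS.format"          : "yyyy-MM-dd HH:mm:ss:SSS",
          "transforms.formatTS.field"           : "messageTS",
          "transforms.formatTS.target.type"     : "string"
        }

    For a database target you could also drop the formatTS step and let the timestamp land as a native timestamp column instead of a string, depending on how you want to run your reconciliation queries.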