I'm using both the S3 and JDBC sink connectors and I'm seeing some odd behavior in how my data is stored. For reconciliation purposes I would really like to keep either the Kafka ingestion time or the record production time in the data that is stored in the sink system.
I looked in the documentation but did not find anything about this. I'm using the Confluent connectors, but I could also use other connectors such as Camel if that would allow me to do it.
Can someone give me some pointers on this?
UPDATE: Based on the good feedback from onecricketeer I understood I should be looking at this: https://docs.confluent.io/5.5.0/connect/transforms/insertfield.html#insertfield
And I also saw this example: Kafka connect consumer referencing offset and storing in message
I will test it, but do I understand correctly that in theory I could do something like this:
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset"
"transforms.InsertField.partition.field": "recordPartition"
"transforms.InsertField.timestamp.field": "recordTimestamp"
And this would create three new fields in the record, called recordOffset, recordPartition and recordTimestamp, containing the values described?
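If so, then a record whose value is just {"name": "Alice"} would, if I understand it right, end up in the sink looking something like this (the offset, partition and timestamp values here are purely hypothetical):
{
  "name": "Alice",
  "recordOffset": 1042,
  "recordPartition": 0,
  "recordTimestamp": 1618574400000
}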
And if I wanted to ensure that the values are always present (or have the transform fail otherwise), I would suffix the field names with ! (not sure I understood the suffix part correctly):
"transforms": "InsertField",
"transforms.InsertField.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertField.offset.field": "recordOffset!",
"transforms.InsertField.partition.field": "recordPartition!",
"transforms.InsertField.timestamp.field": "recordTimestamp!"
As @OneCricketeer says, the InsertField Single Message Transform does the job here. Here's a sample S3 sink configuration using it:
{
"connector.class" : "io.confluent.connect.s3.S3SinkConnector",
"storage.class" : "io.confluent.connect.s3.storage.S3Storage",
"s3.region" : "us-west-2",
"s3.bucket.name" : "rmoff-smt-demo-01",
"topics" : "customers,transactions",
"tasks.max" : "4",
"flush.size" : "16",
"format.class" : "io.confluent.connect.s3.format.json.JsonFormat",
"schema.generator.class" : "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"schema.compatibility" : "NONE",
"partitioner.class" : "io.confluent.connect.storage.partitioner.DefaultPartitioner",
"transforms" : "insertTS,formatTS",
"transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.insertTS.timestamp.field" : "messageTS",
"transforms.formatTS.type" : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
"transforms.formatTS.format" : "yyyy-MM-dd HH:mm:ss:SSS",
"transforms.formatTS.field" : "messageTS",
"transforms.formatTS.target.type" : "string"
}
Note that it also uses TimestampConverter to format the timestamp as a string - by default it is written as epoch milliseconds.
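To make that concrete (the record timestamp here is hypothetical), the messageTS field goes from
"messageTS": 1618574400000
to
"messageTS": "2021-04-16 12:00:00:000"
(the string is formatted in UTC).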
Your question prompted me to write this up properly and record a little tutorial - you can see it here: https://youtu.be/3Gj_SoyuTYk
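Since you mention the JDBC sink as well: the same transforms stanza works there unchanged, because Single Message Transforms are applied by the Connect framework before the record ever reaches the connector. Here's a minimal sketch (the connection details, topic and field name are placeholders, not taken from your setup); with the JDBC sink you can typically skip the TimestampConverter step and keep the field as a timestamp so it maps onto a TIMESTAMP column:
{
  "connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
  "connection.url" : "jdbc:postgresql://localhost:5432/demo",
  "connection.user" : "postgres",
  "connection.password" : "********",
  "topics" : "customers",
  "auto.create" : "true",
  "insert.mode" : "insert",
  "transforms" : "insertTS",
  "transforms.insertTS.type" : "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.insertTS.timestamp.field" : "messageTS"
}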