Search code examples
mongodbapache-kafka-connectmongodb-kafka-connector

Using mongo-kafka as sink connector, how do I set a field's value to be of Date type?


I have a mongo sink connector as well as a schema registry.

I configured the mongo sink connector to access the schema registry similar to: https://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md#configuration-example-for-avro

I created a schema following this: https://github.com/mongodb/mongo-kafka/blob/master/docs/sink.md#logical-types . It looks something like this:

{
  "type": "record",
  "name": "MyLogicalTypesRecord",
  "namespace": "com.mongodb.kafka.data.kafka.avro",
  "fields": [
    {
      "name": "myTimestampMillisField",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}

However, when the records go through, the data looks like this: { "myTimestampMillisField": 1572035138104 }, instead of something similar to this { "myTimestampMillisField": ISODate("2019-10-25T20:28:19.628Z") }.

I checked the schema registry to make sure the logical type is there and that looks good.

I'm not sure what I'm doing wrong or if there is a better way to set as Date type in mongo. Any ideas?


Solution

  • I figured it out.

    https://docs.confluent.io/current/connect/transforms/timestampconverter.html

    "transforms": "TimestampConverter",
    "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    "transforms.TimestampConverter.field": "myTimestampMillisField",
    "transforms.TimestampConverter.target.type": "Date"