Tags: apache-kafka, google-bigquery, avro, apache-kafka-connect, confluent-schema-registry

How to convert a microsecond timestamp to milliseconds in Kafka Connect?


While populating BigQuery from Kafka using confluentinc/kafka-connect-bigquery, I'd like to use the timestamp of our domain (Avro) events to (day-)partition the tables.

My connector.properties looks as follows:

[...]
transforms=ConvertTimestamp
transforms.ConvertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
transforms.ConvertTimestamp.field=metadata_date
transforms.ConvertTimestamp.target.type=Timestamp

timestampPartitionFieldName=metadata_date
[...]
Exception in thread "pool-5-thread-522" com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException: table insertion failed for the following rows:
    [row index 0]: invalid: Timestamp field value is out of range:1597279521000000000
    [row index 1]: invalid: Timestamp field value is out of range:1597279523000000000
[...]

The issue seems to be that our timestamps are in microseconds (UTC Unix epoch):

"type": {
    "type": "long",
    "logicalType": "timestamp-micros"
}

while BigQuery wants milliseconds (or seconds?).
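
To illustrate the mismatch, here is a minimal plain-Java sketch (the field value is assumed from the error message above, not an actual record): interpreted as microseconds the value is a plausible 2020 date, but read as milliseconds it lands tens of thousands of years in the future, which is why BigQuery rejects it as out of range.

import java.time.Instant;

public class PrecisionMismatch {
    public static void main(String[] args) {
        // Assumed metadata_date value (timestamp-micros), derived from the error message.
        long micros = 1_597_279_521_000_000L;

        // Interpreted correctly as microseconds since the Unix epoch:
        System.out.println(Instant.EPOCH.plusNanos(micros * 1_000L)); // 2020-08-13T00:45:21Z

        // Read as milliseconds instead:
        System.out.println(Instant.ofEpochMilli(micros)); // a date around year 52,000 -> out of range for BigQuery
    }
}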

Is there a way to transform the timestamp directly using the connector?


Solution

  • This appears to have been addressed now. You can use the additional transform parameter unix.precision to ensure that Avro fields of type timestamp-micros are parsed correctly:

    transforms=ConvertTimestamp
    transforms.ConvertTimestamp.type=org.apache.kafka.connect.transforms.TimestampConverter$Value
    transforms.ConvertTimestamp.field=metadata_date
    transforms.ConvertTimestamp.target.type=Timestamp
    transforms.ConvertTimestamp.unix.precision=microseconds
    

    It appears that the Confluent documentation has not been updated to reflect this yet, but the corresponding proposal document (KIP-808) sheds some more light on the matter.
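
    For reference, here is a rough plain-Java sketch of what the conversion amounts to with unix.precision=microseconds (using an assumed field value; this is not the connector's actual implementation): the long is read as microseconds and ends up as a Connect Timestamp, i.e. a java.util.Date with millisecond precision, which the BigQuery sink can then use for timestampPartitionFieldName.

        import java.util.Date;
        import java.util.concurrent.TimeUnit;

        public class UnixPrecisionSketch {
            public static void main(String[] args) {
                // Assumed raw metadata_date value (Avro long, logicalType timestamp-micros).
                long micros = 1_597_279_521_000_000L;

                // unix.precision=microseconds: treat the long as microseconds and truncate to milliseconds.
                Date timestamp = new Date(TimeUnit.MICROSECONDS.toMillis(micros));
                System.out.println(timestamp.toInstant()); // 2020-08-13T00:45:21Z
            }
        }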