I used Kafka Connect to stream a table from MySQL to a Kafka topic. The table contains some columns of type datetime(6), which arrive in the topic as values like 1611290740285818.
When I converted this timestamp to a string in ksqlDB with:
SELECT TIMESTAMPTOSTRING(my_timestamp, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') AS DT6
FROM my_topic
EMIT CHANGES;
The displayed string was +53114-10-20 14:12:20.712000, while the actual time was supposed to be 2021-01-22 04:45:40.285818.
Can anyone advise what was wrong with my query?
@Aydin is correct in their answer. The bigint value you shared is in microseconds, but ksqlDB's TIMESTAMPTOSTRING function expects milliseconds since the Unix epoch. The format string you specified only tells ksqlDB how to format the timestamp, not how to interpret it. Here's an example:
-- Create a sample stream
ksql> CREATE STREAM TMP (TS BIGINT) WITH (KAFKA_TOPIC='TMP', PARTITIONS=1, VALUE_FORMAT='AVRO');
Message
----------------
Stream created
----------------
-- Populate it with example data
ksql> INSERT INTO TMP VALUES (1611290740285818);
-- Query the stream from the beginning
ksql> SET 'auto.offset.reset' = 'earliest';
>
Successfully changed local property 'auto.offset.reset' from 'earliest' to 'earliest'.
-- Reproduce the described behaviour
ksql> SELECT TS, TIMESTAMPTOSTRING(TS, 'yyyy-MM-dd HH:mm:ss.SSSSSS','UTC') FROM TMP EMIT CHANGES;
+--------------------+------------------------------+
|TS |KSQL_COL_0 |
+--------------------+------------------------------+
|1611290740285818 |+53029-10-09 09:11:25.818000 |
^CQuery terminated
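Dividing the microsecond value by 1000 converts it to milliseconds, and the function then behaves as you would expect. Note that the integer division drops the last three digits, so the output has millisecond rather than microsecond precision: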
ksql> SELECT TS,
TIMESTAMPTOSTRING(TS/1000, 'yyyy-MM-dd HH:mm:ss.SSS','UTC')
FROM TMP
EMIT CHANGES;
+------------------+-------------------------+
|TS |KSQL_COL_0 |
+------------------+-------------------------+
|1611290740285818 |2021-01-22 04:45:40.285 |
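To double-check the arithmetic outside ksqlDB, here is a minimal Python sketch. It is not part of ksqlDB: the helper names are made up for illustration, and the year estimate uses an average year length, so it is only approximate. It shows the same value read as microseconds (correct), truncated to milliseconds (what TS/1000 passes to the function), and roughly which year you land in if the raw value is misread as milliseconds.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Average Gregorian year in seconds; used only for a rough estimate.
SECONDS_PER_YEAR = 31_556_952


def micros_to_millis(ts_micros: int) -> int:
    """Integer division, mirroring TS/1000 on a BIGINT column."""
    return ts_micros // 1000


def micros_to_utc_string(ts_micros: int) -> str:
    """Format a microsecond epoch value as UTC with full microsecond precision."""
    moment = EPOCH + timedelta(microseconds=ts_micros)
    return moment.strftime("%Y-%m-%d %H:%M:%S.%f")


def approx_year_if_read_as_millis(ts: int) -> int:
    """Rough calendar year if ts is (mis)interpreted as milliseconds since the epoch."""
    return 1970 + (ts // 1000) // SECONDS_PER_YEAR


ts = 1611290740285818  # the value from the question

print(micros_to_millis(ts))               # 1611290740285
print(micros_to_utc_string(ts))           # 2021-01-22 04:45:40.285818
print(approx_year_if_read_as_millis(ts))  # 53029
```

The last line matches the year in the reproduced output above, which confirms that the far-future date comes from the unit mismatch rather than from the format string.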