
Apache NiFi from unix timestamp to actual date not working


I have the following NiFi flow, in which I am struggling to generate a date from a Unix timestamp. I have not been able to find a solution since last year :(

First of all, I receive a file from a Kafka processor. The data arrives as text and looks as follows:

exclsns1,1671785280,1671785594,1671785608.

The next step is to use a ConvertRecord processor to generate a Parquet file from these incoming files. For that, I have defined the following schemas:

Record Reader --> CSV Reader:

{
  "type" : "record",
  "name" : "spark_schema",
  "fields" : [ {
    "name" : "excelReader",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "time",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "starttime",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "endtime",
    "type" : [ "null", "string" ],
    "default" : null
  } ]
}

Record Writer --> Parquet Record Set Writer

{
  "type" : "record",
  "name" : "spark_schema",
  "fields" : [ {
    "name" : "excelReader",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "time",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "starttime",
    "type": { "type":"int", "logicalType":"date"},
    "default" : null
  }, {
    "name" : "endtime",
    "type": { "type":"long", "logicalType":"timestamp-millis"},
    "default" : null
  } ]
}

Note that I have tried different types for the data, but none of them solved my issue. The next step is a PartitionRecord processor, in which I use a ParquetReader and the same Parquet Record Set Writer controller service. Besides that, I have defined 6 properties to help me identify why the data is not converted as expected:

a_endtime --> /endtime
a_endtime_converted --> format(/endtime, "yyyy/MM/dd/HH", "GMT")
a_startime --> /starttime
a_startime_converted --> format(/starttime, "yyyy/MM/dd/HH", "GMT")
a_time  --> /time
a_time_converted --> format(/time, "yyyy/MM/dd/HH", "GMT")

However, once the flowfile reaches the success queue after PartitionRecord, I have the following values:

a_endtime --> 1671785608
a_endtime_converted --> 1970/01/20/08
a_startime --> 1671785594
a_startime_converted --> 1970/01/20/08
a_time --> 1671785280
a_time_converted --> 1970/01/20/08

1671785608 = Friday, December 23, 2022 8:53:28 AM

1671785594 = Friday, December 23, 2022 8:53:14 AM

1671785280 = Friday, December 23, 2022 8:48:00 AM

What am I doing wrong, and why is the same date generated for every value? Has anybody else faced a similar issue who might give me a hint on how to solve it?

Thank you :)


Solution

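  • Unix time is counted in seconds since 1/1/1970.

    NiFi is based on Java, and Java time is counted in milliseconds since 1/1/1970. Read as milliseconds, 1671785608 is only about 19.3 days after the epoch, which is why every value formats as 1970/01/20/08.

    So you just have to multiply your value by 1000 before formatting it as a date.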
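To illustrate the seconds versus milliseconds mix-up outside of NiFi, here is a minimal plain-Java sketch. It is not NiFi code: the class `EpochDemo` and the method `formatMillis` are made-up names for this example, and it only assumes that the date formatting behaves like Java's standard `SimpleDateFormat` with the pattern and time zone from the question.

```java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

public class EpochDemo {
    // Hypothetical helper: formats a millisecond epoch value with the
    // pattern and time zone used in the question's format(...) calls.
    public static String formatMillis(long epochMillis) {
        SimpleDateFormat fmt = new SimpleDateFormat("yyyy/MM/dd/HH");
        fmt.setTimeZone(TimeZone.getTimeZone("GMT"));
        return fmt.format(new Date(epochMillis));
    }

    public static void main(String[] args) {
        long unixSeconds = 1671785608L; // endtime value from the question

        // Seconds misread as milliseconds: about 19.3 days after the epoch.
        System.out.println(formatMillis(unixSeconds));         // 1970/01/20/08

        // Seconds converted to milliseconds before formatting.
        System.out.println(formatMillis(unixSeconds * 1000L)); // 2022/12/23/08
    }
}
```

The three timestamps in the question differ by only a few hundred units, so when they are treated as milliseconds they all fall within the same second, which is why the converted values are identical.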