Avro write java.sql.Timestamp conversion error


I need to write a timestamp to a Kafka partition and then read it back. I have defined the following Avro schema for it:

{ "namespace":"sample",
  "type":"record",
  "name":"TestData",
  "fields":[
    {"name": "update_database_time", "type": "long", "logicalType": "timestamp-millis"}
  ]
}

However, I get a conversion error on the producer.send line:

java.lang.ClassCastException: java.sql.Timestamp cannot be cast to java.lang.Long

How can I fix this?

Here is the code that writes the timestamp to Kafka:

    val tmstpOffset = testDataDF
      .select("update_database_time")
      .orderBy(desc("update_database_time"))
      .head()
      .getTimestamp(0)

    val avroRecord = new GenericData.Record(parseAvroSchemaFromFile("/avro-offset-schema.json"))
    avroRecord.put("update_database_time", tmstpOffset)

    val producer = new KafkaProducer[String, GenericRecord](kafkaParams().asJava)
    val data = new ProducerRecord[String, GenericRecord]("app_state_test7", avroRecord)
    producer.send(data)

Solution

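  • Avro has no native timestamp type; timestamp-millis is a logical type layered on a long that holds milliseconds since the Unix epoch. The record field therefore has to be populated with a long, not a java.sql.Timestamp. Convert the column as shown below: unix_timestamp() returns seconds since the epoch, so the result is multiplied by 1000 to get milliseconds. If the column is a string in a specific date format, use the overloaded unix_timestamp(col, format) function instead.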

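    import org.apache.spark.sql.functions._
    // unix_timestamp takes a Column and returns seconds since the epoch (LongType)
    val tmstpOffset = testDataDF
          .select((unix_timestamp(col("update_database_time")) * 1000).as("update_database_time"))
          .orderBy(desc("update_database_time"))
          .head()
          .getLong(0) // the column is now a long, so read it as one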
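
Since unix_timestamp() works in whole seconds, the sub-second part of the timestamp is lost in the conversion above. If that matters, one alternative is to keep the original .getTimestamp(0) call and convert on the driver with java.sql.Timestamp.getTime, which returns milliseconds since the epoch. The following is only a minimal sketch of that idea: the object name TimestampMillis, the helpers toEpochMillis and fromEpochMillis, and the sample value are hypothetical and not part of the question's code.

```scala
import java.sql.Timestamp

object TimestampMillis {
  // Milliseconds since the Unix epoch, which is what a timestamp-millis field stores
  def toEpochMillis(ts: Timestamp): Long = ts.getTime

  // Reading side: rebuild a Timestamp from the long stored in the record
  def fromEpochMillis(millis: Long): Timestamp = new Timestamp(millis)

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for the value returned by .getTimestamp(0)
    val tmstpOffset = Timestamp.valueOf("2020-01-01 12:34:56.789")
    val millis = toEpochMillis(tmstpOffset)

    // In the question's code this value would be passed to
    // avroRecord.put("update_database_time", millis)
    assert(fromEpochMillis(millis) == tmstpOffset) // millisecond precision survives the round trip
    println(millis)
  }
}
```

With this approach the Avro record receives a Long, so the ClassCastException should no longer occur. Separately, the Avro specification places the logicalType attribute inside the type object, i.e. "type": {"type": "long", "logicalType": "timestamp-millis"}; written at field level as in the question, it is treated as an ordinary field property and the field is a plain long, which still works for storing the value.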