I have to work with time zones and nanosecond time resolution, so I use ZonedDateTime. Apparently Apache Flink does not serialize ZonedDateTime properly: it serializes the LocalDateTime part as expected, but it drops the time zone.
For example, when I log a zoned date-time inside a Flink stream map function, I always get something like
2018-03-01T04:10:30.773471918null
Whereas at the point where the data enters the stream, I get the proper zone:
2018-03-01T04:10:30.773471918-05:00
The null is where the zone should be. Later, of course, I get a NullPointerException, because I have to compare times properly, which requires the zone.
What is the easiest way to fix this? Thanks for any replies.
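To illustrate why the comparison depends on the zone, here is a minimal sketch using plain java.time. The object name `ZoneCompareSketch` and the second, UTC-based value are made up for illustration; only the timestamp string comes from the log output above.

```scala
import java.time.{ZoneId, ZonedDateTime}

object ZoneCompareSketch {
  // Timestamp from the log line above; the "-05:00" offset becomes the zone.
  val minusFive: ZonedDateTime =
    ZonedDateTime.parse("2018-03-01T04:10:30.773471918-05:00")

  // Hypothetical second value: same local date and time, but in UTC.
  val utc: ZonedDateTime = minusFive.withZoneSameLocal(ZoneId.of("UTC"))

  // isBefore compares instants on the time line, so the zone decides the result.
  val utcIsEarlier: Boolean = utc.isBefore(minusFive)

  def main(args: Array[String]): Unit = {
    println(minusFive.getZone)
    println(utcIsEarlier)
  }
}
```

Both values share the same local date and time, yet they are different instants, which is why a ZonedDateTime that lost its zone cannot be compared meaningfully.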
I do not fully understand why it does not pick up a suitable serializer, but this solution at least works: I implemented a Kryo serializer for ZonedDateTime.
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.markatta.timeforscala.ZonedDateTime
// Kryo serializer that writes the date, the time and the zone of a ZonedDateTime.
class ZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
  // ZonedDateTime is immutable, so Kryo does not need to copy instances.
  setImmutable(true)

  override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
    ZonedDateTimeSerializer.write(out, obj)
  }

  override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
    ZonedDateTimeSerializer.read(in)
  }
}

// LocalDateSerializer, LocalTimeSerializer and ZoneIdSerializer are helper
// objects that are not shown here; they must be on the classpath.
object ZonedDateTimeSerializer {
  def write(out: Output, obj: ZonedDateTime): Unit = {
    LocalDateSerializer.write(out, obj.toLocalDate)
    LocalTimeSerializer.write(out, obj.toLocalTime)
    ZoneIdSerializer.write(out, obj.getZone)
  }

  // Fields must be read in the same order in which they were written.
  def read(in: Input): ZonedDateTime = {
    val date = LocalDateSerializer.read(in)
    val time = LocalTimeSerializer.read(in)
    val zone = ZoneIdSerializer.read(in)
    ZonedDateTime(date, time, zone)
  }
}
I took the implementation from the newest version of Kryo. Then I registered it as follows:
env.getConfig.registerTypeWithKryoSerializer(classOf[ZonedDateTime], classOf[ZonedDateTimeSerializer])
This seems to fix the problem. I am not sure whether it is caused by my use of timeforscala, but I want to keep this library because it adds features I depend on. Comments welcome.
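For anyone who does not have the helper serializers at hand, a self-contained variant might look like the sketch below. It is not the Kryo implementation mentioned above: it stores the epoch second, the nanosecond part and the zone ID as a string, and it uses java.time.ZonedDateTime directly. The name `PlainZonedDateTimeSerializer` is hypothetical, and the `read` signature mirrors the one used above, so it may need adjusting for other Kryo versions.

```scala
import java.time.{Instant, ZoneId, ZonedDateTime}

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}

// Sketch only: serializes the instant plus the zone ID instead of date, time and zone.
class PlainZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
  setImmutable(true)

  override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit =
    PlainZonedDateTimeSerializer.write(out, obj)

  override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime =
    PlainZonedDateTimeSerializer.read(in)
}

object PlainZonedDateTimeSerializer {
  def write(out: Output, obj: ZonedDateTime): Unit = {
    out.writeLong(obj.toEpochSecond)   // seconds since 1970-01-01T00:00:00Z
    out.writeInt(obj.getNano)          // nanosecond part, 0 to 999,999,999
    out.writeString(obj.getZone.getId) // e.g. "-05:00" or "Europe/Zurich"
  }

  // Reads the fields in the same order in which write emitted them.
  def read(in: Input): ZonedDateTime = {
    val seconds = in.readLong()
    val nanos = in.readInt()
    val zone = ZoneId.of(in.readString())
    ZonedDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanos.toLong), zone)
  }
}
```

A quick round trip outside Flink, for example writing into `new Output(64)` and reading back from `new Input(out.toBytes)`, is a cheap way to check that the zone survives before registering the class with `registerTypeWithKryoSerializer`.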