
Flink Serialization of ZonedDateTime


I have to work with time zones and nanosecond time resolution, so I use ZonedDateTime. Apparently Apache Flink does not serialize ZonedDateTime properly: it serializes the LocalDateTime part as expected, but it drops the time zone.

For example, when I log a zoned date inside a Flink stream map function, I always get something like

 2018-03-01T04:10:30.773471918null

At data ingestion, by contrast, I get the proper zone:

 2018-03-01T04:10:30.773471918-05:00

The null is where the zone should be. Later, of course, I get a NullPointerException, because I need a proper time comparison, which requires the zone.
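To illustrate why the comparison needs the zone, here is a minimal sketch using only java.time (no Flink involved). It shows that `isEqual`/`isBefore` compare instants, which depend on each value's offset, while comparing only the zone-less local parts gives a misleading order. The object name `ZonedComparison` is purely illustrative.

```scala
import java.time.{ZoneOffset, ZonedDateTime}

object ZonedComparison {
  // Parsed from the zone-aware string shown above (offset -05:00).
  val eastern: ZonedDateTime = ZonedDateTime.parse("2018-03-01T04:10:30.773471918-05:00")

  // The same instant expressed in UTC: the local fields differ, the instant does not.
  val utc: ZonedDateTime = eastern.withZoneSameInstant(ZoneOffset.UTC)

  // isEqual (like isBefore/isAfter) compares instants, which needs each value's offset.
  val sameInstant: Boolean = eastern.isEqual(utc)

  // equals also compares local date-time, offset and zone, so it is false here.
  val sameValue: Boolean = eastern == utc

  // Comparing only the local parts is misleading: 09:10 looks later than 04:10.
  val laterLocalClock: Boolean = utc.toLocalDateTime.isAfter(eastern.toLocalDateTime)

  def main(args: Array[String]): Unit =
    println(s"$utc sameInstant=$sameInstant sameValue=$sameValue laterLocalClock=$laterLocalClock")
}
```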

What is the easiest way to fix this? Thanks for any reply.


Solution

  • I do not fully understand why Flink does not pick up a suitable serializer, but this solution at least works: I implemented a Kryo serializer for ZonedDateTime.

    import com.esotericsoftware.kryo.io.{Input, Output}
    import com.esotericsoftware.kryo.{Kryo, Serializer}
    import com.markatta.timeforscala.ZonedDateTime
    
    class ZonedDateTimeSerializer extends Serializer[ZonedDateTime] {
      setImmutable(true)
    
      override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
        ZonedDateTimeSerializer.write(out, obj)
      }
    
      override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
        ZonedDateTimeSerializer.read(in)
      }
    }
    
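    // LocalDateSerializer, LocalTimeSerializer and ZoneIdSerializer (not shown) are adapted
    // from the java.time serializers in newer Kryo releases (TimeSerializers).
    object ZonedDateTimeSerializer {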
      def write(out: Output, obj: ZonedDateTime): Unit = {
        LocalDateSerializer.write(out, obj.toLocalDate)
        LocalTimeSerializer.write(out, obj.toLocalTime)
        ZoneIdSerializer.write(out, obj.getZone)
      }
    
      def read(in: Input): ZonedDateTime = {
        val date = LocalDateSerializer.read(in)
        val time = LocalTimeSerializer.read(in)
        val zone = ZoneIdSerializer.read(in)
        ZonedDateTime(date, time, zone)
      }
    }
    

    I took the implementation from the newest Kryo release. Then I registered it as follows:

        env.getConfig.registerTypeWithKryoSerializer(classOf[ZonedDateTime], classOf[ZonedDateTimeSerializer])
    

    This seems to fix the problem. I am not sure whether the issue comes from my use of timeforscala, but I want to keep using that library because it provides additions I depend on. Comments welcome.
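The serializer in the answer relies on helper objects copied from Kryo. As an alternative, here is a self-contained sketch that targets `java.time.ZonedDateTime` directly and writes the instant (epoch seconds plus nanoseconds) and the zone ID instead of the local date, local time and zone. Assumptions: the names `InstantZoneSerializer` and `InstantZoneSerializerDemo` are hypothetical; it uses the Kryo 2.x to 4.x `Serializer` API (`read` takes `Class[T]`), like the code above; and whether it also covers timeforscala's `ZonedDateTime` depends on that type being an alias for the java.time class, which I have not verified.

```scala
import java.time.{Instant, ZoneId, ZonedDateTime}

import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.{Kryo, Serializer}

// Hypothetical serializer for java.time.ZonedDateTime (Kryo 2.x-4.x API assumed).
class InstantZoneSerializer extends Serializer[ZonedDateTime] {
  // ZonedDateTime is immutable, so Kryo does not need to copy it.
  setImmutable(true)

  override def write(kryo: Kryo, out: Output, obj: ZonedDateTime): Unit = {
    out.writeLong(obj.toEpochSecond)   // whole seconds since 1970-01-01T00:00Z
    out.writeInt(obj.getNano, true)    // nanosecond part, as a varint for non-negative values
    out.writeString(obj.getZone.getId) // e.g. "-05:00" or "America/New_York"
  }

  override def read(kryo: Kryo, in: Input, `type`: Class[ZonedDateTime]): ZonedDateTime = {
    val seconds = in.readLong()
    val nanos = in.readInt(true)
    val zone = ZoneId.of(in.readString())
    // The instant plus the zone rules determine the offset, so the result equals the input.
    ZonedDateTime.ofInstant(Instant.ofEpochSecond(seconds, nanos), zone)
  }
}

// Hypothetical helper: serialize and deserialize one value with a fresh Kryo instance.
object InstantZoneSerializerDemo {
  def roundTrip(value: ZonedDateTime): ZonedDateTime = {
    val kryo = new Kryo()
    kryo.register(classOf[ZonedDateTime], new InstantZoneSerializer)
    val output = new Output(256)
    kryo.writeObject(output, value)
    kryo.readObject(new Input(output.toBytes), classOf[ZonedDateTime])
  }

  def main(args: Array[String]): Unit = {
    val original = ZonedDateTime.parse("2018-03-01T04:10:30.773471918-05:00")
    println(roundTrip(original))
  }
}
```

Why the instant rather than local fields: java.time's `ZonedDateTime.of(date, time, zone)` resolves a local time that falls in a daylight-saving overlap to the earlier offset, so a value such as `2018-11-04T01:30-05:00[America/New_York]` would come back one hour off if rebuilt that way. An instant plus a zone ID always maps back to the same offset. Registration in Flink would follow the same pattern as above, e.g. `env.getConfig.registerTypeWithKryoSerializer(classOf[java.time.ZonedDateTime], classOf[InstantZoneSerializer])`.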