Tags: json, scala, performance, serialization, apache-flink

Fast JSON deserialization with a Map as a field


I want to deserialize an Array[Byte] containing JSON into the Event class described below, and I want to do it as fast as possible. The Event class has a couple of Int/String fields, which shouldn't be a problem. The entire content of the JSON byte array must be transformed into a Map. The keys of the JSON object/Map are not known in advance, and there are no nested JSON objects or arrays.

E.g. JSON: {"someKey1": "someval", "someInt": 34, "anotherKey": 56} -> Map
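Because the objects are flat (only string, number, boolean or null values), one option besides a general-purpose library is to scan the bytes straight into an immutable Map. The following is a minimal sketch under exactly that assumption; `FlatJson` and its internals are hypothetical names, not part of Play JSON, Jackson or jsoniter, and it has not been benchmarked against them. Integers come back as `Long`, numbers with a fraction or exponent as `Double`, and validation is deliberately light (for example, a leading `+` is accepted).

```scala
import java.nio.charset.StandardCharsets

// Hypothetical hand-written parser for FLAT JSON objects only: values may be
// strings, numbers, true/false or null. Nested objects/arrays are rejected.
object FlatJson {
  def parse(bytes: Array[Byte]): Map[String, Any] =
    new Parser(new String(bytes, StandardCharsets.UTF_8)).parseObject()

  private final class Parser(s: String) {
    private var i = 0 // current read position in s

    private def fail(msg: String): Nothing =
      throw new IllegalArgumentException(s"$msg at offset $i")

    private def skipWs(): Unit =
      while (i < s.length && " \t\r\n".indexOf(s.charAt(i)) >= 0) i += 1

    private def expect(c: Char): Unit = {
      skipWs()
      if (i >= s.length || s.charAt(i) != c) fail(s"expected '$c'")
      i += 1
    }

    def parseObject(): Map[String, Any] = {
      val fields = Map.newBuilder[String, Any]
      expect('{')
      skipWs()
      if (i < s.length && s.charAt(i) == '}') i += 1 // empty object
      else {
        var more = true
        while (more) {
          skipWs()
          val key = parseString()
          expect(':')
          fields += key -> parseValue()
          skipWs()
          if (i < s.length && s.charAt(i) == ',') i += 1
          else { expect('}'); more = false }
        }
      }
      skipWs()
      if (i != s.length) fail("trailing data")
      fields.result()
    }

    // Reads a quoted string, decoding the standard JSON escapes.
    private def parseString(): String = {
      if (i >= s.length || s.charAt(i) != '"') fail("expected string")
      i += 1
      val sb = new java.lang.StringBuilder
      while (i < s.length && s.charAt(i) != '"') {
        val c = s.charAt(i)
        if (c == '\\') {
          if (i + 1 >= s.length) fail("bad escape")
          s.charAt(i + 1) match {
            case 'n' => sb.append('\n')
            case 't' => sb.append('\t')
            case 'r' => sb.append('\r')
            case 'b' => sb.append('\b')
            case 'f' => sb.append('\f')
            case 'u' if i + 5 < s.length =>
              sb.append(Integer.parseInt(s.substring(i + 2, i + 6), 16).toChar); i += 4
            case e @ ('"' | '\\' | '/') => sb.append(e)
            case _ => fail("bad escape")
          }
          i += 2
        } else { sb.append(c); i += 1 }
      }
      if (i >= s.length) fail("unterminated string")
      i += 1 // skip closing quote
      sb.toString
    }

    private def parseValue(): Any = {
      skipWs()
      if (i >= s.length) fail("expected value")
      s.charAt(i) match {
        case '"'       => parseString()
        case 't'       => literal("true", true)
        case 'f'       => literal("false", false)
        case 'n'       => literal("null", null)
        case '{' | '[' => fail("nested objects/arrays are not supported")
        case _         => parseNumber()
      }
    }

    private def literal(word: String, value: Any): Any =
      if (s.startsWith(word, i)) { i += word.length; value } else fail(s"expected $word")

    private def parseNumber(): Any = {
      val start = i
      while (i < s.length && "+-0123456789.eE".indexOf(s.charAt(i)) >= 0) i += 1
      val text = s.substring(start, i)
      if (text.isEmpty) fail("expected value")
      val fractional = text.exists(c => c == '.' || c == 'e' || c == 'E')
      // Ascribe both branches to Any so the Long is not widened to Double.
      if (fractional) (text.toDouble: Any) else (text.toLong: Any)
    }
  }
}
```

Applied to the example above, `FlatJson.parse` returns a Map equal to `Map("someKey1" -> "someval", "someInt" -> 34L, "anotherKey" -> 56L)`.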

Here's what I've done so far:

import play.api.libs.json.{JsObject, Json}

...

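  override def line2Event(line: Array[Byte], id: Int): Event = {
    // Parse the bytes into a JsObject and copy its fields into an immutable Map[String, JsValue]
    val map = Json.parse(line).as[JsObject].value.toMap
    // Use the "timestamp" field if present, otherwise fall back to the id
    val timestamp = map.getOrElse("timestamp", id).toString.toLong
    new Event(id, "GenericJson", timestamp, map)
  }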


class Event(
    val id: Int,
    val eventType: String,
    val timestamp: Long,
    extraArgs: Map[String, Any]
)
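A caveat on the `timestamp` line: `toString.toLong` works when the value is a JSON number (Play renders `JsNumber(34)` as `34`), but a timestamp sent as a JSON string would be rendered with its surrounding quotes and the conversion would throw. Below is a minimal, stdlib-only sketch of a more tolerant lookup, assuming the Map holds plain Scala/Java values (as with the Jackson-based map in the Solution section) rather than Play `JsValue`s; `Timestamps.timestampOf` is a hypothetical helper name.

```scala
// Hypothetical helper: read "timestamp" from a Map[String, Any] whose values are
// plain Scala/Java values (e.g. boxed Int/Long/Double or String), falling back
// to `default` when the key is missing or its value is null.
object Timestamps {
  def timestampOf(fields: Map[String, Any], default: Long): Long =
    fields.get("timestamp") match {
      case Some(n: java.lang.Number) => n.longValue()  // boxed Int, Long, Double, BigDecimal...
      case Some(s: String)           => s.trim.toLong  // numeric string such as "1616000000"
      case Some(null) | None         => default
      case Some(other) =>
        throw new IllegalArgumentException(s"unsupported timestamp value: $other")
    }
}
```

With this helper, the line in `line2Event` would become something like `Timestamps.timestampOf(map, id)`, where `map` holds plain values.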

With this Play JSON implementation, throughput on my dataset is about 25K messages/second, and I would like to improve it. Would using another library, such as jsoniter, help?

In my case, however, I don't have a class with specific fields (i.e., all the key-value pairs of the JSON are stored in the Map field of the Event). Does this library, or another one, offer a parse method like the one I use, but faster?


Solution

  • I've also tried parsing with Jackson:

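      import com.fasterxml.jackson.databind.json.JsonMapper
      import com.fasterxml.jackson.module.scala.MapModule

      // MapModule adds Jackson support for Scala Map types
      lazy val module = new MapModule {}
      lazy val mapper = JsonMapper.builder()
        .addModule(module)
        .build()
      ...
      val map: Map[String, Any] = mapper.readValue(line, classOf[Map[String, Any]])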
    

    Note that, because I needed an immutable Scala Map, I had also registered DefaultScalaModule.

    Unfortunately, since this class is used inside Flink, it needs to be serializable, and DefaultScalaModule is not (ObjectMapper is). A similar problem arose with other libraries (such as jsoniter). What should I do? Should I ask a new question?

    EDIT: I declared the mapper and the Scala module as lazy vals (and registered only MapModule), so they are now initialized only when they are first needed, i.e., on the Flink worker. See the "I have a NotSerializableException." section.

    Throughput is now about 80K events/second, more than 3 times faster than the previous implementation. Perhaps parsing the byte array directly into a Map would be just as fast with Play JSON, but I couldn't find a way to do it.
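The lazy-val fix follows a common pattern: keep the non-serializable object out of the serialized function and create it on the worker. Below is a stdlib-only sketch of that pattern, assuming (as is roughly the case in Flink) that user functions reach the workers through Java serialization. `HeavyParser` is a hypothetical stand-in for a mapper holding DefaultScalaModule, and `LineToMap`/`SerDe` are illustrative names. Marking the field `@transient` as well as `lazy` means it is skipped during serialization even if it was already initialized on the client side, and it is rebuilt on first use after deserialization.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a parser/mapper that is NOT java.io.Serializable.
final class HeavyParser {
  def parse(line: String): Map[String, String] =
    line.split(',').iterator
      .map(_.split("=", 2))
      .collect { case Array(k, v) => k.trim -> v.trim }
      .toMap
}

// The function object shipped to workers must itself be Serializable.
// @transient: the parser field is skipped by Java serialization.
// lazy: the parser is (re)built on first access after deserialization.
final class LineToMap extends java.io.Serializable {
  @transient lazy val parser: HeavyParser = new HeavyParser
  def apply(line: String): Map[String, String] = parser.parse(line)
}

object SerDe {
  // Java-serialize and deserialize an object, roughly what happens when
  // user functions are shipped to the cluster.
  def roundTrip[T <: java.io.Serializable](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```

Another common option is to create the mapper in the `open()` method of a Flink rich function such as `RichMapFunction`.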