dataset, batch-processing, apache-flink

NullPointerException when Flink groups case class objects


I am using the Flink DataSet API, and I have the following case classes:

case class Geo(country: Int, province: Int, city: Int, county: Int)


case class AntiFraudLog(
  eventType: Int,
  valid: Boolean
)

case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog])

Then I generate a DataSet of key/value pairs whose values are instances of a case class:

 val dataKeyValue: DataSet[(Long, AntiFraudLog)]

Next, I try to group the items that share the same key:

val groupedSortedData = dataKeyValue groupBy 0

Then I transform the grouped data into another case class:

val sessionData: DataSet[AntiFraudSession] = groupedSortedData reduceGroup(
  logs => AntiFraudSession(logs.map(_._2).toSeq)
)

But when I run the program, I get the following exception:

Caused by: java.lang.NullPointerException
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:90)
    at org.apache.flink.api.scala.typeutils.TraversableSerializer.serialize(TraversableSerializer.scala:32)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:100)
    at org.apache.flink.api.scala.typeutils.CaseClassSerializer.serialize(CaseClassSerializer.scala:30)
    at org.apache.flink.runtime.plugable.SerializationDelegate.write(SerializationDelegate.java:56)
    at org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer.addRecord(SpanningRecordSerializer.java:83)
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:85)
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.api.scala.DataSet$$anon$5$$anonfun$flatMap$1.apply(DataSet.scala:417)
    at org.apache.flink.api.scala.DataSet$$anon$5$$anonfun$flatMap$1.apply(DataSet.scala:417)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.flink.api.scala.DataSet$$anon$5.flatMap(DataSet.scala:417)
    at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:80)
    at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
    at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:163)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:584)
    at java.lang.Thread.run(Thread.java:745)

Does anyone know how to fix it?


Solution

  • It looks like Flink fails to serialize a case class that has a collection field with a null value: the NullPointerException is thrown from TraversableSerializer.serialize, which is called from CaseClassSerializer.serialize. In your scenario that would be an AntiFraudSession with fraudLogs = null. Is there any other transformation logic that might cause such elements to appear in sessionData?
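
If null collections do turn out to be the cause, one possible workaround is to normalize them before the records reach the serializer. The following is only a minimal sketch in plain Scala (no Flink dependency), reusing the case classes from the question. The object NullSafeSession and its methods sanitize and fromLogs are hypothetical names introduced here for illustration, not part of Flink. The sketch also materializes the group into a Vector instead of calling toSeq, because, as far as I know, Iterator.toSeq on Scala 2.11/2.12 returns a lazy Stream; whether that plays any role in this particular exception is not confirmed.

```scala
// Case classes copied from the question.
case class AntiFraudLog(eventType: Int, valid: Boolean)
case class AntiFraudSession(fraudLogs: Seq[AntiFraudLog])

// Hypothetical helper object, not part of Flink.
object NullSafeSession {

  // Replace a null collection field with an empty Seq so that a
  // collection serializer never has to deal with a null value.
  def sanitize(session: AntiFraudSession): AntiFraudSession =
    if (session.fraudLogs == null) session.copy(fraudLogs = Seq.empty)
    else session

  // Build a session from the grouped key/value pairs, eagerly copying
  // the values into a strict Vector rather than a lazy collection.
  def fromLogs(logs: Iterator[(Long, AntiFraudLog)]): AntiFraudSession =
    AntiFraudSession(logs.map(_._2).toVector)
}
```

Assuming the usual org.apache.flink.api.scala._ import is in scope, these helpers could be passed to the existing operators, for example groupedSortedData.reduceGroup(NullSafeSession.fromLogs _) or sessionData.map(NullSafeSession.sanitize _). Note that the stack trace in the question passes through a flatMap chained to a data source, so the same kind of null check may be needed on whatever case class that flatMap emits.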