Tags: scala, apache-flink, typeclass, subtyping, adhoc-polymorphism

DataSet/DataStream of type class interface


I am just experimenting with the use of Scala type classes within Flink. I have defined the following type class interface:

trait LikeEvent[T] {
    def timestamp(payload: T): Int
}

Now, I want to consider a DataSet of LikeEvent[_] like this:

// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)

// create instances for the raw events
object EventInstance {

    implicit val logEvent = new LikeEvent[Log] {
        def timestamp(log: Log): Int = log.ts
    }

    implicit val metricEvent = new LikeEvent[Metric] {
        def timestamp(metric: Metric): Int = metric.ts
    }
}

// add ops to the raw event classes (regular class)
object EventSyntax {

    implicit class Event[T: LikeEvent](val payload: T) {
        val le = implicitly[LikeEvent[T]]
        def timestamp: Int = le.timestamp(payload)
    }
}
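
As a quick sanity check (plain Scala, no Flink involved), the extension method resolves as expected once both objects are in scope:

// instances and syntax imported together
import EventInstance._
import EventSyntax._

Log(1586736005, 1, "invalid login").timestamp  // 1586736005
Metric(1586736000, "cpu_usage", 0.2).timestamp // 1586736000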

The following app runs just fine:

// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"),
  Log(1586736010, 1, "invalid login"),
  Log(1586736015, 1, "invalid login"),
  Log(1586736030, 2, "valid login"),
  Metric(1586736060, "cpu_usage", 0.8),
  Log(1586736120, 0, "end of world"),
)

// count events per hour
val eventsPerHour = events
  .map(new GetMinuteEventTuple())
  .groupBy(0).reduceGroup { g =>
    val gl = g.toList
    val (hour, count) = (gl.head._1, gl.size)
    (hour, count)
  }

eventsPerHour.print()

This prints the expected output:

(0,5)
(1,1)
(2,1)

However, if I modify the syntax object like this:

// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {

  case class Event[T: LikeEvent](payload: T) {
    val le = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)  
}

I get the following error:

type mismatch;
found   : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]

So, guided by the message, I make the following change:

val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)

After that, the error changes to:

could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]

I cannot understand why EventSyntax2 results in these errors, whereas EventSyntax compiles and runs well. Why is using a case class wrapper in EventSyntax2 more problematic than using a regular class as in EventSyntax?

Anyway, my question is twofold:

  • How can I solve my problem with EventSyntax2?
  • What would be the simplest way to achieve my goals? Here, I am just experimenting with the type class pattern for the sake of learning, but a more object-oriented approach (based on subtyping) definitely looks simpler to me. Something like this:
// Define trait
trait Event {
    def timestamp: Int
    def payload: Product with Serializable // Any case class
}

// Metric adapter (similar for Log)
object MetricAdapter {

    implicit class MetricEvent(val payload: Metric) extends Event {
        def timestamp: Int = payload.ts
    }
}

And then simply use val events: DataSet[Event] = env.fromElements(...) in the main.
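
For completeness, here is a sketch of that main (LogAdapter is assumed here, analogous to MetricAdapter above); note the explicit [Event] type argument, which avoids the Product with Serializable inference seen earlier:

import org.apache.flink.api.scala._
import MetricAdapter._
import LogAdapter._ // hypothetical adapter for Log, mirroring MetricAdapter

val env = ExecutionEnvironment.getExecutionEnvironment

// the implicit classes adapt each payload to Event
val events: DataSet[Event] = env.fromElements[Event](
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login")
)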

Note that the question "List of classes implementing a certain typeclass" poses a similar problem, but it considers a plain Scala List instead of a Flink DataSet (or DataStream). The focus of my question is on using the type class pattern within Flink to handle heterogeneous streams/datasets in some way, and whether it really makes sense, or whether one should just clearly favour a regular trait in this case and inherit from it as outlined above.

BTW, you can find the code here: https://github.com/salvalcantara/flink-events-and-polymorphism.


Solution

  • Short answer: Flink cannot derive TypeInformation in Scala for wildcard types.

    Long answer: Both of your questions really boil down to: what is TypeInformation, how is it used, and how is it derived?

    TypeInformation is Flink's internal type system, which it uses to serialize data when it is shuffled across the network or stored in a state backend (when using the DataStream API).

    Serialization is a major performance concern in data processing, so Flink contains specialized serializers for common data types and patterns. Out of the box, in its Java stack, it supports all JVM primitives, POJOs, Flink tuples, some common collection types, and Avro. The type of your class is determined using reflection, and if it does not match a known type, Flink falls back to Kryo.

    In the Scala API, type information is derived using implicits. All methods on the Scala DataSet and DataStream APIs require TypeInformation for their generic parameters as an implicit type-class constraint:

    def map[T: TypeInformation] 
    

    This TypeInformation can be provided manually, like any type class, or derived using a macro that is imported from Flink:

    import org.apache.flink.api.scala._
    

    This macro decorates the Java type stack with support for Scala tuples, Scala case classes, and some common Scala standard library types. I say decorates because it can and will fall back to the Java stack if your class is not one of those types.
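
    You can see the case-class path from the console. Here, Point is just a toy case class assumed for illustration:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.common.typeinfo.TypeInformation

    case class Point(x: Int, y: Double)

    // the macro derives a structured CaseClassTypeInfo for case classes,
    // whereas an unrecognized class would resolve to a GenericType (Kryo)
    implicitly[TypeInformation[Point]]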

    So why does version 1 work?

    Because Event is an ordinary class that the type stack cannot match, it is resolved to a generic type with a Kryo-based serializer. You can test this from the console and see that it returns a generic type:

    scala> implicitly[TypeInformation[EventSyntax.Event[_]]]
    res2: org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax.Event[_]] = GenericType<com.salvalcantara.fp.EventSyntax.Event>
    

    Version 2 does not work because the macro recognizes the type as a case class and then tries to recursively derive TypeInformation instances for each of its members. This is not possible for wildcard types, which are different from Any, so the derivation fails.
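
    As for your first question, one way to make EventSyntax2 usable (a sketch, untested) is to provide the evidence manually and opt in to the same generic/Kryo treatment that version 1 gets implicitly:

    import org.apache.flink.api.common.typeinfo.TypeInformation
    import EventSyntax2._

    // declared locally so it is preferred over the macro-derived implicit;
    // TypeInformation.of(Class) produces a GenericType, i.e. Kryo serialization
    implicit val eventTypeInfo: TypeInformation[Event[_]] =
      TypeInformation.of(classOf[Event[_]])

    val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)

    This sidesteps the derivation entirely, at the cost of Kryo serialization for every event.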

    In general, you should not use Flink with heterogeneous types because it will not be able to derive efficient serializers for your workload.