
Type-parameterize a DStream


Can a DStream have type parameters?

If yes, how?

When I try lazy val qwe = mStream.mapWithState(stateSpec) on mStream: DStream[(A, B)] (a class parameter), I get:

value mapWithState is not a member of org.apache.spark.streaming.dstream.DStream[(A, B)]
    lazy val qwe = mStream.mapWithState(stateSpec)

Solution

  • A substantial subset of the Spark API requires implicit ClassTags (see Scala: What is a TypeTag and how do I use it?) and PairDStreamFunctions.mapWithState is no exception. Check the class definition:

    class PairDStreamFunctions[K, V](self: DStream[(K, V)])
      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
    

    and:

    def mapWithState[StateType: ClassTag, MappedType: ClassTag](
        spec: StateSpec[K, V, StateType, MappedType]
      ): MapWithStateDStream[K, V, StateType, MappedType] = {
      ...
    }
    

    Since the implicit conversion from DStream[(K, V)] to PairDStreamFunctions itself demands these tags, mapWithState is simply not visible without them. If you want to create a function which operates on generic pair streams and uses mapWithState, you should at least provide ClassTags for the key and value types:

    def foo[T : ClassTag, U : ClassTag](
      stream: DStream[(T, U)], f: StateSpec[T, U, Int, Int]) = stream.mapWithState(f)
    

    If StateType and MappedType are parametrized as well you'll need ClassTags for these too:

    def bar[T : ClassTag, U : ClassTag, V : ClassTag,  W : ClassTag](
      stream: DStream[(T, U)], f: StateSpec[T, U, V, W]) = stream.mapWithState(f)
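
    The mechanism can be sketched outside Spark. Below is a minimal, self-contained analogue (PairSeqFunctions, keysArray, and foo are made-up names for illustration): a wrapper class whose constructor demands ClassTags, so a generic caller must propagate them via context bounds, just like the ClassTags on T and U above.

    ```scala
    import scala.reflect.ClassTag

    // Toy analogue of PairDStreamFunctions: extra operations that are
    // only available when ClassTags for both element types are in scope.
    class PairSeqFunctions[K: ClassTag, V: ClassTag](self: Seq[(K, V)]) {
      // Building an Array[K] needs ClassTag[K] at runtime,
      // since K is erased and Array construction requires the class.
      def keysArray: Array[K] = self.map(_._1).toArray
    }

    // Generic helper: without the `: ClassTag` context bounds on T and U
    // the compiler cannot supply the implicits PairSeqFunctions needs,
    // and the call fails to compile.
    def foo[T: ClassTag, U: ClassTag](pairs: Seq[(T, U)]): Array[T] =
      new PairSeqFunctions(pairs).keysArray

    println(foo(Seq(("a", 1), ("b", 2))).mkString(",")) // prints a,b
    ```

    In the Spark case the ClassTags are consumed by the implicit conversion rather than a direct constructor call, which is why the failure surfaces as "value mapWithState is not a member" instead of a missing-implicit error.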