Tags: scala, apache-spark, spark-streaming

Scala type mismatch in map operation


I am trying a map operation on a Spark DStream in the code below:

val hashesInRecords: DStream[(RecordKey, Array[Int])] = records.map(record => {
      val hashes: List[Int] = calculateIndexing(record.fields())
      val ints: Array[Int] = hashes.toArray(Array.ofDim[Int](hashes.length))
      (new RecordKey(record.key, hashes.length), ints)
    })

The code looks fine in IntelliJ, but when I try to build it I get an error that I don't really understand:

Error:(53, 61) type mismatch;
 found   : Array[Int]
 required: scala.reflect.ClassTag[Int]
      val ints: Array[Int] = hashes.toArray(Array.ofDim[Int](hashes.length))

The error remains even after I add an explicit type to the map operation, like so:

records.map[(RecordKey, Array[Int])](record => {...

Solution

  • The root cause is that Scala's List.toArray takes an implicit scala.reflect.ClassTag, not a destination array (that is the Java java.util.List.toArray overload). The compiler therefore tried to use Array.ofDim[Int](hashes.length) as the ClassTag[Int] argument, which is exactly the "found Array[Int], required ClassTag[Int]" mismatch. Calling toArray with no arguments fixes the problem. As a bonus, the version below reads the length from the Array, which is O(1), instead of from the List, which is O(N).

    val hashesInRecords: DStream[(RecordKey, Array[Int])] = records.map { record =>
      val ints = calculateIndexing(record.fields()).toArray
      (new RecordKey(record.key, ints.length), ints)
    }
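Outside of Spark, the corrected call can be exercised in isolation. This is only a sketch: RecordKey, Record, and calculateIndexing below are hypothetical stand-ins for the asker's types, kept just concrete enough to compile and run.

```scala
// Hypothetical stand-ins for the types used in the question:
case class RecordKey(key: String, count: Int)
case class Record(key: String) {
  def fields(): Seq[String] = key.split("-").toSeq
}

// Assumed shape of calculateIndexing: fields in, List[Int] out.
def calculateIndexing(fields: Seq[String]): List[Int] =
  fields.map(_.length).toList

val record = Record("ab-cde")

// List.toArray resolves its ClassTag[Int] implicitly, so no
// destination array is passed (unlike java.util.List.toArray):
val ints: Array[Int] = calculateIndexing(record.fields()).toArray

// ints.length is O(1); hashes.length on the List would be O(N).
val result: (RecordKey, Array[Int]) = (RecordKey(record.key, ints.length), ints)
```

The same no-argument toArray call is what the DStream.map body in the solution uses; the implicit ClassTag is supplied by the compiler because Int has one in scope by default.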