Tags: scala, apache-spark, rdd

How to filter an RDD by data type?


I have an RDD that I am trying to filter so that it keeps only float values. Do Spark RDDs provide any way of doing this?

I have a CSV from which I need only the float values greater than 40 in a new RDD. To achieve this, I am checking whether each value is an instance of type Float and filtering on that. When I filter with a !, all the strings are still in the output, and when I don't use !, the output is empty.

val airports1 = airports.filter(line => !line.split(",")(6).isInstanceOf[Float])
val airports2 = airports1.filter(line => line.split(",")(6).toFloat > 40)

At the .toFloat call I run into a NumberFormatException, which I've tried to handle in a try/catch block.
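
A minimal, made-up illustration of what I'm observing (the sample line below is invented):

val fields = "JFK,New York,40.64".split(",") // fields: Array[String]
fields(2).isInstanceOf[Float] // false, so filtering on this keeps nothing
fields(2).toFloat             // 40.64 parses fine...
"abc".toFloat                 // ...but non-numeric text throws NumberFormatException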


Solution

  • Since each line is a plain String and you are trying to get float values out of it, you are not actually filtering by type, but by whether the value can be parsed as a Float.
    You can accomplish that using a flatMap together with Option: values that fail to parse become None and are simply dropped, so no try/catch is needed.

    import org.apache.spark.sql.SparkSession
    import scala.util.Try
    
    val spark = SparkSession.builder.master("local[*]").appName("Float caster").getOrCreate()
    val sc = spark.sparkContext
    
    val data = List("x,10", "y,3.3", "z,a")
    val rdd = sc.parallelize(data) // rdd: RDD[String]
    val filtered = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption) // filtered: RDD[Float]
    filtered.collect() // res0: Array[Float] = Array(10.0, 3.3)
    

    For the > 40 part you can either perform another filter afterwards, or filter the inner Option in the same step.
    (Both should perform roughly the same thanks to Spark's laziness, so pick whichever reads more clearly to you.)

    // Option 1 - another filter on the already-parsed RDD.
    val filtered2 = filtered.filter(x => x > 40)

    // Option 2 - parse and apply the threshold in one step by filtering the inner Option.
    val filteredOver40 = rdd.flatMap(line => Try(line.split(",")(1).toFloat).toOption.filter(x => x > 40))
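
    Applied to your original airports RDD, the same pattern would look roughly like this (column index 6 is assumed from your snippet):

    val airportsOver40 = airports.flatMap(line => Try(line.split(",")(6).toFloat).toOption.filter(_ > 40)) // airportsOver40: RDD[Float]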
    

    Let me know if you have any questions.