scala, apache-spark, avro

How to iterate over records in Spark (Scala)?


I have a variable "myrdd" that holds an Avro file with 10 records, loaded through hadoopFile.

When I do

myrdd.first._1.datum.getName()

I can get the name. The problem is that I have 10 records in "myrdd". When I do:

myrdd.map(x => {println(x._1.datum.getName())})

it does not work and prints out a weird object a single time. How can I iterate over all records?


Solution

  • Here is a log from a session using spark-shell with a similar scenario.

    Given

    scala> persons
    res8: org.apache.spark.sql.DataFrame = [name: string, age: int]
    
    scala> persons.first
    res7: org.apache.spark.sql.Row = [Justin,19]
    

    Your issue looks like this:

    scala> persons.map(t => println(t))
    res4: org.apache.spark.rdd.RDD[Unit] = MapPartitionsRDD[10]
    

    so map just returns another RDD (the function is not applied immediately; it is applied "lazily" when you actually iterate over the result).
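
    To make the laziness explicit, here is a minimal sketch (assuming the same persons DataFrame as above):

    // Nothing is printed here: map only records the transformation
    val mapped = persons.map(t => println(t))

    // Only when an action such as count() runs is the closure executed
    // (for a local job the output appears in the console)
    mapped.count()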

    So when you materialize (using collect()) you get a "normal" collection:

    scala> persons.collect()
    res11: Array[org.apache.spark.sql.Row] = Array([Justin,19])
    

    over which you can map. Note that in this case you have a side effect in the closure passed to map (the println); the result of println is Unit:

    scala> persons.collect().map(t => println(t))
    [Justin,19]
    res5: Array[Unit] = Array(())
    

    Same result if collect is applied at the end:

    scala> persons.map(t => println(t)).collect()
    [Justin,19]
    res19: Array[Unit] = Array(())
    

    But if you just want to print the rows, you can simplify it by using foreach:

    scala> persons.foreach(t => println(t))
    [Justin,19]
    

    As @RohanAletty has pointed out in a comment, this works for a local Spark job. If the job runs on a cluster, collect is required as well, because otherwise foreach executes the println on the executors and the output ends up in the executor logs rather than on the driver:

    persons.collect().foreach(t => println(t))
    
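    Applying the same pattern to the Avro pair RDD from the question might look like this (a sketch, assuming myrdd is a pair RDD whose keys wrap an Avro record exposing getName, as in the question):

    myrdd.collect().foreach(x => println(x._1.datum.getName()))
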

    Notes

    • The same behaviour can be observed in the Iterator class (see the sketch below).
    • The output of the session above has been reordered.
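
    A minimal sketch of the same laziness with a plain Scala Iterator (no Spark involved):

    val it = Iterator(1, 2, 3).map { x => println(x); x * 2 }
    // Nothing has been printed yet: map on an Iterator is lazy
    val result = it.toList
    // Now 1, 2 and 3 are printed, and result is List(2, 4, 6)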

    Update

    As for filtering: the placement of collect is "bad" if you apply filters after collect that could have been applied before it.

    For example these expressions give the same result:

    scala> persons.filter("age > 20").collect().foreach(println)
    [Michael,29]
    [Andy,30]
    
    scala> persons.collect().filter(r => r.getInt(1) >= 20).foreach(println)
    [Michael,29]
    [Andy,30]
    

    but the second case is worse, because that filter could have been applied before collect.

    The same applies to any type of aggregation as well.
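
    For example, a minimal sketch of the same idea for an aggregation (assuming the Spark SQL avg function and the persons schema from above):

    import org.apache.spark.sql.functions.avg

    // Better: aggregate on the cluster, collect only the (small) result
    persons.filter("age > 20").agg(avg("age")).collect().foreach(println)

    // Worse: collect every row to the driver, then aggregate locally
    val ages = persons.collect().filter(_.getInt(1) > 20).map(_.getInt(1))
    println(ages.sum.toDouble / ages.size)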