
How to use accumulator to count the records that have no matching items in leftOuterJoin?


Spark accumulators are a great way to get useful information about an operation over an RDD.

My problem is the following: I want to perform a join between two datasets, say events and items, where the events are unique and reference items, and both are keyed by item_id, which is a primary key for items.
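For concreteness, a minimal sketch of what the two RDDs might look like (the payloads and item_id values below are made up purely for illustration, and sc is assumed to be an existing SparkContext):

// events: (item_id, event payload); several events may share an item_id
val events = sc.parallelize(Seq(
  (1, "event-a"),
  (2, "event-b"),
  (3, "event-c")  // item_id 3 has no matching item
))

// items: (item_id, item payload); item_id is a primary key, so at most one record per key
val items = sc.parallelize(Seq(
  (1, "item-1"),
  (2, "item-2")
))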

What works is this:

val joinedRDD = events.leftOuterJoin(items)

One way to count how many events have no matching item is to write:

val numMissingItems = joinedRDD.map(x => if (x._2._2.isDefined) 0 else 1).sum

My question is: is there a way to obtain this count with an accumulator? I don't want to make a separate pass over the RDD just to do the count.


Solution

  • Indeed, you can use cogroup and implement the logic that leftOuterJoin performs yourself, incrementing the accumulator in the no-match case (see the sketch below). However, it's important to note that because this happens inside a transformation, the accumulator may over-count if a task fails or is recomputed, although generally not by much. It's up to you whether that is acceptable.
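A minimal sketch of that approach, assuming the (item_id, value) shape from the example above and the Spark 2.x accumulator API (on older versions you would create the accumulator differently); the accumulator name is arbitrary:

// Accumulator counting events whose item_id has no matching item
val missingItems = sc.longAccumulator("missingItems")

// cogroup gives, per key, all events and all items sharing that key.
// Rebuild the leftOuterJoin output shape and bump the accumulator
// whenever the items side is empty.
val joinedRDD = events.cogroup(items).flatMap {
  case (itemId, (eventIter, itemIter)) =>
    // item_id is a primary key for items, so there is at most one matching item
    val itemOpt = itemIter.headOption
    if (itemOpt.isEmpty) {
      missingItems.add(eventIter.size.toLong)
    }
    eventIter.map(event => (itemId, (event, itemOpt)))
}

// The accumulator is only populated once an action actually runs the transformation
joinedRDD.count()
println(s"Events with no matching item: ${missingItems.value}")

Note that the accumulator value is only reliable after an action has executed the lineage, and it may over-count if any of those tasks are retried, as mentioned above.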