Search code examples
apache-sparkword-count

Spark - Word count Alternate Approach


I have 1 million rows , Is there any alternate way implement word count in spark , other than traditional approach of mapping each word to 1 and then reducing it by key?

Traditional Approach :

JavaPairRDD<String, Integer> counts = textFile.flatMap(s -> 
                         Arrays.asList(SPACE.split(s)).iterator())
        .mapToPair(s -> new Tuple2<>(s, 1))
        .reduceByKey((a, b) -> a + b);

Any New Approach ?


Solution

  • There surely are many ways to do this. Here are 2:

    One: flat-map and make a data frame:

    JavaRDD<Row> rowRdd = spark.read()
            .textFile("loremipsum.txt")
            .javaRDD()
            .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
            .map(s -> RowFactory.create(s));
    spark.createDataFrame(rowRdd,
                          new StructType()
                         .add(DataTypes.createStructField("word", DataTypes.StringType, true)))
          .groupBy("word")
          .count()
          .show();
    

    Printing something like:

    +------------+-----+
    |        word|count|
    +------------+-----+
    |         Sit|   17|
    |        Elit|    6|
    |   vehicula.|    2|
    |       eros.|    2|
    |        nam.|    3|
    |   porttitor|   18|
    |consectetur.|    6|
    ...
    

    Bonus: Use a SQL to group (if that counts as yet a further alternative)

    Two: group by word and count elements in iterables:

    Map<String, Long> counts = spark.read().textFile("loremipsum.txt")
            .javaRDD()
            .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
            .groupBy(i -> i)
            .aggregateByKey(0L, (id, it) -> countIterable(it), (a, b) -> a + b)
    
            .collect() //collection of Tuple2: you can stop here
            .stream()
            .collect(Collectors.toMap(t -> t._1, t -> t._2));
    

    Resulting in something like:

    {=50, Malesuada=4, justo.=3, potenti=2, vel.=11, purus=30, curabitur.=2...}
    

    With countIterable being defined as:

    private static <T> long countIterable(Iterable<T> it) {
        long res = 0;
        for (T t : it)
            res += 1;
        return res;
    }
    

    Which can also be implemented as

    return StreamSupport.stream(it.spliterator(), false).count();