Search code examples
java-8apache-sparktf-idftext-classification

groupingBy operation in Java-8


I'm trying to re-write famous example of Spark's text classification (http://chimpler.wordpress.com/2014/06/11/classifiying-documents-using-naive-bayes-on-apache-spark-mllib/) on Java 8.

I have a problem - in this code I'm making some data preparations for getting idfs of all words in all files:

    termDocsRdd.collect().stream().flatMap(doc -> doc.getTerms().stream()
                                .map(term -> new ImmutableMap.Builder<String, String>()
                                .put(doc.getName(),term)
                                .build())).distinct()        

And I'm stuck on the groupBy operation. (I need to group this by term, so each term must be a key and the value must be a sequence of documents). In Scala this operation looks very simple - .groupBy(_._2). But how can I do this in Java?

I tried to write something like:

    .groupingBy(term -> term, mapping((Document) d -> d.getDocNameContainsTerm(term), toList()));

but it's incorrect...

Somebody knows how to write it in Java?

Thank You very much.


Solution

  • If I understand you correctly, you want to do something like this:

    (import static java.util.stream.Collectors.*;)

    Map<Term, Set<Document>> collect = termDocsRdd.collect().stream().flatMap(
     doc -> doc.getTerms().stream().map(term -> new AbstractMap.SimpleEntry<>(doc, term)))
    .collect(groupingBy(Map.Entry::getValue, mapping(Map.Entry::getKey, toSet())));
    

    The use of Map.Entry/ AbstractMap.SimpleEntry is due to the absence of a standard Pair<K,V> class in Java-8. Map.Entry implementations can fulfill this role but at the cost of having unintuitive and verbose type and method names (regarding the task of serving as Pair implementation).


    If you are using the current Eclipse version (I tested with LunaSR1 20140925) with its limited type inference, you have to help the compiler a little bit:

    Map<Term, Set<Document>> collect = termDocsRdd.collect().stream().flatMap(
     doc -> doc.getTerms().stream().<Map.Entry<Document,Term>>map(term -> new AbstractMap.SimpleEntry<>(doc, term)))
    .collect(groupingBy(Map.Entry::getValue, mapping(Map.Entry::getKey, toSet())));