
Text Classification using Spark ML


I have a free-text description based on which I need to perform a classification. For example, the description can be that of an incident. Based on the description of the incident, I need to predict the risk associated with the event. For example, "A murder in town" would be a candidate for "high" risk.

I tried logistic regression but realized that it currently supports only binary classification. For multi-class classification (there are only three possible values) based on a free-text description, what would be the most suitable algorithm (linear regression or Naive Bayes)?


Solution

  • This is how I solved the above problem.

    Though the prediction accuracy is not bad, the model has to be tuned further for better results.

    Experts, please respond if you find anything wrong.

    My input data frame has two columns, "Text" and "RiskClassification".

    Below is the sequence of steps to predict using Naive Bayes in Java:

    1. Add a new column "label" to the input data frame. This column decodes the risk classification as shown below:
    // UDF that maps the textual risk classification to a numeric label
    sqlContext.udf().register("myUDF", new UDF1<String, Integer>() {
                @Override
                public Integer call(String input) throws Exception {
                    if ("LOW".equals(input))
                        return 1;
                    if ("MEDIUM".equals(input))
                        return 2;
                    if ("HIGH".equals(input))
                        return 3;
                    return 0;
                }
            }, DataTypes.IntegerType);
    
    samplingData = samplingData.withColumn("label", functions.callUDF("myUDF", samplingData.col("riskClassification")));
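
    As an aside (not part of the original answer), Spark ML's StringIndexer can build the numeric label column without a hand-written UDF. Note that it assigns indexes by label frequency (0.0 for the most frequent class, as a double), so the values will differ from the UDF above. A minimal sketch:

    import org.apache.spark.ml.feature.StringIndexer;

    // Learns a label index from the data and appends it as the "label" column
    StringIndexer indexer = new StringIndexer()
            .setInputCol("riskClassification")
            .setOutputCol("label");
    samplingData = indexer.fit(samplingData).transform(samplingData);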
    
    2. Create the training (80%) and testing (20%) data sets.

    For example:

    DataFrame lowRisk = samplingData.filter(samplingData.col("label").equalTo(1));
    // Keep roughly 80% of the low-risk rows for training (sampling without replacement)
    DataFrame lowRiskTraining = lowRisk.sample(false, 0.8);
    
    3. Union all the data frames to build the complete training data.

    4. Building the test data is slightly tricky: the test data should contain all of the data that is not present in the training data. A sketch of steps 3 and 4 follows below.
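
    A minimal sketch of steps 3 and 4, assuming the per-class training samples were built as in step 2 (mediumRiskTraining and highRiskTraining are hypothetical names following the lowRiskTraining example):

    // Step 3: union the per-class training samples into one training frame
    DataFrame trainingRiskData = lowRiskTraining
            .unionAll(mediumRiskTraining)
            .unionAll(highRiskTraining);

    // Step 4: everything not selected for training becomes the test set
    DataFrame testRiskData = samplingData.except(trainingRiskData);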

    5. Start the transformation of the training data and build the model.

    6. Tokenize the text column in the training data set:

    Tokenizer tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words");
    DataFrame tokenized = tokenizer.transform(trainingRiskData);
    
    7. Remove stop words. (Here you can also do more advanced operations such as lemmatization, stemming, and POS tagging using the Stanford NLP library.)
    StopWordsRemover remover = new StopWordsRemover().setInputCol("words").setOutputCol("filtered");
    DataFrame stopWordsRemoved = remover.transform(tokenized);
    
    8. Compute the term frequency using HashingTF and scale it with IDF. (CountVectorizer is another way to compute term frequencies; a sketch follows this step.)
    int numFeatures = 20; // NOTE: 20 hash buckets is very low for text; a larger power of two would reduce hash collisions
    HashingTF hashingTF = new HashingTF().setInputCol("filtered").setOutputCol("rawFeatures")
            .setNumFeatures(numFeatures);
    DataFrame rawFeaturizedData = hashingTF.transform(stopWordsRemoved);
    
    IDF idf = new IDF().setInputCol("rawFeatures").setOutputCol("features");
    // Fit IDF on the training data; the same idfModel is reused for the test data in step 11
    IDFModel idfModel = idf.fit(rawFeaturizedData);
    
    DataFrame featurizedData = idfModel.transform(rawFeaturizedData);
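
    Since the answer mentions CountVectorizer as an alternative, here is a minimal sketch of swapping it in for HashingTF. Unlike HashingTF, CountVectorizer must be fit first because it learns a vocabulary (the vocabulary size of 1000 is an arbitrary choice for illustration):

    // Learn a vocabulary from the training data, then produce term-count vectors
    CountVectorizer countVectorizer = new CountVectorizer()
            .setInputCol("filtered").setOutputCol("rawFeatures")
            .setVocabSize(1000);
    CountVectorizerModel cvModel = countVectorizer.fit(stopWordsRemoved);
    DataFrame rawFeaturizedData = cvModel.transform(stopWordsRemoved);
    // The IDF stage above stays the same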
    
    9. Convert the featurized input into a JavaRDD. Naive Bayes works on LabeledPoint:
    JavaRDD<LabeledPoint> labelledJavaRDD = featurizedData.select("label", "features").toJavaRDD()
        .map(new Function<Row, LabeledPoint>() {

            @Override
            public LabeledPoint call(Row arg0) throws Exception {
                // Column 0 is the numeric label, column 1 is the TF-IDF feature vector
                return new LabeledPoint(Double.parseDouble(arg0.get(0).toString()),
                        (org.apache.spark.mllib.linalg.Vector) arg0.get(1));
            }
        });
    
    10. Build the model:
    // Train a multinomial Naive Bayes model with additive smoothing (lambda = 1.0)
    NaiveBayesModel naiveBayesModel = NaiveBayes.train(labelledJavaRDD.rdd(), 1.0, "multinomial");
    
    11. Run all of the above transformations on the test data as well. A sketch follows below.
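
    A minimal sketch of step 11, reusing the stages fitted on the training data (in particular idfModel, which must not be re-fit on the test data); testRiskData is the test frame from step 4:

    DataFrame testTokenized = tokenizer.transform(testRiskData);
    DataFrame testStopWordsRemoved = remover.transform(testTokenized);
    DataFrame testRawFeaturized = hashingTF.transform(testStopWordsRemoved);
    // Reuse the IDF model fit on the training data
    DataFrame testFeaturizedData = idfModel.transform(testRawFeaturized);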

    12. Loop through the test data frame and perform the following actions:

    13. Create a LabeledPoint using the "label" and "features" in the test data frame.

    For example, if the label and features are at (zero-based) positions 3 and 7 of each row:

    LabeledPoint labeledPoint = new LabeledPoint(Double.parseDouble(dataFrameRow.get(3).toString()),
            (org.apache.spark.mllib.linalg.Vector) dataFrameRow.get(7));
    
    14. Use the prediction model to predict the label:
    double predictedLabel = naiveBayesModel.predict(labeledPoint.features());
    
    15. Add the predicted label as a column to the test data frame. A sketch of steps 12 through 15 combined follows below.
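
    A minimal sketch of steps 12 through 15, mapping over the test rows and rebuilding the data frame with a predicted-label column (the column selection and three-column schema are assumptions based on the steps above):

    JavaRDD<Row> predictedRows = testFeaturizedData.select("text", "label", "features")
            .toJavaRDD().map(new Function<Row, Row>() {
                @Override
                public Row call(Row row) throws Exception {
                    // Column 2 holds the TF-IDF feature vector for this test row
                    org.apache.spark.mllib.linalg.Vector features =
                            (org.apache.spark.mllib.linalg.Vector) row.get(2);
                    double predictedLabel = naiveBayesModel.predict(features);
                    return RowFactory.create(row.get(0), row.get(1), predictedLabel);
                }
            });

    StructType schema = new StructType(new StructField[] {
            DataTypes.createStructField("text", DataTypes.StringType, true),
            DataTypes.createStructField("label", DataTypes.IntegerType, true),
            DataTypes.createStructField("predictedLabel", DataTypes.DoubleType, false) });
    DataFrame predictions = sqlContext.createDataFrame(predictedRows, schema);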

    16. Now the test data frame has both the expected label and the predicted label.

    17. You can export the test data to CSV and do the analysis there, or you can compute the accuracy programmatically as well. A sketch follows.
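
    A minimal sketch of computing accuracy from the predictions frame built above (the label column is cast to double so the comparison matches the double-valued predicted label):

    long total = predictions.count();
    long correct = predictions.filter(
            predictions.col("label").cast("double")
                    .equalTo(predictions.col("predictedLabel"))).count();
    double accuracy = (double) correct / total;
    System.out.println("Accuracy: " + accuracy);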