
Spark JPMML import Issue


I am trying to import a PMML model file, generated in R, into a Spark context and use it to predict scores. This is the code used in Spark:

JavaRDD<String> scoreData = data.map(new Function<String, String>() {

    @Override
    public String call(String line) throws Exception {
        String[] row = line.split(",");

        // Load the PMML file from HDFS and unmarshal it into a PMML object
        FileSystem fs = FileSystem.get(new Configuration());
        FSDataInputStream inStr = fs.open(new Path("PATH_TO_PMML_FILE"));
        Source transformedSource = ImportFilter.apply(new InputSource(inStr));
        PMML pmml = JAXBUtil.unmarshalPMML(transformedSource);
        System.out.println(pmml.getModels().get(0).getModelName());

        // Build a model evaluator for the PMML document
        ModelEvaluatorFactory modelEvaluatorFactory = ModelEvaluatorFactory.newInstance();
        ModelEvaluator<?> modelEvaluator = modelEvaluatorFactory.newModelManager(pmml);
        System.out.println(modelEvaluator.getSummary());
        Evaluator evaluator = (Evaluator) modelEvaluator;

        // Prepare the active fields f1 - f16; row layout: {contact_id, label, f1..f16}
        List<FieldName> activeFields = evaluator.getActiveFields();
        Map<FieldName, FieldValue> arguments = new LinkedHashMap<FieldName, FieldValue>();
        StringBuilder strBld = new StringBuilder();
        strBld.append(row[0]);
        for (int i = 3; i <= row.length - 1; i++) {
            FieldValue activeValue = evaluator.prepare(activeFields.get(i - 3), Double.parseDouble(row[i]));
            arguments.put(activeFields.get(i - 3), activeValue);
        }

        // Evaluate the model and append the predicted score to the output line
        Map<FieldName, ?> results = evaluator.evaluate(arguments);
        strBld.append(",").append(results.get(evaluator.getTargetFields().get(0)));
        return strBld.toString();
    }
});

The code worked fine when run in a plain Java environment (without a Spark context), but when running the above code in Spark I get the following exception:

java.lang.NoSuchMethodError: com.google.common.collect.Range.closed(Ljava/lang/Comparable;Ljava/lang/Comparable;)Lcom/google/common/collect/Range;
at org.jpmml.evaluator.Classification$Type.<clinit>(Classification.java:278)
at org.jpmml.evaluator.ProbabilityDistribution.<init>(ProbabilityDistribution.java:26)
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluateClassification(GeneralRegressionModelEvaluator.java:333)
at org.jpmml.evaluator.GeneralRegressionModelEvaluator.evaluate(GeneralRegressionModelEvaluator.java:107)
at org.jpmml.evaluator.ModelEvaluator.evaluate(ModelEvaluator.java:266)
at org.zcoe.spark.pmml.PMMLSpark_2$1.call(PMMLSpark_2.java:146)
at org.zcoe.spark.pmml.PMMLSpark_2$1.call(PMMLSpark_2.java:1)
at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:999)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

The issue seems to be a compatibility problem with the Guava jar file that is required to run the code. I removed all the jars containing the com.google.common.collect.Range class from Spark's classpath, but the same issue persists.

The Spark job details are below:

spark-submit --jars ./lib/pmml-evaluator-1.2.0.jar,./lib/pmml-model-1.2.2.jar,./lib/pmml-manager-1.1.20.jar,./lib/pmml-schema-1.2.2.jar,./lib/guava-15.0.jar --class

[Stage 0:> (0 + 2) / 2]15/06/26 14:39:15 ERROR YarnScheduler: Lost executor 1 on hslave2: remote Akka client disassociated
15/06/26 14:39:15 ERROR YarnScheduler: Lost executor 2 on hslave1: remote Akka client disassociated
[Stage 0:> (0 + 2) / 2]15/06/26 14:39:33 ERROR YarnScheduler: Lost executor 4 on hslave1: remote Akka client disassociated
15/06/26 14:39:33 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, hslave1): ExecutorLostFailure (executor 4 lost)
Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
        at scala.Option.foreach(Option.scala:236)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

Please let me know if I have made any mistakes.


Solution

  • You should let both Spark and JPMML have their own versions of the Guava library. It is not a good idea to modify the Spark base installation when you can achieve your goal by simply reworking the packaging of your Spark application.

    If you move your Spark application to Apache Maven, then it is possible to use the relocation feature of the Maven Shade Plugin to move JPMML's copy of the Guava library to another package, such as org.jpmml.com.google, so that it no longer collides with the Guava version that ships with Spark. The example application of the JPMML-Cascading project does this trick; a sketch of such a configuration follows below.

    Also, the upside of moving to Apache Maven is that your Spark application will be available as an uber-JAR file, which greatly simplifies its deployment. For example, at the moment you are specifying pmml-manager-1.1.20.jar on your command line, which is a legacy JPMML artifact that is not needed alongside pmml-evaluator 1.2.0.
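
    For illustration, here is a minimal pom.xml sketch of such a relocation. The org.jpmml.com.google target package comes from the text above; the plugin version and the rest of the configuration are assumptions, not the exact JPMML-Cascading setup:

    <!-- pom.xml (sketch): build an uber-JAR and relocate the Guava classes
         pulled in by JPMML so they cannot clash with Spark's own Guava -->
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>2.3</version>
        <executions>
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <relocations>
                        <relocation>
                            <pattern>com.google</pattern>
                            <shadedPattern>org.jpmml.com.google</shadedPattern>
                        </relocation>
                    </relocations>
                </configuration>
            </execution>
        </executions>
    </plugin>

    After mvn package, the resulting uber-JAR under target/ can be submitted on its own with spark-submit --class ... <your-app>.jar, without the long --jars list.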