Tags: python, pyspark, apache-spark-ml

PySpark ML Pipeline.load results throws java.lang.UnsupportedOperationException: empty collection


I have a fitted PySpark pipeline that I am saving to disk for later use.

Here is my pipeline code:

model = Pipeline(stages=[segment_indexer, model_name_indexer, make_name_indexer, engine_type_indexer, segment_encoder, model_name_incoder, make_name_incoder, engine_type_incoder, x_assembler, estimator]).fit(trainingData)
model.save('file:/opt/app/fitted-model')

This saves the model to disk.

I am trying to load the model back like so:

model2 = pyspark.ml.pipeline.PipelineModel.load("file:/tmp/mymodels/fitted-model")

which throws

Traceback (most recent call last):

  File "<stdin>", line 1, in <module>
  File "/usr/lib/spark/python/pyspark/ml/util.py", line 257, in load
    return cls.read().load(path)
  File "/usr/lib/spark/python/pyspark/ml/util.py", line 197, in load
    java_obj = self._jread.load(path)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2760.load.
: java.lang.UnsupportedOperationException: empty collection
    at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1370)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
    at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:382)
    at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:266)
    at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:347)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

What am I doing wrong?



Solution

  • I had to call transform on the fitted pipeline so that it actually applied the transforms to a DataFrame before saving. After doing that, loading the model back returned a working PipelineModel instance, and transform could be applied to a DataFrame again.