I have a fitted PySpark pipeline that I am saving to disk for later use.
Here is my pipeline code:
model = Pipeline(stages=[segment_indexer, model_name_indexer, make_name_indexer,
                         engine_type_indexer, segment_encoder, model_name_encoder,
                         make_name_encoder, engine_type_encoder, x_assembler,
                         estimator]).fit(trainingData)
model.save('file:/opt/app/fitted-model')
This saves the model to disk.
I am trying to load the model back like so:
model2 = pyspark.ml.pipeline.PipelineModel.load("file:/tmp/mymodels/fitted-model")
which throws:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/ml/util.py", line 257, in load
return cls.read().load(path)
File "/usr/lib/spark/python/pyspark/ml/util.py", line 197, in load
java_obj = self._jread.load(path)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o2760.load.
: java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$first$1.apply(RDD.scala:1370)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.first(RDD.scala:1367)
at org.apache.spark.ml.util.DefaultParamsReader$.loadMetadata(ReadWrite.scala:382)
at org.apache.spark.ml.Pipeline$SharedReadWrite$.load(Pipeline.scala:266)
at org.apache.spark.ml.PipelineModel$PipelineModelReader.load(Pipeline.scala:347)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:748)
What am I doing wrong?
I had to call pipeline.transform on the DataFrame so the pipeline actually applied its transforms before saving. After doing that, loading the model back returns the model instance, and transform can be applied to a DataFrame again.
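A minimal sketch of that round trip, assuming the indexer/encoder stages and trainingData from the question are already in scope (testData and the paths are placeholders):

from pyspark.ml import Pipeline, PipelineModel

# Fit the pipeline on the training data.
model = Pipeline(stages=[segment_indexer, model_name_indexer, make_name_indexer,
                         engine_type_indexer, segment_encoder, model_name_encoder,
                         make_name_encoder, engine_type_encoder, x_assembler,
                         estimator]).fit(trainingData)

# Apply the fitted stages once before persisting.
model.transform(trainingData)

# Save the fitted PipelineModel, then load it back from the same path.
model.save('file:/opt/app/fitted-model')
model2 = PipelineModel.load('file:/opt/app/fitted-model')

# The restored model can transform new DataFrames as usual.
predictions = model2.transform(testData)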