Tags: apache-spark, pyspark, libsvm, apache-spark-ml

How can I read LIBSVM models (saved using LIBSVM) into PySpark?


I have a LIBSVM scaling model (generated with svm-scale) that I would like to port over to PySpark. I've naively tried the following:

from pyspark.ml.feature import MinMaxScaler

scaler_path = "path to model"
a = MinMaxScaler().load(scaler_path)

But this throws an error complaining about a missing metadata directory:

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-22-1942e7522174> in <module>()
----> 1 a = MinMaxScaler().load(scaler_path)

/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/util.pyc in load(cls, path)
    226     def load(cls, path):
    227         """Reads an ML instance from the input path, a shortcut of `read().load(path)`."""
--> 228         return cls.read().load(path)
    229 
    230 

/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/ml/util.pyc in load(self, path)
    174         if not isinstance(path, basestring):
    175             raise TypeError("path should be a basestring, got type %s" % type(path))
--> 176         java_obj = self._jread.load(path)
    177         if not hasattr(self._clazz, "_from_java"):
    178             raise NotImplementedError("This Java ML type cannot be loaded into Python currently: %r"

/usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/srv/data/spark/spark-2.0.0-bin-hadoop2.6/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
    317                 raise Py4JJavaError(
    318                     "An error occurred while calling {0}{1}{2}.\n".
--> 319                     format(target_id, ".", name), value)
    320             else:
    321                 raise Py4JError(

Py4JJavaError: An error occurred while calling o321.load.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: file:[filename]/metadata


Is there a simple work-around for loading this? The format of the LIBSVM model is:

x
0 1
1 -1050 1030
2 0 1
3 0 3
4 0 1
5 0 1

Solution

  • First, the file presented isn't in libsvm format. The correct format of a libsvm file is the following:

    <label> <index1>:<value1> <index2>:<value2> ... <indexN>:<valueN>
    

    Thus your data preparation is incorrect to start with.

    Secondly, the class method load(path) that you are using with MinMaxScaler reads a previously saved Spark ML instance from the input path.

    Remember that MinMaxScaler computes summary statistics on a data set and produces a MinMaxScalerModel; the model can then transform each feature individually so that it lies in the given range.
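    For intuition, the rescaling applied to each feature column is (x - E_min) / (E_max - E_min) * (max - min) + min, where E_min and E_max are the column statistics and [min, max] is the target range (default [0.0, 1.0]). Here is a minimal plain-Python sketch of that arithmetic, not Spark's actual implementation:

    def min_max_rescale(x, col_min, col_max, target_min=0.0, target_max=1.0):
        # Constant columns (E_max == E_min) are mapped to the midpoint of the
        # target range, mirroring the behaviour documented for MinMaxScalerModel.
        if col_max == col_min:
            return 0.5 * (target_min + target_max)
        return (x - col_min) / (col_max - col_min) * (target_max - target_min) + target_min

    # Column 0 of the example below contains the values 1.23 and 1.01:
    min_max_rescale(1.23, 1.01, 1.23)   # -> 1.0
    min_max_rescale(1.01, 1.01, 1.23)   # -> 0.0
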

    e.g.:

    from pyspark.ml.linalg import Vectors
    from pyspark.ml.feature import MinMaxScaler
    df = spark.createDataFrame([(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])) ,(0.0, Vectors.dense([1.01, 2.02, 3.03]))],['label','features'])
    
    df.show(truncate=False)
    # +-----+---------------------+
    # |label|features             |
    # +-----+---------------------+
    # |1.1  |(3,[0,2],[1.23,4.56])|
    # |0.0  |[1.01,2.02,3.03]     |
    # +-----+---------------------+
    
    mmScaler = MinMaxScaler(inputCol="features", outputCol="scaled")
    temp_path = "/tmp/spark/"
    minMaxScalerPath = temp_path + "min-max-scaler"
    mmScaler.save(minMaxScalerPath)   
    

    The snippet above saves the MinMaxScaler feature transformer so that it can be loaded later with the class method load.

    Now, let's take a look at what actually happened. The class method save creates the following file structure:

    /tmp/spark/
    └── min-max-scaler
        └── metadata
            ├── part-00000
            └── _SUCCESS
    

    Let's check the content of that part-00000 file:

    $ cat /tmp/spark/min-max-scaler/metadata/part-00000 | python -m json.tool
    {
        "class": "org.apache.spark.ml.feature.MinMaxScaler",
        "paramMap": {
            "inputCol": "features",
            "max": 1.0,
            "min": 0.0,
            "outputCol": "scaled"
        },
        "sparkVersion": "2.0.0",
        "timestamp": 1480501003244,
        "uid": "MinMaxScaler_42e68455a929c67ba66f"
    }
    

    So, when you load the transformer:

    loadedMMScaler = MinMaxScaler.load(minMaxScalerPath)
    

    You are actually loading that metadata file. It won't accept a libsvm file!
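
    A quick sanity check (not required, just to convince yourself that only the estimator's parameters were persisted) is to inspect the loaded instance:

    loadedMMScaler.getInputCol()                      # 'features'
    loadedMMScaler.getOutputCol()                     # 'scaled'
    loadedMMScaler.getMin(), loadedMMScaler.getMax()  # (0.0, 1.0)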

    Now you can fit your loaded transformer to create the model and transform your DataFrame:

    model = loadedMMScaler.fit(df)
    
    model.transform(df).show(truncate=False)                                    
    # +-----+---------------------+-------------+
    # |label|features             |scaled       |
    # +-----+---------------------+-------------+
    # |1.1  |(3,[0,2],[1.23,4.56])|[1.0,0.0,1.0]|
    # |0.0  |[1.01,2.02,3.03]     |[0.0,1.0,0.0]|
    # +-----+---------------------+-------------+
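
    If what you actually want to persist is the fitted model (i.e. the per-feature statistics, not just the estimator's parameters), MinMaxScalerModel supports the same save/load mechanism. A short sketch reusing the temp_path from above (the model path name is just illustrative):

    from pyspark.ml.feature import MinMaxScalerModel

    minMaxScalerModelPath = temp_path + "min-max-scaler-model"
    model.save(minMaxScalerModelPath)

    loadedModel = MinMaxScalerModel.load(minMaxScalerModelPath)
    loadedModel.originalMin == model.originalMin   # True
    loadedModel.originalMax == model.originalMax   # True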
    

    Now let's get back to that libsvm file: let's create some dummy data and save it in libsvm format using MLUtils:

    from pyspark.mllib.regression import LabeledPoint
    from pyspark.mllib.linalg import Vectors
    from pyspark.mllib.util import MLUtils
    data = sc.parallelize([LabeledPoint(1.1, Vectors.sparse(3, [(0, 1.23), (2, 4.56)])), LabeledPoint(0.0, Vectors.dense([1.01, 2.02, 3.03]))])
    MLUtils.saveAsLibSVMFile(data, temp_path + "data")
    

    Back to our file structure:

    /tmp/spark/
    ├── data
    │   ├── part-00000
    │   ├── part-00001
    │   ├── part-00002
    │   ├── part-00003
    │   ├── part-00004
    │   ├── part-00005
    │   ├── part-00006
    │   ├── part-00007
    │   └── _SUCCESS
    └── min-max-scaler
        └── metadata
            ├── part-00000
            └── _SUCCESS
    

    You can check the content of those part files, which are now in libsvm format:

    $ cat /tmp/spark/data/part-0000*
    1.1 1:1.23 3:4.56
    0.0 1:1.01 2:2.02 3:3.03
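
    The number of part files simply reflects the RDD's partitioning (sc.parallelize used the default parallelism here). If you would rather have a single output file, you can coalesce before saving; an optional tweak, with "data-single" being just an illustrative directory name:

    MLUtils.saveAsLibSVMFile(data.coalesce(1), temp_path + "data-single")
    # /tmp/spark/data-single/ will contain a single part-00000 file plus _SUCCESS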
    

    Now let's load that data and apply the model:

    loadedData = MLUtils.loadLibSVMFile(sc, temp_path + "data")
    loadedDataDF = spark.createDataFrame(loadedData.map(lambda lp : (lp.label, lp.features.asML())), ['label','features'])
    
    loadedDataDF.show(truncate=False)
    # +-----+----------------------------+                                            
    # |label|features                    |
    # +-----+----------------------------+
    # |1.1  |(3,[0,2],[1.23,4.56])       |
    # |0.0  |(3,[0,1,2],[1.01,2.02,3.03])|
    # +-----+----------------------------+
    

    Note that converting the MLlib Vectors to ML Vectors (the asML() call above) is very important, since the pyspark.ml transformers expect the new pyspark.ml.linalg vector types.
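
    For reference, the conversion works in both directions: pyspark.mllib vectors expose asML(), and pyspark.mllib.linalg.Vectors.fromML converts an ML vector back (Spark 2.0+). A small illustration:

    from pyspark.mllib.linalg import Vectors as MLlibVectors

    mllib_vec = MLlibVectors.dense([1.01, 2.02, 3.03])
    ml_vec = mllib_vec.asML()                  # pyspark.ml.linalg.DenseVector
    back_again = MLlibVectors.fromML(ml_vec)   # back to a pyspark.mllib DenseVector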

    model.transform(loadedDataDF).show(truncate=False)
    # +-----+----------------------------+-------------+
    # |label|features                    |scaled       |
    # +-----+----------------------------+-------------+
    # |1.1  |(3,[0,2],[1.23,4.56])       |[1.0,0.0,1.0]|
    # |0.0  |(3,[0,1,2],[1.01,2.02,3.03])|[0.0,1.0,0.0]|
    # +-----+----------------------------+-------------+
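
    As a side note, since the data is already in libsvm format you can also read it straight into a DataFrame with Spark's built-in libsvm data source, which skips the RDD round trip and the asML() conversion entirely. A sketch; the numFeatures option is set explicitly here because the input is split over several part files:

    directDF = (spark.read.format("libsvm")
                .option("numFeatures", "3")
                .load(temp_path + "data"))

    # 'features' is already a pyspark.ml.linalg vector column, so no conversion is needed
    model.transform(directDF).show(truncate=False)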
    

    I hope that this answers your question!