Search code examples
python, apache-spark, sparse-matrix

How to add two Sparse Vectors in Spark using Python


I've searched everywhere but I couldn't find how to add two sparse vectors using Python. I want to add two sparse vectors like this:

(1048576, {110522: 0.6931, 521365: 1.0986, 697409: 1.0986, 725041: 0.6931, 749730: 0.6931, 962395: 0.6931})

(1048576, {4471: 1.0986, 725041: 0.6931, 850325: 1.0986, 962395: 0.6931})

Solution

  • Something like this should work:

    from pyspark.mllib.linalg import Vectors, SparseVector, DenseVector
    import numpy as np
    
    def add(v1, v2):
        """Add two sparse vectors of equal size.

        Entries whose sum cancels to exactly zero are dropped from the
        result, keeping the output truly sparse.

        >>> v1 = Vectors.sparse(3, {0: 1.0, 2: 1.0})
        >>> v2 = Vectors.sparse(3, {1: 1.0})
        >>> add(v1, v2)
        SparseVector(3, {0: 1.0, 1: 1.0, 2: 1.0})
        """
        assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
        assert v1.size == v2.size
        # Union of active indices from both vectors
        indices = set(v1.indices).union(set(v2.indices))
        # Not particularly efficient but we are limited by SPARK-10973
        # Build index -> value lookup tables
        v1d = dict(zip(v1.indices, v1.values))
        v2d = dict(zip(v2.indices, v2.values))
        zero = np.float64(0)
        # Compute each per-index sum once (the original evaluated it twice:
        # once for the value and once in the filter), then drop exact zeros
        sums = {i: v1d.get(i, zero) + v2d.get(i, zero) for i in indices}
        values = {i: s for i, s in sums.items() if s != zero}

        return Vectors.sparse(v1.size, values)
    

    If you prefer a single pass and don't care about introduced zeros, you can modify the above code like this:

    from collections import defaultdict
    
    def add(v1, v2):
        """Merge two equally-sized SparseVectors in a single pass.

        Unlike the filtering variant above, explicit zeros produced by
        cancellation are kept in the result.
        """
        assert isinstance(v1, SparseVector) and isinstance(v2, SparseVector)
        assert v1.size == v2.size
        values = defaultdict(float)  # Dictionary with default value 0.0
        # Accumulate the entries of both vectors into a single dictionary
        for vec in (v1, v2):
            for idx, val in zip(vec.indices, vec.values):
                values[idx] += val
        return Vectors.sparse(v1.size, dict(values))
    

    If you want, you can try monkey-patching SparseVector:

    # Bind add() as the + operator on SparseVector (monkey patch);
    # whichever add() definition ran last is the one used here
    SparseVector.__add__ = add
    v1 = Vectors.sparse(5, {0: 1.0, 2: 3.0})
    v2 = Vectors.sparse(5, {0: -3.0, 2: -3.0, 4: 10})
    # Index 2 sums to zero; the output shown below drops it, which matches
    # the zero-filtering variant of add()
    v1 + v2
    ## SparseVector(5, {0: -2.0, 4: 10.0})
    

    Alternatively, you should be able to use scipy.sparse.

    from scipy.sparse import csc_matrix
    from pyspark.mllib.regression import LabeledPoint
    
    # Represent each vector as a one-column CSC matrix: row index is the
    # sparse index, and every nonzero lives in column 0
    m1 = csc_matrix((
       v1.values,
       (v1.indices, [0] * v1.numNonzeros())),
       shape=(v1.size, 1))
    
    m2 = csc_matrix((
       v2.values,
       (v2.indices, [0] * v2.numNonzeros())),
       shape=(v2.size, 1))
    
    # scipy performs the sparse addition; NOTE(review): this assumes
    # LabeledPoint accepts a scipy sparse column as its features argument —
    # confirm for your Spark version
    LabeledPoint(0, m1 + m2)