
Converting an RDD of NumPy arrays to a PySpark DataFrame


I get the following error when trying to convert an RDD made of NumPy arrays to a DataFrame in PySpark.

Below is the piece of code leading to this error. I can't even pinpoint where the error actually occurs, even after reading the trace...

Does anyone know how this could be worked around?

Thanks a lot!

In [111]: rddUser.take(5)

Out[111]:

[array([u'1008798262000292538', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'102254941859441333', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'1035609083097069747', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'10363297284472000', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32'),
 array([u'1059178934871294116', u'1.0', u'0.0', ..., u'0.0', u'0.0', u'1.0'], 
       dtype='<U32')]

Then here comes the mess:

In [110]: rddUser.toDF(schema=None).show()  

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-110-073037afd70e> in <module>()
----> 1 rddUser.toDF(schema=None).show()

     62         [Row(name=u'Alice', age=1)]
     63         """
---> 64         return sqlContext.createDataFrame(self, schema, sampleRatio)
     65 
     66     RDD.toDF = toDF

    421 
    422         if isinstance(data, RDD):
--> 423             rdd, schema = self._createFromRDD(data, schema, samplingRatio)
    424         else:
    425             rdd, schema = self._createFromLocal(data, schema)

    308         """
    309         if schema is None or isinstance(schema, (list, tuple)):
--> 310             struct = self._inferSchema(rdd, samplingRatio)
    311             converter = _create_converter(struct)
    312             rdd = rdd.map(converter)

    253         """
    254         first = rdd.first()
--> 255         if not first:
    256             raise ValueError("The first row in RDD is empty, "
    257                              "can not infer schema")

ValueError: The truth value of an array with more than one element is ambiguous. Use a.any() or a.all()

Solution
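
The error comes from Spark's schema inference: as the traceback shows, _inferSchema truth-tests the first row with "if not first:", and the truth value of a multi-element NumPy array is ambiguous. A minimal reproduction outside Spark:

    import numpy as np

    first = np.array([u'1.0', u'0.0'])

    # Truth-testing a multi-element array raises the same ValueError
    # that Spark's _inferSchema hits on "if not first:".
    if not first:
        pass
    # ValueError: The truth value of an array with more than one element
    # is ambiguous. Use a.any() or a.all()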

  • If the RDD is defined as above, just map with tolist:

    import numpy as np
    
    rdd = spark.sparkContext.parallelize([
        np.array([u'1059178934871294116', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0']),
        np.array([u'102254941859441333', u'1.0', u'0.0', u'0.0', u'0.0', u'1.0'])
    ])
    
    df = rdd.map(lambda x: x.tolist()).toDF(["user_id"])
    df.show()
    
    # +-------------------+---+---+---+---+---+
    # |            user_id| _2| _3| _4| _5| _6|
    # +-------------------+---+---+---+---+---+
    # |1059178934871294116|1.0|0.0|0.0|0.0|1.0|
    # | 102254941859441333|1.0|0.0|0.0|0.0|1.0|
    # +-------------------+---+---+---+---+---+
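
    Note that only the first name you pass is applied; the remaining columns fall back to Spark's default _2 ... _6 names. If you want explicit names and numeric types, one option is the sketch below (the names f1 ... f5 are made up):

    # Cast the flag columns to float so the DataFrame carries numeric
    # columns instead of strings; all names except user_id are hypothetical.
    cols = ["user_id", "f1", "f2", "f3", "f4", "f5"]
    df = (rdd
          .map(lambda x: [str(x[0])] + [float(v) for v in x[1:]])
          .toDF(cols))
    df.show()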
    

    Considering your comment, though, I assume you want to use it with pyspark.ml. In that case this might be better:

    from pyspark.ml.linalg import DenseVector
    
    (rdd
       .map(lambda x: (x[0].tolist(), DenseVector(x[1:])))
       .toDF(["user_id", "features"])
       .show(2, False))
    # +-------------------+---------------------+
    # |user_id            |features             |
    # +-------------------+---------------------+
    # |1059178934871294116|[1.0,0.0,0.0,0.0,1.0]|
    # |102254941859441333 |[1.0,0.0,0.0,0.0,1.0]|
    # +-------------------+---------------------+
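
    Once features is a proper vector column, any pyspark.ml estimator can consume it directly. A minimal sketch, with KMeans chosen purely for illustration:

    from pyspark.ml.clustering import KMeans

    df = (rdd
          .map(lambda x: (str(x[0]), DenseVector(x[1:])))
          .toDF(["user_id", "features"]))

    # Fit on the "features" column and append a "prediction" column.
    model = KMeans(k=2, featuresCol="features").fit(df)
    model.transform(df).show()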
    

    You should also take a look at pyspark.ml.feature.OneHotEncoder.
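
    For reference, a minimal sketch of that encoder, assuming Spark 2.x where OneHotEncoder is a plain Transformer; it expects a numeric category index, so a StringIndexer typically comes first:

    from pyspark.ml.feature import OneHotEncoder, StringIndexer

    # Toy data, purely illustrative.
    cats = spark.createDataFrame([("a",), ("b",), ("a",)], ["category"])

    # Map string categories to numeric indices.
    indexed = (StringIndexer(inputCol="category", outputCol="category_idx")
               .fit(cats)
               .transform(cats))

    # Spark 2.x API: OneHotEncoder transforms directly, no fit step.
    (OneHotEncoder(inputCol="category_idx", outputCol="category_vec")
     .transform(indexed)
     .show())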