python, apache-spark, pyspark, rdd

Does Schema depend on first row while converting RDD to DataFrame in pyspark?


My question is: while converting from an RDD to a DataFrame in PySpark, does the schema depend on the first row?

>>> data1 = [('A','abc',0.1,'',0.562),('B','def',0.15,0.5,0.123),('A','ghi',0.2,0.2,0.1345),('B','jkl','',0.1,0.642),('B','mno',0.1,0.1,'')]
>>> val1 = sc.parallelize(data1).toDF()
>>> val1.show()
+---+---+----+---+------+
| _1| _2|  _3| _4|    _5|
+---+---+----+---+------+
|  A|abc| 0.1|   | 0.562|  <------ Does it depend on the type of this row?
|  B|def|0.15|0.5| 0.123|
|  A|ghi| 0.2|0.2|0.1345|
|  B|jkl|null|0.1| 0.642|
|  B|mno| 0.1|0.1|  null|
+---+---+----+---+------+

>>> val1.printSchema()
root
 |-- _1: string (nullable = true)
 |-- _2: string (nullable = true)
 |-- _3: double (nullable = true)
 |-- _4: string (nullable = true)
 |-- _5: double (nullable = true)

As you can see, column _4 should have been double, but it was treated as string.

Any suggestions will be helpful. Thanks!


Solution

  • @Prathik, I think you are right. toDF() is a shorthand for spark.createDataFrame(rdd, schema, sampleRatio).

    Here's the signature for createDataFrame:

    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True)
    
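    toDF() is not actually defined on RDD; in recent PySpark versions it is monkey-patched onto RDD in pyspark/sql/session.py as a thin forwarding wrapper, roughly like this (paraphrased from memory, so check your Spark version):

    def toDF(self, schema=None, sampleRatio=None):
        # simply forwards to createDataFrame, so the same
        # schema-inference rules apply to both entry points
        return sparkSession.createDataFrame(self, schema, sampleRatio)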

    So by default, the parameters schema and samplingRatio are None. According to the doc:

    If schema inference is needed, samplingRatio is used to determine the ratio of rows used for schema inference. The first row will be used if samplingRatio is None.

    So by default, toDF() uses only the first row to infer the data types, which yields StringType for column _4 (whose first value is the string '') but DoubleType for column _5.
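
    A quick way to confirm the first-row behaviour (my sketch, not part of the original question): move a row whose fourth field is a float to the front. Inference then picks DoubleType for _4, and the '' values in the later rows simply show up as null, just like column _3 did above:

    data2 = [('B','def',0.15,0.5,0.123),  # first row now has floats in _4 and _5
             ('A','abc',0.1,'',0.562),
             ('A','ghi',0.2,0.2,0.1345),
             ('B','jkl','',0.1,0.642),
             ('B','mno',0.1,0.1,'')]
    sc.parallelize(data2).toDF().printSchema()
    # _4 and _5 are now both inferred as double (nullable = true)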

    Here you can't simply pass a schema that declares columns _4 and _5 as DoubleType, since both columns contain strings. But you can try setting sampleRatio to 0.3 as below:

    data1 = [('A','abc',0.1,'',0.562),('B','def',0.15,0.5,0.123),('A','ghi',0.2,0.2,0.1345),('B','jkl','',0.1,0.642),('B','mno',0.1,0.1,'')]
    val1 = sc.parallelize(data1).toDF(sampleRatio=0.3)
    val1.show()
    val1.printSchema()
    

    Sometimes the above code will throw an error, if the sample happens to include both a string and a double in the same column:

    Can not merge type <class 'pyspark.sql.types.DoubleType'> and <class 'pyspark.sql.types.StringType'>
    
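    The error is just the luck of the draw. If the ratio is high enough that every row gets used (in the versions I've checked, PySpark skips sampling once the ratio is at least 0.99), then columns _3, _4 and _5 each contribute both a string and a double, so the merge fails every time:

    # every row is considered, so the String/Double conflicts
    # are guaranteed and this raises the merge error deterministically
    sc.parallelize(data1).toDF(sampleRatio=1.0)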

    If you are patient and retry a few times (fewer than 10 for me), you may get something like this. Both columns _4 and _5 come out as DoubleType, because by luck the sample picked only rows with doubles in those columns:

    +---+---+----+----+------+
    | _1| _2|  _3|  _4|    _5|
    +---+---+----+----+------+
    |  A|abc| 0.1|null| 0.562|
    |  B|def|0.15| 0.5| 0.123|
    |  A|ghi| 0.2| 0.2|0.1345|
    |  B|jkl|null| 0.1| 0.642|
    |  B|mno| 0.1| 0.1|  null|
    +---+---+----+----+------+
    
    root
     |-- _1: string (nullable = true)
     |-- _2: string (nullable = true)
     |-- _3: double (nullable = true)
     |-- _4: double (nullable = true)
     |-- _5: double (nullable = true)
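
    If you'd rather not gamble on the sample, a deterministic alternative (my sketch, not part of the original answer): the explicit-schema route above only fails because of the '' strings, so map them to None first and then pass a schema, and no inference happens at all:

    from pyspark.sql.types import (StructType, StructField,
                                   StringType, DoubleType)

    # treat '' as missing so each column holds a single type
    cleaned = sc.parallelize(data1).map(
        lambda row: tuple(None if v == '' else v for v in row))

    schema = StructType([
        StructField("_1", StringType(), True),
        StructField("_2", StringType(), True),
        StructField("_3", DoubleType(), True),
        StructField("_4", DoubleType(), True),
        StructField("_5", DoubleType(), True),
    ])

    val2 = spark.createDataFrame(cleaned, schema)
    val2.show()
    val2.printSchema()  # _4 and _5 come out as double on every run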