Tags: python, dataframe, pyspark, apache-spark-sql, rdd

Input row doesn't have expected number of values required by the schema


I'm reading data from a CSV file with 7 million rows. Sample data from that file:

"dfg.AAIXpWU4Q","1"
"cvbc.AAU3aXfQ","1"
"T-L5aL1uT_OfFbk","1"
"D9TOXrA_LsQa-awVk","2"
"JWg8_0lGDWcH_9aDc","2"
"ewrq.AAbCVh5wA","1"
"ewrq.AALAC-Qku3heg","1"
"ewrq.AADSmhJ7A","2"
"ewrq.AAEAoHUNA","1"
"ewrq.AALfV5u-7Yg","1"

And I read it like this:

>>> rdd = sc.textFile("/path/to/file/*")

>>> rdd.take(2)
['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
​
# reading the RDD into a dataframe
>>> my_df = rdd.map(lambda x: (x.split(","))).toDF()

# changing column names
>>> df1 = my_df.selectExpr("_1 as user_id", "_2 as hits")

>>> df1.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows

>>> df2 = df1.sort(col('hits').desc())
>>> df2.show(10)

But this gives me the following error:

Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided.

I'm guessing it's the way I convert the RDD to a DataFrame. Maybe the x.split(",") isn't accounting for bad data: the error says one row splits into 18 values instead of 2, which is what would happen if a quoted value itself contains commas. How do I get around this issue?
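
For illustration, here's a hypothetical line (not from my actual file) showing how a naive split inflates the field count when a quoted value contains commas:

>>> bad_line = '"user,with,commas","2"'
>>> bad_line.split(",")
['"user', 'with', 'commas"', '"2"']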


Solution

  • Going by @pault's comment, I just did the following to solve this:

    >>> rdd = sc.textFile("/path/to/file/*")
    
    # checking out how the data looks
    >>> rdd.take(2)
    ['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
    
    >>> from pyspark.sql.functions import col
    >>> from pyspark.sql.types import IntegerType
    
    # let Spark's CSV reader handle the quoting instead of splitting on commas manually
    >>> my_df = spark.read.csv("/path/to/file/*", quote='"', sep=",")
    
    >>> df1 = my_df.selectExpr("_c0 as user_id", "_c1 as hits")
    
    # hits is read as a string, so cast it to an integer before sorting
    >>> df1 = df1.withColumn("hits", df1["hits"].cast(IntegerType()))
    
    >>> df2 = df1.sort(col('hits').desc())
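    
    As a side note, you can also hand the reader an explicit schema up front and skip the cast entirely. A minimal sketch under that assumption (the column names in the StructType are my own choice, not part of the original answer):
    
    >>> from pyspark.sql.functions import col
    >>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
    >>> schema = StructType([
    ...     StructField("user_id", StringType()),
    ...     StructField("hits", IntegerType()),
    ... ])
    >>> df1 = spark.read.csv("/path/to/file/*", quote='"', sep=",", schema=schema)
    >>> df2 = df1.sort(col("hits").desc())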