I'm reading data from a CSV file with 7 million rows. Sample data from that file:
"dfg.AAIXpWU4Q","1"
"cvbc.AAU3aXfQ","1"
"T-L5aL1uT_OfFbk","1"
"D9TOXrA_LsQa-awVk","2"
"JWg8_0lGDWcH_9aDc","2"
"ewrq.AAbCVh5wA","1"
"ewrq.AALAC-Qku3heg","1"
"ewrq.AADSmhJ7A","2"
"ewrq.AAEAoHUNA","1"
"ewrq.AALfV5u-7Yg","1"
I read it like this:
>>> rdd = sc.textFile("/path/to/file/*")
>>> rdd.take(2)
['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
# splitting each line on commas and converting the RDD to a DataFrame
>>> my_df = rdd.map(lambda x: x.split(",")).toDF()
# changing column names
>>> df1 = my_df.selectExpr("_1 as user_id", "_2 as hits")
>>> df1.show(3)
+-------+----+
|user_id|hits|
+-------+----+
|"aYk...| "7"|
|"yDQ...| "1"|
|"qUU...|"13"|
+-------+----+
only showing top 3 rows
>>> from pyspark.sql.functions import col
>>> df2 = df1.sort(col('hits').desc())
>>> df2.show(10)
But this gives me the following error:
Input row doesn't have expected number of values required by the schema. 2 fields are required while 18 values are provided.
I'm guessing the problem is how I convert the RDD to a DataFrame: maybe x.split(",") doesn't account for malformed rows. How do I get around this issue?
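To see why the split-based conversion can break, here is a small plain-Python sketch (no Spark needed) comparing str.split(",") with the standard csv module. The second input line is hypothetical and not taken from the file: it assumes a user_id containing commas inside its quotes, which is one plausible way a row ends up with more values than the 2 the schema expects. The naive split also keeps the literal double quotes, which matches the "7" shown in the hits column.

```python
import csv


def naive_fields(line):
    """What rdd.map(lambda x: x.split(",")) does to a single line."""
    return line.split(",")


def csv_fields(line):
    """Parse a single line with the csv module, honouring double quotes."""
    return next(csv.reader([line]))


sample = '"7wAfdgdfgd","7"'   # taken from the rdd.take(2) output in the question
bad = '"abc,def,ghi","3"'     # hypothetical row with commas inside the quotes

print(naive_fields(sample))   # ['"7wAfdgdfgd"', '"7"']  (quotes are kept)
print(csv_fields(sample))     # ['7wAfdgdfgd', '7']
print(naive_fields(bad))      # ['"abc', 'def', 'ghi"', '"3"']  (4 fields, not 2)
print(csv_fields(bad))        # ['abc,def,ghi', '3']
```

A quote-aware parser keeps such a row at two fields, which is the property the DataFrame schema relies on.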
Following @pault's comment, I solved this by reading the files with spark.read.csv, which handles the quoting, and casting hits to an integer:
>>> from pyspark.sql.functions import col
>>> from pyspark.sql.types import IntegerType
>>> rdd = sc.textFile("/path/to/file/*")
# checking out how the data looks
>>> rdd.take(2)
['"7wAfdgdfgd","7"', '"1x3Qdfgdf","1"']
>>> my_df = spark.read.csv("/path/to/file/*", quote='"', sep=",")
>>> df1 = my_df.selectExpr("_c0 as user_id", "_c1 as hits")
>>> df1 = df1.withColumn("hits", df1["hits"].cast(IntegerType()))
>>> df2 = df1.sort(col('hits').desc())
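The cast matters because sorting the raw strings is lexicographic, so "7" would rank above "13". If you prefer to stay on the RDD route instead of spark.read.csv, a minimal sketch is to parse each line with the csv module, cast hits to int, and drop anything malformed. parse_line is a hypothetical helper, and the last two input lines are invented for illustration; this is plain Python, not tested against Spark.

```python
import csv


def parse_line(line):
    """Parse one line into (user_id, hits), or return None if it is malformed.

    Hypothetical helper: a line counts as malformed if the csv module does not
    split it into exactly two fields, or if the second field is not an integer.
    """
    fields = next(csv.reader([line]), [])
    if len(fields) != 2:
        return None
    user_id, hits = fields
    try:
        return user_id, int(hits)  # same idea as .cast(IntegerType()) in Spark
    except ValueError:
        return None


lines = [
    '"7wAfdgdfgd","7"',
    '"1x3Qdfgdf","1"',
    '"qUUx","13"',       # hypothetical row
    '"broken,row",',     # hypothetical malformed row: empty hits field
]
rows = [r for r in map(parse_line, lines) if r is not None]

# Sorting the raw strings is lexicographic, so "7" outranks "13" ...
print(sorted(["7", "1", "13"], reverse=True))  # ['7', '13', '1']
# ... while sorting the integers gives the intended descending order.
top = sorted(rows, key=lambda r: r[1], reverse=True)
print(top)  # [('qUUx', 13), ('7wAfdgdfgd', 7), ('1x3Qdfgdf', 1)]
```

In PySpark the same helper could presumably be applied as sc.textFile("/path/to/file/*").map(parse_line).filter(lambda r: r is not None).toDF(["user_id", "hits"]), though spark.read.csv is the simpler option when the files are well-formed CSV.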