Tags: python, apache-spark, dataframe, pyspark, data-cleaning

Filtering a Spark DataFrame


I've created a dataframe as:

ratings = imdb_data.sort('imdbRating').select('imdbRating').filter('imdbRating is NOT NULL')

Upon doing ratings.show() as shown below, I can see that the imdbRating field holds a mix of data: random strings, movie titles, movie URLs, and actual ratings. The dirty data looks like this:

+--------------------+
|          imdbRating|
+--------------------+
|Mary (TV Episode...|
| Paranormal Activ...|
| Sons (TV Episode...|
|        Spion (2011)|
| Winter... und Fr...|
| and Gays (TV Epi...|
| grAs - Die Serie...|
| hat die Wahl (2000)|
|                 1.0|
|                 1.3|
|                 1.4|
|                 1.5|
|                 1.5|
|                 1.5|
|                 1.6|
|                 1.6|
|                 1.7|
|                 1.9|
|                 1.9|
|                 1.9|
+--------------------+
only showing top 20 rows

Is there any way I can filter out the unwanted strings and get just the ratings? I tried using a UDF:

from pyspark.sql.functions import udf

ratings_udf = udf(lambda imdbRating: imdbRating if isinstance(imdbRating, float) else None)

and tried calling it as:

ratings = imdb_data.sort('imdbRating').select('imdbRating')
filtered = ratings.withColumn('imdbRating', ratings_udf(ratings.imdbRating))

The problem with the above is that the UDF is called on each value of the column, and since the column was inferred as a string, every value arrives as a Python str rather than a float, so the isinstance check fails and None is returned for all the values.
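
A quick way to confirm what the UDF actually receives is to print the schema; with mixed values like these, Spark infers the column as a string:

imdb_data.printSchema()
# expected to list the column as a string, e.g.
#  |-- imdbRating: string (nullable = true)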

Is there any straightforward way to filter out the bad data and keep only the ratings? Any help will be much appreciated. Thank you.


Solution

  • Finally, I was able to resolve it. The problem was that some rows were corrupt, with not all fields present. First, I tried using pandas by reading the CSV file as:

    import pandas as pd

    pd_frame = pd.read_csv('imdb.csv', error_bad_lines=False)
    

    This skipped/dropped the corrupt rows, which had fewer columns than expected (a note on newer pandas versions follows at the end of this answer). I then tried to convert the pandas dataframe, pd_frame, to Spark using:

    imdb_data = spark.createDataFrame(pd_frame)
    

    but got an error because of a mismatch while inferring the schema. It turns out Spark's CSV reader has a similar option that drops the corrupt rows:

    imdb_data = spark.read.csv('imdb.csv', header='true', mode='DROPMALFORMED')
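
    Putting it together: DROPMALFORMED takes care of the rows with missing fields, and any leftover non-numeric strings in imdbRating can then be filtered out by casting, which is the straightforward filter asked about above. A minimal sketch, assuming the same file and column names:

    from pyspark.sql.functions import col

    imdb_data = spark.read.csv('imdb.csv', header=True, mode='DROPMALFORMED')

    # cast('double') turns any non-numeric string into NULL,
    # so isNotNull() keeps only the actual ratings
    ratings = (imdb_data
               .withColumn('imdbRating', col('imdbRating').cast('double'))
               .filter(col('imdbRating').isNotNull())
               .sort('imdbRating'))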
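
    As a side note on the pandas attempt: in newer pandas releases, error_bad_lines is deprecated (since 1.3) and removed (in 2.0); the replacement there is on_bad_lines:

    import pandas as pd

    # newer-pandas equivalent of error_bad_lines=False:
    # skip lines with too many fields instead of raising an error
    pd_frame = pd.read_csv('imdb.csv', on_bad_lines='skip')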