
Multiple Filtering in PySpark


I have imported a data set into a Jupyter notebook / PySpark to process through EMR, for example:

data sample

I want to clean up the data with the filter function before using it. This includes:

  1. Removing rows where the cost or date is blank, '0', or NA. I think the filter would be something like: .filter(lambda (a,b,c,d): b == ?, c % 1 == c, d == ?). I'm unsure how to filter fruit and store here.
  2. Removing incorrect values, e.g. '3' is not a fruit name. This is easy for numbers (just test number % 1 == number) but I'm unsure how it would filter out the words.
  3. Removing rows that are statistical outliers, i.e. more than 3 standard deviations from the mean. Here cell C4 would clearly need to be removed, but I am unsure how to incorporate this logic into a filter.

Do I need to perform one filter at a time, or is there a way to filter the data set (in lambda notation) all in one go?

Or would it be easier to write a Spark SQL query instead, with many filters in the 'where' clause (though #3 above is still difficult to express in SQL)?


Solution

  • If you read the documentation, http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.filter, it says that

    where() is an alias for filter().

    So, you can safely use 'filter' instead of 'where' for multiple conditions too.
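
    As a minimal sketch of that idea (assuming columns named 'fruit', 'cost', and 'date', and a hypothetical whitelist `valid_fruits`), several conditions can be combined in a single filter() call with & (and), | (or), and ~ (not) on column expressions:

    ```python
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.master("local[1]").appName("demo").getOrCreate()

    df = spark.createDataFrame(
        [("apple", 1.20, "2017-01-01"),
         ("3",     0.0,  "2017-01-02"),   # bad fruit name and zero cost
         ("pear",  0.95, "2017-01-03")],
        ["fruit", "cost", "date"])

    valid_fruits = ["apple", "pear", "banana"]  # hypothetical whitelist

    # One filter() call, several conditions; each condition is wrapped
    # in parentheses because & binds tighter than the comparisons.
    cleaned = df.filter(
        (F.col("cost").isNotNull()) &
        (F.col("cost") > 0) &
        (F.col("fruit").isin(valid_fruits)))
    ```

    Here the row with fruit '3' fails both the whitelist check and the cost check, so it is dropped in one pass.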

    EDIT: If you want to filter on many conditions for many columns, I would prefer this method.

    from dateutil.parser import parse
    import pyspark.sql.functions as F
    from pyspark.sql.types import StringType
    
    def is_date(string):
        try:
            parse(string)
            return True
        except ValueError:
            return False
    
    def check_date(d):
        # keep the value if it parses as a date, otherwise None
        if is_date(d):
            return d
        else:
            return None
    
    date_udf = F.udf(check_date, StringType())
    
    def check_fruit(name):
        # fruits_list: build a list of valid fruit names beforehand
        # (can easily find one on google) -- filtering free-form words
        # is difficult otherwise; check against the values you want
        # and everything else gets filtered out
        if name in fruits_list:
            return name
        else:
            return None
    
    fruit_udf = F.udf(check_fruit, StringType())
    
    def check_cost(value):
        # mean, std: calculated beforehand over the cost column
        threshold_upper = mean + (3 * std)
        threshold_lower = mean - (3 * std)
    
        if threshold_lower < value < threshold_upper:
            return value
        else:
            return None
    
    cost_udf = F.udf(check_cost, StringType())
    
    # Similarly, create store_udf
    
    df = df.select(date_udf(F.col('date')).alias('date'),
                   fruit_udf(F.col('fruit')).alias('fruit'),
                   cost_udf(F.col('cost')).alias('cost'),
                   store_udf(F.col('store')).alias('store')).dropna()
    

    This applies all the checks to every column in a single pass, and dropna() then removes any row where a UDF returned None.
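
    The check_cost UDF above assumes mean and std were calculated beforehand. A sketch of one way to get them (assuming a numeric 'cost' column) is a single aggregation, after which the 3-standard-deviation rule from #3 can be applied with a plain filter rather than a UDF:

    ```python
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F

    spark = SparkSession.builder.master("local[1]").appName("outliers").getOrCreate()

    # 30 ordinary costs plus one obvious outlier
    data = [(1.0 + 0.01 * i,) for i in range(30)] + [(100.0,)]
    df = spark.createDataFrame(data, ["cost"])

    # Compute mean and standard deviation in one aggregation
    stats = df.agg(F.mean("cost").alias("mean"),
                   F.stddev("cost").alias("std")).first()
    mean, std = stats["mean"], stats["std"]

    # Keep only rows within 3 standard deviations of the mean
    no_outliers = df.filter(
        (F.col("cost") > mean - 3 * std) &
        (F.col("cost") < mean + 3 * std))
    ```

    Note that with very few rows a single extreme value inflates the standard deviation enough that it may not exceed the 3-sigma threshold, so this rule works best on a reasonably sized sample.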