Tags: indexing, pyspark, frequency

How can I count the occurrence frequency of records in a Spark DataFrame and add it as a new column, without affecting the index column?


I'm trying to add a new column named Freq to the given Spark DataFrame, without affecting the index column or the order of the records, so that the statistical frequency (i.e. the count of each distinct record) is assigned back to the right row/record in the DataFrame.

This is my DataFrame (a small reproducible snippet follows the table):

+---+-------------+------+------------+-------------+-----------------+
| id|         Type|Length|Token_number|Encoding_type|Character_feature|
+---+-------------+------+------------+-------------+-----------------+
|  0|     Sentence|  4014|         198|        false|              136| 
|  1|    contextid|    90|           2|        false|               15|
|  2|     Sentence|   172|          11|        false|              118| 
|  3|       String|    12|           0|         true|               11| 
|  4|version-style|    16|           0|        false|               13|   
|  5|     Sentence|   339|          42|        false|              110| 
|  6|version-style|    16|           0|        false|               13|  
|  7| url_variable|    10|           2|        false|                9| 
|  8| url_variable|    10|           2|        false|                9|
|  9|     Sentence|   172|          11|        false|              117| 
| 10|    contextid|    90|           2|        false|               15| 
| 11|     Sentence|   170|          11|        false|              114|
| 12|version-style|    16|           0|        false|               13|
| 13|     Sentence|    68|          10|        false|               59|
| 14|       String|    12|           0|         true|               11|
| 15|     Sentence|   173|          11|        false|              118|
| 16|       String|    12|           0|         true|               11|
| 17|     Sentence|   132|           8|        false|               96|
| 18|       String|    12|           0|         true|               11|
| 19|    contextid|    88|           2|        false|               15|
+---+-------------+------+------------+-------------+-----------------+
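
For reference, here is a minimal, reproducible sketch of this frame built from its first five rows; the SparkSession setup is an assumption, the values are copied from the table above, and the remaining rows follow the same pattern:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# first five rows of the frame shown above, with types inferred from the display
features_sdf = spark.createDataFrame(
    [
        (0, "Sentence", 4014, 198, False, 136),
        (1, "contextid", 90, 2, False, 15),
        (2, "Sentence", 172, 11, False, 118),
        (3, "String", 12, 0, True, 11),
        (4, "version-style", 16, 0, False, 13),
    ],
    ["id", "Type", "Length", "Token_number", "Encoding_type", "Character_feature"],
)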

I tried the following script, unsuccessfully, because of the presence of the index column id:

from pyspark.sql import functions as F
from pyspark.sql import Window
bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')

sdf2 = (
      bo.na.fill(0).withColumn(
          'Freq',
          F.count("*").over(Window.partitionBy(bo.columns)) 
      ).withColumn(
          'MaxFreq',
          F.max('Freq').over(Window.partitionBy())
      ).withColumn(
          'MinFreq',
          F.min('Freq').over(Window.partitionBy())
      )
  )

sdf2.show()
# bad result: the id column makes every record unique, so Freq is always 1
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
| id|         Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
|  0|     Sentence|  4014|         198|        false|              136|   1|      1|      1|
|  1|    contextid|    90|           2|        false|               15|   1|      1|      1|
|  2|     Sentence|   172|          11|        false|              118|   1|      1|      1|
|  3|       String|    12|           0|         true|               11|   1|      1|      1|
|  4|version-style|    16|           0|        false|               13|   1|      1|      1|
|  5|     Sentence|   339|          42|        false|              110|   1|      1|      1|
|  6|version-style|    16|           0|        false|               13|   1|      1|      1|
|  7| url_variable|    10|           2|        false|                9|   1|      1|      1|
|  8| url_variable|    10|           2|        false|                9|   1|      1|      1|
|  9|     Sentence|   172|          11|        false|              117|   1|      1|      1|
| 10|    contextid|    90|           2|        false|               15|   1|      1|      1|
| 11|     Sentence|   170|          11|        false|              114|   1|      1|      1|
| 12|version-style|    16|           0|        false|               13|   1|      1|      1|
| 13|     Sentence|    68|          10|        false|               59|   1|      1|      1|
| 14|       String|    12|           0|         true|               11|   1|      1|      1|
| 15|     Sentence|   173|          11|        false|              118|   1|      1|      1|
| 16|       String|    12|           0|         true|               11|   1|      1|      1|
| 17|     Sentence|   132|           8|        false|               96|   1|      1|      1|
| 18|       String|    12|           0|         true|               11|   1|      1|      1|
| 19|    contextid|    88|           2|        false|               15|   1|      1|      1|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+  

If I exclude the index column id, the code works, but it messes up the order (due to the unwanted shuffling/sorting), so the results are no longer assigned to the right record/row, as shown below:

+--------+------+------------+-------------+-----------------+----+-------+-------+
|    Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+--------+------+------------+-------------+-----------------+----+-------+-------+
|Sentence|     7|           1|        false|                6|   2|   1665|      1|
|Sentence|     7|           1|        false|                6|   2|   1665|      1|
|Sentence|    17|           4|        false|               14|   6|   1665|      1|
|Sentence|    17|           4|        false|               14|   6|   1665|      1|
|Sentence|    17|           4|        false|               14|   6|   1665|      1|
|Sentence|    17|           4|        false|               14|   6|   1665|      1|
|Sentence|    17|           4|        false|               14|   6|   1665|      1|
|Sentence|    17|           4|        false|               14|   6|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
|Sentence|    19|           4|        false|               17|  33|   1665|      1|
+--------+------+------------+-------------+-----------------+----+-------+-------+

In the end, I want to normalize this frequency between 0 and 1 using a simple mathematical formula and use it as a new feature. During normalization I also ran into problems and got null values. I have already implemented the pandas version, and it is easy, but I'm so fed up with Spark:

# Statistical preprocessing (pandas version)
import pandas as pd

def add_freq_to_features(df):
  frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
  frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum()  # normalizing between 0 and 1
  new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))

  return new_df

# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(oba)
features_df.head(20)

and it returns the correct results, as I expected:

(screenshot of the expected pandas output with the normalized Freq column)

I also tried to literally translate the pandas script using df.groupBy(df.columns).count(), but I couldn't get it to work:

# this is to build the "raw" Freq, based on @pltc's answer
sdf2 = (sdf
    .groupBy(sdf.columns)
    .agg(F.count('*').alias('Freq'))
    .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()
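
The groupBy version collapses the frame to one row per distinct combination, so the id column (and with it the original row order) disappears from the result. One way to keep every row, sketched below under the assumption that sdf is the original frame with the columns shown above, is to aggregate on every column except id and then join the counts back:

from pyspark.sql import functions as F

# fill nulls first so both sides of the join see identical values
filled = sdf.na.fill(0)
group_cols = [c for c in filled.columns if c != 'id']

freq_sdf = filled.groupBy(group_cols).agg(F.count('*').alias('Freq'))

# every original row (including id) keeps its count; orderBy('id') is only for display
sdf_with_freq = filled.join(freq_sdf, on=group_cols, how='left')
sdf_with_freq.orderBy('id').show()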

Here is the full PySpark code of what we have tried on a simplified example (available in this Colab notebook), based on the answer of @ggordon:

def add_freq_to_features_(df):
  from pyspark.sql import functions as F
  from pyspark.sql import Window
  sdf_pltc = df.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
  print("Before Any Modification") # only included for debugging purposes
  sdf_pltc.show(5,truncate=0)

  # fill missing values with 0 using `na.fill(0)` before applying count as window function
  sdf2 = (
      sdf_pltc.na.fill(0).withColumn(
          'Freq',
          F.count("*").over(Window.partitionBy(sdf_pltc.columns)) 
      ).withColumn(
          'MaxFreq',
          F.max('Freq').over(Window.partitionBy())
      ).withColumn(
          'MinFreq',
          F.min('Freq').over(Window.partitionBy())
      )
       .withColumn('id' ,  F.col('id')) 
  )
  print("After replacing null with 0 and counting by partitions") # only included for debugging purposes
  # use orderby as your last operation, only included here for debugging purposes
  #sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
  sdf2.show(5,truncate=False) # only included for debugging purposes
  sdf2 = (
      sdf2.withColumn('Freq' , F.when(
           F.col('MaxFreq')==0.000000000 , 0
           ).otherwise(
              (F.col('Freq')-F.col('MinFreq')) / (F.col('MaxFreq') - F.col('MinFreq'))
           )  
      )    # normalizing between 0 and 1 (gives null when MaxFreq == MinFreq, i.e. 0/0)
  )
  sdf2 = sdf2.drop('MinFreq').drop('MaxFreq')
  sdf2 = sdf2.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
  #sdf2 = sdf2.orderBy(F.col('Type').desc(),F.col('Length').desc() )
  print("After normalization, encoding transformation and order by ") # only included for debugging purposes
  sdf2.show(50,truncate=False)
  
  return sdf2
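
For completeness, the function would then be called on the original frame like this; just a usage sketch, assuming features_sdf is the DataFrame shown at the top:

features_with_freq_sdf = add_freq_to_features_(features_sdf)

# sanity check: the row count must be unchanged and every id must still be present
assert features_with_freq_sdf.count() == features_sdf.count()
features_with_freq_sdf.orderBy('id').show(20, truncate=False)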

Sadly, because I'm dealing with big data, I can't hack around this with df.toPandas(); it is expensive and causes an OOM error. Any help will be forever appreciated.


Solution

  • The pandas behavior is different because the id field is the DataFrame index, so it does not take part in the "group by all columns" that you do. You can get the same behavior in Spark with one change.

    partitionBy takes an ordinary list of strings. Try removing the id column from your partition key list like this:

    bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
    # note: list.remove() mutates in place and returns None, so build the list explicitly
    partition_columns = [c for c in bo.columns if c != 'id']
    
    sdf2 = (
          bo.na.fill(0).withColumn(
              'Freq',
              F.count("*").over(Window.partitionBy(partition_columns))
          ).withColumn(
              'MaxFreq',
              F.max('Freq').over(Window.partitionBy())
          ).withColumn(
              'MinFreq',
              F.min('Freq').over(Window.partitionBy())
          )
      )
    

    That will give you the results you said worked, but keep the id field. You'll need to figure out how to do the division for your frequencies, but that should get you started.
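
    For the division itself, here is a minimal sketch (not part of the original answer) that mirrors the pandas Freq / Freq.sum() normalization by dividing each row's Freq by the total row count, plus a guarded version of the min-max formula from the question; the column names RelFreq and NormFreq are just placeholders:

    total = bo.count()  # total number of rows, which equals the sum of all group counts

    # relative frequency in (0, 1], equivalent to pandas' Freq / Freq.sum()
    sdf2 = sdf2.withColumn('RelFreq', F.col('Freq') / F.lit(total))

    # min-max normalization; guard MaxFreq == MinFreq, where 0/0 would give null
    sdf2 = sdf2.withColumn(
        'NormFreq',
        F.when(F.col('MaxFreq') == F.col('MinFreq'), F.lit(0.0))
         .otherwise((F.col('Freq') - F.col('MinFreq')) / (F.col('MaxFreq') - F.col('MinFreq')))
    )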