I'm trying to add a new column named Freq
to the given Spark dataframe without affecting the index column or the order of the records, so that the statistical frequency (i.e. the count) is assigned back to the right row/record in the dataframe.
This is my data frame:
+---+-------------+------+------------+-------------+-----------------+
| id| Type|Length|Token_number|Encoding_type|Character_feature|
+---+-------------+------+------------+-------------+-----------------+
| 0| Sentence| 4014| 198| false| 136|
| 1| contextid| 90| 2| false| 15|
| 2| Sentence| 172| 11| false| 118|
| 3| String| 12| 0| true| 11|
| 4|version-style| 16| 0| false| 13|
| 5| Sentence| 339| 42| false| 110|
| 6|version-style| 16| 0| false| 13|
| 7| url_variable| 10| 2| false| 9|
| 8| url_variable| 10| 2| false| 9|
| 9| Sentence| 172| 11| false| 117|
| 10| contextid| 90| 2| false| 15|
| 11| Sentence| 170| 11| false| 114|
| 12|version-style| 16| 0| false| 13|
| 13| Sentence| 68| 10| false| 59|
| 14| String| 12| 0| true| 11|
| 15| Sentence| 173| 11| false| 118|
| 16| String| 12| 0| true| 11|
| 17| Sentence| 132| 8| false| 96|
| 18| String| 12| 0| true| 11|
| 19| contextid| 88| 2| false| 15|
+---+-------------+------+------------+-------------+-----------------+
I tried the following script, which fails because of the presence of the index column id:
from pyspark.sql import functions as F
from pyspark.sql import Window

bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')

sdf2 = (
    bo.na.fill(0).withColumn(
        'Freq',
        F.count("*").over(Window.partitionBy(bo.columns))
    ).withColumn(
        'MaxFreq',
        F.max('Freq').over(Window.partitionBy())
    ).withColumn(
        'MinFreq',
        F.min('Freq').over(Window.partitionBy())
    )
)
sdf2.show()
# bad result: the id column makes every record unique, so Freq = 1 everywhere
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
| id| Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
| 0| Sentence| 4014| 198| false| 136| 1| 1| 1|
| 1| contextid| 90| 2| false| 15| 1| 1| 1|
| 2| Sentence| 172| 11| false| 118| 1| 1| 1|
| 3| String| 12| 0| true| 11| 1| 1| 1|
| 4|version-style| 16| 0| false| 13| 1| 1| 1|
| 5| Sentence| 339| 42| false| 110| 1| 1| 1|
| 6|version-style| 16| 0| false| 13| 1| 1| 1|
| 7| url_variable| 10| 2| false| 9| 1| 1| 1|
| 8| url_variable| 10| 2| false| 9| 1| 1| 1|
| 9| Sentence| 172| 11| false| 117| 1| 1| 1|
| 10| contextid| 90| 2| false| 15| 1| 1| 1|
| 11| Sentence| 170| 11| false| 114| 1| 1| 1|
| 12|version-style| 16| 0| false| 13| 1| 1| 1|
| 13| Sentence| 68| 10| false| 59| 1| 1| 1|
| 14| String| 12| 0| true| 11| 1| 1| 1|
| 15| Sentence| 173| 11| false| 118| 1| 1| 1|
| 16| String| 12| 0| true| 11| 1| 1| 1|
| 17| Sentence| 132| 8| false| 96| 1| 1| 1|
| 18| String| 12| 0| true| 11| 1| 1| 1|
| 19| contextid| 88| 2| false| 15| 1| 1| 1|
+---+-------------+------+------------+-------------+-----------------+----+-------+-------+
If I exclude the index column id, the code works, but it messes up the order (due to the unwanted sorting introduced by the window partitioning), so the results are no longer assigned to the right record/row, as shown below:
+--------+------+------------+-------------+-----------------+----+-------+-------+
| Type|Length|Token_number|Encoding_type|Character_feature|Freq|MaxFreq|MinFreq|
+--------+------+------------+-------------+-----------------+----+-------+-------+
|Sentence| 7| 1| false| 6| 2| 1665| 1|
|Sentence| 7| 1| false| 6| 2| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 17| 4| false| 14| 6| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
|Sentence| 19| 4| false| 17| 33| 1665| 1|
+--------+------+------------+-------------+-----------------+----+-------+-------+
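What I think is happening is that show() after the window aggregation no longer reflects the original row order. If I keep id in the dataframe (but leave it out of the partition key) I can at least sort back by it for display; a minimal sketch of that idea, using the same column names as above:

from pyspark.sql import functions as F
from pyspark.sql import Window

# keep 'id' in the dataframe but exclude it from the partition key,
# so identical feature rows still land in the same window partition
feature_cols = [c for c in bo.columns if c != 'id']

sdf_check = (
    bo.na.fill(0)
      .withColumn('Freq', F.count('*').over(Window.partitionBy(feature_cols)))
      .orderBy('id')   # restore the original row order for display
)
sdf_check.show()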
In the end, I want to add this frequency column, normalize it between 0 and 1 with a simple mathematical formula, and use it as a new feature. During normalization I also ran into problems and got null values. I already implemented the pandas version, which is easy, but I'm stuck in Spark:
import pandas as pd

# Statistical preprocessing
def add_freq_to_features(df):
    frequencies_df = df.groupby(list(df.columns)).size().to_frame().rename(columns={0: "Freq"})
    frequencies_df["Freq"] = frequencies_df["Freq"] / frequencies_df["Freq"].sum()  # normalizing between 0 & 1
    new_df = pd.merge(df, frequencies_df, how='left', on=list(df.columns))
    return new_df
# Apply frequency allocation and merge with extracted features df
features_df = add_freq_to_features(oba)
features_df.head(20)
and it returns the correct results, as I expected.
I also tried to literally translate the pandas script using df.groupBy(df.columns).count(), but I couldn't get it to work:
# this is to build "raw" Freq based on @pltc's answer
sdf2 = (sdf
    .groupBy(sdf.columns)
    .agg(F.count('*').alias('Freq'))
    .withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
)
sdf2.cache().count()
sdf2.show()
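What I believe the pandas merge corresponds to in Spark is a groupBy/count on the feature columns only, followed by a left join of the counts back onto the original dataframe, so id (and therefore the original rows) stays untouched. A rough sketch of that idea, reusing features_sdf and the column names from my example (I have not verified it at scale):

from pyspark.sql import functions as F

# group only by the feature columns (not 'id'), count, then join the counts
# back so every original row keeps its id
feature_cols = ['Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature']

freq_sdf = (features_sdf
    .na.fill(0)
    .groupBy(feature_cols)
    .agg(F.count('*').alias('Freq'))
)

sdf_with_freq = (features_sdf
    .na.fill(0)
    .join(freq_sdf, on=feature_cols, how='left')
    .orderBy('id')   # only for display; Spark does not otherwise guarantee row order
)
sdf_with_freq.show()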
Here is the full PySpark code of what we have tried on a simplified example, available in this Colab notebook, based on the answer of @ggordon:
def add_freq_to_features_(df):
    from pyspark.sql import functions as F
    from pyspark.sql import Window

    sdf_pltc = df.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')
    print("Before Any Modification")  # only included for debugging purposes
    sdf_pltc.show(5, truncate=0)

    # fill missing values with 0 using `na.fill(0)` before applying count as window function
    sdf2 = (
        sdf_pltc.na.fill(0).withColumn(
            'Freq',
            F.count("*").over(Window.partitionBy(sdf_pltc.columns))
        ).withColumn(
            'MaxFreq',
            F.max('Freq').over(Window.partitionBy())
        ).withColumn(
            'MinFreq',
            F.min('Freq').over(Window.partitionBy())
        )
        .withColumn('id', F.col('id'))
    )
    print("After replacing null with 0 and counting by partitions")  # only included for debugging purposes
    # use orderBy as your last operation, only included here for debugging purposes
    #sdf2 = sdf2.orderBy(F.col('Type').desc(), F.col('Length').desc())
    sdf2.show(5, truncate=False)  # only included for debugging purposes

    sdf2 = (
        sdf2.withColumn('Freq', F.when(
            F.col('MaxFreq') == 0.000000000, 0
        ).otherwise(
            (F.col('Freq') - F.col('MinFreq')) / (F.col('MaxFreq') - F.col('MinFreq'))
        ))  # normalizing between 0 & 1
    )
    sdf2 = sdf2.drop('MinFreq').drop('MaxFreq')
    sdf2 = sdf2.withColumn('Encoding_type', F.col('Encoding_type').cast('string'))
    #sdf2 = sdf2.orderBy(F.col('Type').desc(), F.col('Length').desc())
    print("After normalization, encoding transformation and order by")  # only included for debugging purposes
    sdf2.show(50, truncate=False)
    return sdf2
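I suspect the null values come from the min-max denominator: with id in the partition key, Freq is 1 for every row, so MaxFreq equals MinFreq, (MaxFreq - MinFreq) is 0, and a division by zero silently yields null in Spark SQL. A guard on that case (just a sketch; it would replace the normalization step inside the function, before MinFreq/MaxFreq are dropped) would look like:

# guard against MaxFreq == MinFreq, which makes the denominator zero and
# turns every normalized value into null
sdf2 = sdf2.withColumn(
    'Freq',
    F.when(F.col('MaxFreq') == F.col('MinFreq'), F.lit(0.0))
     .otherwise((F.col('Freq') - F.col('MinFreq')) / (F.col('MaxFreq') - F.col('MinFreq')))
)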
Sadly, since I'm dealing with big data, I can't hack it with df.toPandas(); it is expensive and causes an OOM error.
Any help will be forever appreciated.
The pandas behavior is different because the id field is the DataFrame index, so it does not count in the "group by all columns" you do. You can get the same behavior in Spark with one change.
partitionBy takes an ordinary list of strings. Try removing the id column from your partition key list like this:
bo = features_sdf.select('id', 'Type', 'Length', 'Token_number', 'Encoding_type', 'Character_feature')

# build the partition key without 'id' (note: list.remove() mutates in place
# and returns None, so use a comprehension instead)
partition_columns = [c for c in bo.columns if c != 'id']

sdf2 = (
    bo.na.fill(0).withColumn(
        'Freq',
        F.count("*").over(Window.partitionBy(partition_columns))
    ).withColumn(
        'MaxFreq',
        F.max('Freq').over(Window.partitionBy())
    ).withColumn(
        'MinFreq',
        F.min('Freq').over(Window.partitionBy())
    )
)
That will give you the results you said worked but keep the ID field. You'll need to figure out how to do the division for your frequencies but that should get you started.
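For example, to mirror the division in your pandas version (each count divided by the total number of rows), a rough, untested sketch on top of the sdf2 above could be:

# normalize Freq the same way the pandas code does: divide each group's count
# by the total number of rows, then restore the original order by id for display
total = sdf2.count()

sdf2 = (sdf2
    .withColumn('Freq', F.col('Freq') / F.lit(total))
    .drop('MaxFreq', 'MinFreq')
    .orderBy('id')
)
sdf2.show()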