I have a Spark DataFrame like this:
+-------+------+-----+---------------+
|Account|nature|value| time|
+-------+------+-----+---------------+
| a| 1| 50|10:05:37:293084|
| a| 1| 50|10:06:46:806510|
| a| 0| 50|11:19:42:951479|
| a| 1| 40|19:14:50:479055|
| a| 0| 50|16:56:17:251624|
| a| 1| 40|16:33:12:133861|
| a| 1| 20|17:33:01:385710|
| b| 0| 30|12:54:49:483725|
| b| 0| 40|19:23:25:845489|
| b| 1| 30|10:58:02:276576|
| b| 1| 40|12:18:27:161290|
| b| 0| 50|12:01:50:698592|
| b| 0| 50|08:45:53:894441|
| b| 0| 40|17:36:55:827330|
| b| 1| 50|17:18:41:728486|
+-------+------+-----+---------------+
I want to compare nature column of one row to other rows with the same Account and value,I should look forward, and add new column named Repeated. The new column get true for both rows, if nature changed, from 1 to 0 or vise versa. For example, the above dataframe should look like this:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| true |
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| true |
| a| 0| 50|16:56:17:251624| true |
| b| 0| 50|08:45:53:894441| true |
| b| 0| 50|12:01:50:698592| false|
| b| 1| 50|17:18:41:728486| true |
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| true |
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| true |
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
My solution is that I have to do group by or window on Account and value columns; then in each group, compare nature of each row to nature of other rows and as a result of comperation, Repeated column become full. I did this calculation with Spark Window functions. Like this:
windowSpec = Window.partitionBy("Account","value").orderBy("time")
df.withColumn("Repeated", coalesce(f.when(lead(df['nature']).over(windowSpec)!=df['nature'],lit(True)).otherwise(False))).show()
The result was like this which is not the result that I wanted:
+-------+------+-----+---------------+--------+
|Account|nature|value| time|Repeated|
+-------+------+-----+---------------+--------+
| a| 1| 50|10:05:37:293084| false|
| a| 1| 50|10:06:46:806510| true|
| a| 0| 50|11:19:42:951479| false|
| a| 0| 50|16:56:17:251624| false|
| b| 0| 50|08:45:53:894441| false|
| b| 0| 50|12:01:50:698592| true|
| b| 1| 50|17:18:41:728486| false|
| a| 1| 40|16:33:12:133861| false|
| a| 1| 40|19:14:50:479055| false|
| b| 1| 40|12:18:27:161290| true|
| b| 0| 40|17:36:55:827330| false|
| b| 0| 40|19:23:25:845489| false|
| b| 1| 30|10:58:02:276576| true|
| b| 0| 30|12:54:49:483725| false|
| a| 1| 20|17:33:01:385710| false|
+-------+------+-----+---------------+--------+
UPDATE: To explain more, if we suppose the first Spark Dataframe is named "df",in the following, I write what exactly want to do in each group of "Account" and "value":
a = df.withColumn('repeated',lit(False))
for i in range(len(group)):
j = i+1
for j in j<=len(group):
if a.loc[i,'nature']!=a.loc[j,'nature'] and a.loc[j,'repeated']==False:
a.loc[i,'repeated'] = True
a.loc[j,'repeated'] = True
Would you please guide me how to do that using Pyspark Window?
Any help is really appreciated.
Problem solved. Even though this way costs a lot,but it's ok.
def check(part):
df = part
size = len(df)
for i in range(size):
if (df.loc[i,'repeated'] == True):
continue
else:
for j in range((i+1),size):
if (df.loc[i,'nature']!=df.loc[j,'nature']) & (df.loc[j,'repeated']==False):
df.loc[j,'repeated'] = True
df.loc[i,'repeated'] = True
break
return df
df.groupby("Account","value").applyInPandas(check, schema="Account string, nature int,value long,time string,repeated boolean").show()
Update1: Another solution without any iterations.
def check(df):
df = df.sort_values('verified_time')
df['index'] = df.index
df['IS_REPEATED'] = 0
df1 = df.sort_values(['nature'],ascending=[True]).reset_index(drop=True)
df2 = df.sort_values(['nature'],ascending=[False]).reset_index(drop=True)
df1['IS_REPEATED']=df1['nature']^df2['nature']
df3 = df1.sort_values(['index'],ascending=[True])
df = df3.drop(['index'],axis=1)
return df
df = df.groupby("account", "value").applyInPandas(gf.check2,schema=gf.get_schema('trx'))
UPDATE2: Solution with Spark window:
def is_repeated_feature(df):
windowPartition = Window.partitionBy("account", "value", 'nature').orderBy('nature')
df_1 = df.withColumn('rank', F.row_number().over(windowPartition))
w = (Window
.partitionBy('account', 'value')
.orderBy('nature')
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df_1 = df_1.withColumn("count_nature", F.count('nature').over(w))
df_1 = df_1.withColumn('sum_nature', F.sum('nature').over(w))
df_1 = df_1.select('*')
df_2 = df_1.withColumn('min_val',
when((df_1.sum_nature > (df_1.count_nature - df_1.sum_nature)),
(df_1.count_nature - df_1.sum_nature)).otherwise(df_1.sum_nature))
df_2 = df_2.withColumn('more_than_one', when(df_2.count_nature > 1, '1').otherwise('0'))
df_2 = df_2.withColumn('is_repeated',
when(((df_2.more_than_one == 1) & (df_2.count_nature > df_2.sum_nature) & (
df_2.rank <= df_2.min_val)), '1')
.otherwise('0'))
return df_2