I have a dataframe like this:
column_1 column_2 column_3 column_4 column_5 column_6 column_7
34432 apple banana mango pine lemon j93jk84
98389 grape orange pine kiwi cherry j93jso3
94749 apple banana mango pine lemon ke03jfr
48948 apple banana mango pine lemon 9jef3f4
. . . . . . .
90493 pear apricot papaya plum lemon 93jd30d
90843 grape orange pine kiwi cherry 03nd920
I want to have two dataframes.
Dataframe_1:
I want to ignore column_1 and column_7, drop all duplicated rows, and keep only the unique rows based on all the other columns.
Dataframe_2:
column_1 column_2 column_3 column_4 column_5 column_6 column_7 type tag
34432 apple banana mango pine lemon j93jk84 unique 1
98389 grape orange pine kiwi cherry j93jso3 unique 2
94749 apple banana mango pine lemon ke03jfr duplicated 1
48948 apple banana mango pine lemon 9jef3f4 duplicated 1
. . . . . . .
90493 pear apricot papaya plum lemon 93jd30d unique 3
90843 grape orange pine kiwi cherry 03nd920 duplicated 2
As you can see in the example Dataframe_2, I need two new columns: "type", which specifies whether the row is unique or duplicated, and "tag", which makes it easy to identify the unique row and the duplicated rows that belong to it.
Can someone tell me how to achieve both these dataframes in PySpark?
Code I tried:
from pyspark.sql import functions as F

# to drop the duplicates ignoring column_1 and column_7
df_unique = df.dropDuplicates(["column_2", "column_3", "column_4", "column_5", "column_6"])
df_duplicates = df.subtract(df_unique)
# adding a type column to both dataframes and concatenating the two dataframes
df_unique = df_unique.withColumn("type", F.lit("unique"))
df_duplicates = df_duplicates.withColumn("type", F.lit("duplicated"))
df_combined = df_unique.unionByName(df_duplicates)
# unable to create the tag column
..
If I understood your question correctly, you essentially need to mark each row as unique or duplicated by comparing all columns except column_1 and column_7, and give every row in the same group a common tag. Let me know if this is not the case.
Using row_number: use all the columns to compare as the partition key and generate a row number for each partition. If there are multiple rows with the same set of column values, they'll fall into the same partition and get row numbers accordingly (you can use orderBy to mark a specific row as the unique one if that's a requirement):
df.withColumn("asArray", F.array(*[x for x in df.schema.names if x!="column_1" and x!="column_7"]))\
.withColumn("rn", F.row_number().over(Window.partitionBy("asArray").orderBy(F.lit("dummy"))))\
.withColumn("type", F.when(F.col("rn")==1, "Unique").otherwise("Duplicated"))\
.withColumn("tag", F.dense_rank().over(Window.orderBy(F.col("asArray"))))\
.show(truncate=False)
I've collected the values of all the columns to compare into a single array to make the comparison easy.
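If you then want the two separate dataframes from your question, here is a minimal follow-up sketch. It assumes the chained expression above is assigned to a variable (called result here; that name is mine, not from the original) instead of ending in .show():

from pyspark.sql import functions as F

# `result` is assumed to hold the dataframe built above, with the helper
# columns asArray and rn still present
dataframe_2 = result.drop("asArray", "rn")              # all rows, with type and tag
dataframe_1 = (result.filter(F.col("rn") == 1)          # one row per group
                     .drop("asArray", "rn", "type", "tag"))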
Edit: added output for data similar to your dataset, with groups of more than two duplicates, and corrected the tag logic.
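As a side note, the same result can be produced without the helper array by partitioning on the comparison columns directly; a sketch under the same assumptions:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# partition on the comparison columns themselves instead of an array column
cols = [c for c in df.schema.names if c not in ("column_1", "column_7")]
annotated = (df
    .withColumn("rn", F.row_number().over(Window.partitionBy(*cols).orderBy(F.lit("dummy"))))
    .withColumn("type", F.when(F.col("rn") == 1, "unique").otherwise("duplicated"))
    # a window with orderBy but no partitionBy pulls all rows into a single
    # partition, so this is fine for small data but expensive at scale
    .withColumn("tag", F.dense_rank().over(Window.orderBy(*cols))))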