Tags: apache-spark, pyspark, apache-spark-sql, unique

Create a dataframe with the duplicated and non-duplicated rows


I have a dataframe like this:

column_1   column_2   column_3   column_4  column_5  column_6  column_7
 34432      apple      banana     mango     pine     lemon     j93jk84
 98389      grape      orange     pine      kiwi     cherry    j93jso3
 94749      apple      banana     mango     pine     lemon     ke03jfr
 48948      apple      banana     mango     pine     lemon     9jef3f4
  .         .          .          .         .        .         .       
 90493      pear       apricot    papaya    plum     lemon     93jd30d
 90843      grape      orange     pine      kiwi     cherry    03nd920

I want to have two dataframes.

Dataframe_1:

I want to ignore column_1 and column_7, drop all the duplicated rows, and keep only the unique rows based on all the other columns.

Dataframe_2:

column_1   column_2   column_3   column_4  column_5  column_6  column_7  type          tag
 34432      apple      banana     mango     pine     lemon     j93jk84   unique         1
 98389      grape      orange     pine      kiwi     cherry    j93jso3   unique         2
 94749      apple      banana     mango     pine     lemon     ke03jfr   duplicated     1
 48948      apple      banana     mango     pine     lemon     9jef3f4   duplicated     1
  .         .          .          .         .        .         .         .             .
 90493      pear       apricot    papaya    plum     lemon     93jd30d   unique         3
 90843      grape      orange     pine      kiwi     cherry    03nd920   duplicated     2

As you can see in the example dataframe_2, I need two new columns: "type", which specifies whether the row is unique or duplicated, and "tag", which makes it easy to identify the unique row and the duplicated rows that belong to it.

Can someone tell me how to achieve both of these dataframes in PySpark?

Code I tried:

# to drop the duplicates ignoring column_1 and column_7
from pyspark.sql import functions as F

df_unique = df.dropDuplicates(["column_2", "column_3", "column_4", "column_5", "column_6"])

# rows that were removed by dropDuplicates
df_duplicates = df.subtract(df_unique)

# adding a type column to both dataframes and concatenating the two dataframes
df_unique = df_unique.withColumn("type", F.lit("unique"))
df_duplicates = df_duplicates.withColumn("type", F.lit("duplicated"))
df_combined = df_unique.unionByName(df_duplicates)

# unable to create the tag column

Solution

  • If I understood your question correctly, essentially you need to -

    1. Tag the first row as unique
    2. Tag all subsequent rows as duplicate if the values of all columns are the same, except column_1 and column_7

    Let me know if this is not the case.

    Using row_number: Use all the columns to compare as the partition key and generate a row number for each partition. If there are multiple rows for a given set of column values, they will fall into the same partition and get row numbers accordingly. (You can use orderBy to mark specific rows as unique if that is a requirement):

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    # collect every column except column_1 and column_7 into an array, so the whole
    # set of comparison columns can be used as one partition/order key
    df.withColumn("asArray", F.array(*[x for x in df.schema.names if x != "column_1" and x != "column_7"]))\
    .withColumn("rn", F.row_number().over(Window.partitionBy("asArray").orderBy(F.lit("dummy"))))\
    .withColumn("type", F.when(F.col("rn") == 1, "Unique").otherwise("Duplicated"))\
    .withColumn("tag", F.dense_rank().over(Window.orderBy(F.col("asArray"))))\
    .show(truncate=False)
    

    I've collected the values of all columns to compare in an array to make it easy.
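
    To also get the two separate dataframes you asked for, here is a minimal sketch (assuming the chained expression above is assigned to a variable, hypothetically named df_tagged, instead of ending in .show()): filter on the new type column and drop the helper columns.

    # Sketch, assuming the chain above was assigned to df_tagged rather than shown
    dataframe_2 = df_tagged.drop("asArray", "rn")                    # all rows, with type and tag
    dataframe_1 = df_tagged.filter(F.col("type") == "Unique")\
                           .drop("asArray", "rn", "type", "tag")     # one row per group of duplicates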

    Edit - Output for data similar to your dataset, with more than 2 duplicates in a group. Also corrected the tag logic.

    Input: (screenshot)

    Output: (screenshot)