Tags: python, json, hash, pyspark, data-partitioning

How to detect duplicates in large json file using PySpark HashPartitioner


I have a large json file with over 20GB of json-structured metadata. It contains simple user metadata from an application, and I would like to sift through it to detect duplicates. Here is an example of what the data looks like:

{"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
{"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

The json file contains, line by line, json objects that look very similar to this. A duplicate occurs when the "name" field of two json objects is the same. So, this is a duplicate:

{"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
{"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}

So are two json objects that are exactly the same.

Now, I want to go through the entire json file, which is far too large to fit into memory, and, using the most suitable approach, find all the duplicates and what they are duplicates of, and then do some logic with them. The logic part is trivial; what I am not sure about is how to find the duplicates.

What I thought about:

  1. The first thing I considered was a bloom filter. Bloom filters aren't that confusing, they work well and fast, and I think they essentially boil down to O(n). However, a bloom filter will not tell me what string a duplicate is a duplicate of, which is a no-go for me.

  2. I thought about using an external merge sort. I would basically partition the file into multiple smaller files that fit into memory, sort each chunk, and search for duplicates (which would then be clustered together). But I'm not terribly sure this is the approach I want.

  3. The next thing I ran across was hashing by partition, which I suspect is what I want. Hashing is essentially the best way to find duplicates when the data fits in memory, so why not use it for data that doesn't? I am a bit confused about how hashing by partition works, though, and I am not sure whether it is what I'm looking for.

So, I think I am supposed to use option 3, hashing by partition, and I know that Spark supports it. I was hoping someone could tell me whether I am on the right track and perhaps give me some guidance. I have a couple of specific conceptual questions:

  1. Let us say I create 100 partitions that each fit perfectly into memory (so, in my case, each partition would be about 200MB). Say I hash the first x elements of my json file into one partition and find no duplicates. Say I have another partition with the next 200MB of data that also contains no duplicates. If I can only load 200MB of data at a time, how would I check that partition 1 and partition 2 do not contain duplicates of each other? To clarify, if partition 1 has an element and partition 2 has an element that is the same, how do I figure that out? I imagine I would need to load both into memory, right? And if I can't... then what do I do? Maybe I am misunderstanding...

  2. Which brings me to my second question: it seems like this is not how partitioning works. When you hash by partition, elements with the same hash (or hash range) go into the same partition file, so if two elements are duplicates, I would know, because the algorithm would try to put them into a file where that hash already exists. Is that the case? (I have tried to sketch what I mean below.)
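
To make question 2 concrete, here is a rough sketch of how I imagine hashing by partition working outside of Spark (the bucket count and file names are just placeholders, and I do not know whether this matches what Spark actually does): records with equal names always hash to the same bucket, so each bucket can be checked for duplicates on its own.

import json

NUM_BUCKETS = 100  # placeholder; chosen so each bucket comfortably fits in memory

# Pass 1: scatter every record into a bucket file chosen by hashing "name".
# Records with the same name always land in the same bucket, so duplicates
# can only ever meet inside a single bucket - no cross-bucket comparison needed.
bucket_files = [open(f"bucket_{i}.jsonl", "w") for i in range(NUM_BUCKETS)]
with open("data.json") as source:
    for line in source:
        record = json.loads(line)
        if "name" in record:
            bucket_files[hash(record["name"]) % NUM_BUCKETS].write(line)
for bucket in bucket_files:
    bucket.close()

# Pass 2: load one bucket at a time and look for duplicates in memory.
for i in range(NUM_BUCKETS):
    seen = {}
    with open(f"bucket_{i}.jsonl") as bucket:
        for line in bucket:
            record = json.loads(line)
            seen.setdefault(record["name"], []).append(record)
    duplicates = {name: records for name, records in seen.items() if len(records) > 1}
    # ... duplicate-handling logic would go here ...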

I know I have more questions, I just can't think of them right now. Does anyone have any tips, especially regarding pyspark and how best to use it? Or is pyspark not what I'm looking for?


Solution

  • The problem is simpler than you might think. You really only need to aggregate the data by name as @Hitobat suggests. I would solve the problem with pyspark.sql.Window to simplify the aggregation output.

    Given the following data in a file named data.json (this could also be a directory of files rather than a single file):

    Content of data.json

    {"created": "2015-08-04", "created_at": "2010-03-15", "username": "koleslawrulez333"}
    {"created": "2016-01-19", "created_at": "2012-05-25", "name": "arthurking231"}
    {"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "starklord1943"}
    {"created": "2015-11-08", "created_at": "2010-01-19", "name": "Assasinator5827"}
    {"created": "2016-07-23", "type": "Username", "created_at": "2011-08-27", "name": "Assasinator5827"}
    

    Then the pyspark code would look like this:

    from pyspark.sql import SparkSession
    from pyspark.sql import Window
    from pyspark.sql import functions as F
    
    # `spark` is predefined in the pyspark shell; in a standalone script, create it explicitly
    spark = SparkSession.builder.appName("find-duplicate-names").getOrCreate()
    
    df = spark.read.json("data.json")  # can be a directory of files as well
    df.show()
    

    Output

    +----------+----------+---------------+--------+----------------+
    |   created|created_at|           name|    type|        username|
    +----------+----------+---------------+--------+----------------+
    |2015-08-04|2010-03-15|           null|    null|koleslawrulez333|
    |2016-01-19|2012-05-25|  arthurking231|    null|            null|
    |2016-07-23|2011-08-27|  starklord1943|Username|            null|
    |2015-11-08|2010-01-19|Assasinator5827|    null|            null|
    |2016-07-23|2011-08-27|Assasinator5827|Username|            null|
    +----------+----------+---------------+--------+----------------+ 
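
    As an aside, for a 20GB input spark.read.json makes an extra pass over the data just to infer the schema. If you already know the fields, you can optionally supply an explicit schema to skip that pass. A minimal sketch, assuming the five string fields shown above:

    from pyspark.sql.types import StructType, StructField, StringType
    
    # Optional: an explicit schema avoids the schema-inference scan over the full input
    schema = StructType([
        StructField("created", StringType(), True),
        StructField("created_at", StringType(), True),
        StructField("name", StringType(), True),
        StructField("type", StringType(), True),
        StructField("username", StringType(), True),
    ])
    df = spark.read.schema(schema).json("data.json")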
    

    Then partition and count with pyspark.sql.Window

    name_partition_window = Window.partitionBy("name")
    df_with_repeat_counts = df.select("*", F.count("*").over(name_partition_window).alias("name_counts"))
    df_with_repeat_counts.show()
    

    Output

    +----------+----------+---------------+--------+----------------+-----------+
    |   created|created_at|           name|    type|        username|name_counts|
    +----------+----------+---------------+--------+----------------+-----------+
    |2016-01-19|2012-05-25|  arthurking231|    null|            null|          1|
    |2015-08-04|2010-03-15|           null|    null|koleslawrulez333|          1|
    |2015-11-08|2010-01-19|Assasinator5827|    null|            null|          2|
    |2016-07-23|2011-08-27|Assasinator5827|Username|            null|          2|
    |2016-07-23|2011-08-27|  starklord1943|Username|            null|          1|
    +----------+----------+---------------+--------+----------------+-----------+
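
    If you only need each duplicated name and how often it appears, rather than every column of every duplicated row, a plain groupBy is an alternative to the window. A quick sketch:

    # Alternative summary: duplicated names and their occurrence counts only
    name_counts = df.groupBy("name").count().where(F.col("count") > 1)
    name_counts.show()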
    

    Then filter the dataframe on the name_counts column and order by name for inspection:

    duplicates = df_with_repeat_counts.where(F.col("name_counts") > 1).orderBy("name")
    duplicates.show()
    

    Output

    +----------+----------+---------------+--------+--------+-----------+
    |   created|created_at|           name|    type|username|name_counts|
    +----------+----------+---------------+--------+--------+-----------+
    |2015-11-08|2010-01-19|Assasinator5827|    null|    null|          2|
    |2016-07-23|2011-08-27|Assasinator5827|Username|    null|          2|
    +----------+----------+---------------+--------+--------+-----------+
    

    At this point you can analyze the duplicates dataframe as needed for your use case.
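
    For example, a couple of possible next steps (these are only sketches; the summary and the output path are illustrative, not part of the original requirement):

    # A compact summary: one row per duplicated name with its number of occurrences
    duplicate_names = duplicates.groupBy("name").agg(F.max("name_counts").alias("times_seen"))
    duplicate_names.show()
    
    # Or persist the full duplicate rows for whatever downstream logic you need
    duplicates.write.mode("overwrite").parquet("duplicates_output")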