
PySpark: How to get unique value pairs with reduceByKey() (without using the distinct() method)?


I am trying to get the unique combinations of values from columns [1,2,3]. The data is very large and spread over multiple files (about 1TB in total).

I just want to keep the lines containing the string "client" and extract the values of columns [1,2,3] that are unique within each file. I first used tuples and the distinct() function, but the process stops with a Java memory error.

import subprocess
from pyspark import SparkContext

if __name__ == "__main__":
    sc=SparkContext(appName="someapp")
    # List the input files on HDFS; the first output line is skipped below.
    cmd = 'hdfs dfs -ls /user/path'.split()
    files = subprocess.check_output(cmd).strip().split('\n')
    rdds=[]
    for ff in files[1:]:
        rdd=sc.textFile(ff.split()[-1])
        rdd2=rdd.filter(lambda x: "client" in x.lower())
        rdd3=rdd2.map(lambda x: tuple(x.split("\t")[y] for y in [1,2,3]))
        # De-duplicate within this file only.
        rdd4=rdd3.distinct()
        rdds.append(rdd4)

    rdd0=sc.union(rdds)
    rdd0.collect()
    rdd0.saveAsTextFile('/somedir')

So I tried another script using the reduceByKey() method, which works fine.

import subprocess
from pyspark import SparkContext

if __name__ == "__main__":
    sc=SparkContext(appName="someapp")
    cmd = "hdfs dfs -ls airties/eventU".split()
    files = subprocess.check_output(cmd).strip().split('\n')
    rdds=[]
    for ff in files[1:]:
        rdd=sc.textFile(ff.split()[-1])
        rdd2=rdd.filter(lambda x: "client" in x.lower())
        rdd3=rdd2.map(lambda x: ','.join([x.split("\t")[y] for y in [1,2,3]]))
        rdds.append(rdd3)
    rdd0=sc.union(rdds)
    # De-duplicate across all files at once by counting occurrences per key.
    rddA=rdd0.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b)
    rddA.collect()
    rddA.saveAsTextFile('/somedir')

I am trying to understand why distinct() does not work well while the reduceByKey() method does. Is distinct() not a proper way to find unique values?

I am also looking for a better way to process multiple files: find the unique value pairs in each file, then aggregate them. When each file contains exclusive content, I only need to apply a unique step to each file and combine everything in the final step. But my current code seems to create too much overhead for the system.

The data looks like this, with lots of redundancy:

+-----+---+------+----------+
|1    |2  |3     |4         |
+-----+---+------+----------+
|    1|  1|     A|2017-01-01|
|    2|  6|client|2017-01-02|
|    2|  3|     B|2017-01-02|
|    3|  5|     A|2017-01-03|
|    3|  5|client|2017-01-03|
|    2|  2|client|2017-01-02|
|    3|  5|     A|2017-01-03|
|    1|  3|     B|2017-01-02|
|    3|  5|client|2017-01-03|
|    3|  5|client|2017-01-04|
+-----+---+------+----------+

After filtering for "client" and keeping columns [1,2,3], it looks like this, still with lots of redundancy:

+-----+---+------+
|1    |2  |3     |
+-----+---+------+
|    2|  6|client|
|    3|  5|client|
|    2|  2|client|
|    3|  5|client|
|    3|  5|client|
+-----+---+------+

Column 3 is redundant here, but please just take this as an example scenario.


Solution

  • distinct() is itself implemented using reduceByKey(). In effect, you reimplemented distinct() almost exactly the way it is currently implemented.

    However, your two code snippets are not equivalent.

    The first snippet will, for each file:

    • Process the RDD
    • Keep only the distinct elements of that RDD
    • Append the RDD to a list, to later create an aggregate RDD

    The second snippet will:

    • Process each RDD
    • Append each RDD to a list, to later create an aggregate RDD
    • Keep only the distinct elements of the aggregate RDD

    If there are duplicate rows in different files, they will be duplicated in the output of the first snippet, but not in the output of the second. This may be why you're running out of memory. Note that bringing all of the records to the driver with the collect() call is not necessary here and will significantly hurt performance.
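
The difference between the two orderings can be sketched in plain Python, with no Spark involved. Everything here is a hypothetical stand-in chosen for illustration: reduce_by_key and distinct only model the behavior of the RDD methods of the same name on in-memory lists, extract mirrors the filter and map steps from the question, and file_a and file_b are made-up sample files that share one row.

```python
def reduce_by_key(pairs, func):
    """Plain-Python stand-in for reduceByKey: merge values that share a key."""
    merged = {}
    for key, value in pairs:
        merged[key] = func(merged[key], value) if key in merged else value
    return list(merged.items())


def distinct(records):
    """Model of distinct(): pair each record with a dummy value,
    reduce by key, then drop the dummy value again."""
    pairs = [(record, None) for record in records]
    reduced = reduce_by_key(pairs, lambda kept, _ignored: kept)
    return [key for key, _ in reduced]


def extract(lines):
    """Keep lines containing "client" and project tab-separated fields 1, 2, 3."""
    return [
        tuple(line.split("\t")[i] for i in (1, 2, 3))
        for line in lines
        if "client" in line.lower()
    ]


# Two hypothetical files; the row (3, 5, client) appears in both.
file_a = ["x\t2\t6\tclient", "x\t3\t5\tclient", "x\t3\t5\tclient", "x\t1\t1\tA"]
file_b = ["y\t3\t5\tclient", "y\t2\t2\tclient"]

# First snippet: distinct per file, then union. Cross-file duplicates survive.
per_file = distinct(extract(file_a)) + distinct(extract(file_b))

# Second snippet: union first, then one distinct over everything.
global_unique = distinct(extract(file_a) + extract(file_b))

print(len(per_file), len(global_unique))
```

In PySpark terms, the second ordering corresponds to a single distinct() call on the result of sc.union(rdds), written out with saveAsTextFile() and without the intermediate collect().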