Tags: apache-spark, pyspark, rdd

Spark Core: How to fetch the rows of an RDD with the maximum list length without using RDD.max()


I have an RDD with the following elements:

('09', [25, 66, 67])
('17', [66, 67, 39])
('04', [25])
('08', [120, 122])
('28', [25, 67])
('30', [122])

I need to fetch the elements whose list has the maximum number of entries (3 in the RDD above). The output should be filtered into another RDD, without using the max function or Spark DataFrames. Expected output:

('09', [25, 66, 67])
('17', [66, 67, 39])

My current (batch) code:

max_len = uniqueRDD.max(lambda x: len(x[1]))
maxRDD = uniqueRDD.filter(lambda x: len(x[1]) == len(max_len[1]))

I am able to do this with the lines of code above, but it does not work with Spark Streaming, because max_len is a tuple and not an RDD.

Can someone suggest an approach? Thanks in advance.


Solution

  • Does this work for you? I tried the filter on the streaming RDDs as well, and it seems to work.

    from pyspark import SparkContext, SQLContext
    from pyspark.streaming import StreamingContext
    
    sc = SparkContext('local')
    sqlContext = SQLContext(sc)
    ssc = StreamingContext(sc, 1)  # 1-second batch interval
    
    data1 = [
        ('09', [25, 66, 67]),
        ('17', [66, 67, 39]),
        ('04', [25]),
        ('08', [120, 122]),
        ('28', [25, 67]),
        ('30', [122])
    ]
    
    # The DataFrame is only used to build and display the sample RDD
    df1Columns = ["id", "list"]
    df1 = sqlContext.createDataFrame(data=data1, schema=df1Columns)
    df1.show(20, truncate=False)
    
    uniqueRDD = df1.rdd
    
    # Longest list length, found with reduce() instead of max()
    # Note: max_len is computed once from the static RDD, not per batch
    max_len = uniqueRDD.map(lambda x: len(x[1])).reduce(lambda a, b: a if a >= b else b)
    maxRDD = uniqueRDD.filter(lambda x: len(x[1]) == max_len)
    
    print("printing out maxlength = ", max_len)
    
    # max_len is a plain Python int, so the DStream filter can capture it
    dStream = ssc.queueStream([uniqueRDD])
    
    resultStream = dStream.filter(lambda x: len(x[1]) == max_len)
    
    print("Printing the filtered streaming result")
    
    def printResultStream(rdd):
        # Bring the (small) batch to the driver and print each row
        for ele in rdd.collect():
            print(ele)
    
    resultStream.foreachRDD(printResultStream)
    
    ssc.start()
    # A queueStream never finishes on its own, so stop after a few seconds
    ssc.awaitTerminationOrTimeout(5)
    ssc.stop()
    

    Here's the output:

    +---+------------+
    |id |list        |
    +---+------------+
    |09 |[25, 66, 67]|
    |17 |[66, 67, 39]|
    |04 |[25]        |
    |08 |[120, 122]  |
    |28 |[25, 67]    |
    |30 |[122]       |
    +---+------------+
    
    printing out maxlength =  3
    Printing the filtered streaming result
    Row(id='09', list=[25, 66, 67])
    Row(id='17', list=[66, 67, 39])
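The two steps above (reduce the list lengths to their maximum, then filter on it) can be checked without a Spark cluster. This is a minimal sketch, assuming the data fits in a plain Python list; functools.reduce and a list comprehension stand in for RDD.reduce() and RDD.filter(), and the helper name longest_rows is hypothetical:

```python
from functools import reduce

def longest_rows(pairs):
    """Hypothetical helper: return every (key, list) pair whose list is longest, without max()."""
    if not pairs:
        return []
    # Step 1: reduce the list lengths to the largest one (stand-in for RDD.reduce)
    max_len = reduce(lambda a, b: a if a >= b else b, (len(v) for _, v in pairs))
    # Step 2: keep only the pairs whose list has that length (stand-in for RDD.filter)
    return [p for p in pairs if len(p[1]) == max_len]

data1 = [
    ('09', [25, 66, 67]),
    ('17', [66, 67, 39]),
    ('04', [25]),
    ('08', [120, 122]),
    ('28', [25, 67]),
    ('30', [122]),
]

print(longest_rows(data1))
# [('09', [25, 66, 67]), ('17', [66, 67, 39])]
```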
    

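    Alternatively, to compute the maximum separately for each streaming batch, you can try something like this with transform(). Set it up before calling ssc.start(), because no new streaming operations can be added once the context has started: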

    dStream = ssc.queueStream([uniqueRDD, uniqueRDD, uniqueRDD])
    
    def maxOverRDD(input_rdd):
        # Return an empty batch unchanged instead of returning None
        if input_rdd.isEmpty():
            return input_rdd
        # Find the row with the longest list using reduce() instead of max()
        longest = input_rdd.reduce(lambda acc, value: value if len(acc[1]) < len(value[1]) else acc)
        max_len = len(longest[1])
        # Keep every row of this batch whose list has that maximum length
        return input_rdd.filter(lambda x: len(x[1]) == max_len)
    
    result = dStream.transform(maxOverRDD)
    print("Printing the finalStream")
    result.foreachRDD(printResultStream)
    

    The output would look like this (it is repeated because the same RDD is queued three times in the stream):

    Printing the finalStream
    Row(id='09', list=[25, 66, 67])
    Row(id='17', list=[66, 67, 39])
    Row(id='09', list=[25, 66, 67])
    Row(id='17', list=[66, 67, 39])
    Row(id='09', list=[25, 66, 67])
    Row(id='17', list=[66, 67, 39])
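The maxOverRDD approach makes two passes over each batch (reduce, then filter). A single-pass alternative, which is a different technique from the answer's and is sketched here under that assumption, reduces (length, rows) pairs and merges the rows on a tie. The combine function below is hypothetical and is exercised with functools.reduce on a plain list so it runs without Spark:

```python
from functools import reduce

def combine(a, b):
    # Hypothetical combiner: a and b are (length, rows) pairs.
    # Keep the pair with the longer length; on a tie, merge the row lists.
    if a[0] > b[0]:
        return a
    if b[0] > a[0]:
        return b
    return (a[0], a[1] + b[1])

data1 = [
    ('09', [25, 66, 67]),
    ('17', [66, 67, 39]),
    ('04', [25]),
    ('08', [120, 122]),
    ('28', [25, 67]),
    ('30', [122]),
]

# Same shape as rdd.map(lambda x: (len(x[1]), [x])) would produce in Spark
pairs = [(len(v), [(k, v)]) for k, v in data1]
max_len, rows = reduce(combine, pairs)
print(max_len, rows)
# 3 [('09', [25, 66, 67]), ('17', [66, 67, 39])]
```

In PySpark the same combiner could, as an untested sketch, be plugged into the stream without transform(): dStream.map(lambda x: (len(x[1]), [x])).reduce(combine).flatMap(lambda p: p[1]). Because Spark reduces partitions in no fixed order, the order of tied rows is not guaranteed, and all tied rows are collected into a single record, which assumes ties are few.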