I have an RDD with the following elements:
('09', [25, 66, 67])
('17', [66, 67, 39])
('04', [25])
('08', [120, 122])
('28', [25, 67])
('30', [122])
I need to fetch the elements whose list has the maximum number of elements, which is 3 in the RDD above. The output should be filtered into another RDD, without using the max function or Spark DataFrames:
('09', [25, 66, 67])
('17', [66, 67, 39])
max_len = uniqueRDD.max(lambda x: len(x[1]))
maxRDD = uniqueRDD.filter(lambda x : (len(x[1]) == len(max_len[1])))
I can do this with the lines of code above, but Spark Streaming won't support it, because max_len is a tuple and not an RDD.
Can someone suggest an approach? Thanks in advance.
Does this work for you? I tried filtering on the streaming RDDs as well, and it seems to work.
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import *
from pyspark.streaming import StreamingContext
sc = SparkContext('local')
sqlContext = SQLContext(sc)
ssc = StreamingContext(sc,1)
data1 = [
('09', [25, 66, 67]),
('17', [66, 67, 39]),
('04', [25]),
('08', [120, 122]),
('28', [25, 67]),
('30', [122])
]
df1Columns = ["id", "list"]
df1 = sqlContext.createDataFrame(data=data1, schema = df1Columns)
df1.show(20, truncate=False)
uniqueRDD = df1.rdd
max_len = uniqueRDD.map(lambda x: len(x[1])).max(lambda x: x)
maxRDD = uniqueRDD.filter(lambda x : (len(x[1]) == max_len))
print("printing out maxlength = ", max_len)
dStream = ssc.queueStream([uniqueRDD])
resultStream = dStream.filter(lambda x : (len(x[1]) == max_len))
print("Printing the filtered streaming result")
def printResultStream(rdd):
    mylist = rdd.collect()
    for ele in mylist:
        print(ele)
resultStream.foreachRDD(printResultStream)
ssc.start()
ssc.awaitTermination()
ssc.stop()
Here's the output:
+---+------------+
|id |list        |
+---+------------+
|09 |[25, 66, 67]|
|17 |[66, 67, 39]|
|04 |[25]        |
|08 |[120, 122]  |
|28 |[25, 67]    |
|30 |[122]       |
+---+------------+
printing out maxlength = 3
Printing the filtered streaming result
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
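Note that the snippet above still calls max, which the question wanted to avoid. Below is a minimal sketch of the same length computation done with a pairwise reduce instead. It runs on a plain Python list so it can be tried without Spark; `longest_len` and `longest` are hypothetical names, and on an RDD the analogous call would presumably be `uniqueRDD.map(lambda x: len(x[1])).reduce(...)` with the same lambda.

```python
from functools import reduce

data1 = [
    ('09', [25, 66, 67]),
    ('17', [66, 67, 39]),
    ('04', [25]),
    ('08', [120, 122]),
    ('28', [25, 67]),
    ('30', [122]),
]

# Map each record to its list length, then keep the larger value pairwise.
lengths = map(lambda x: len(x[1]), data1)
longest_len = reduce(lambda a, b: a if a >= b else b, lengths)

# Keep only the records whose list has that length.
longest = [x for x in data1 if len(x[1]) == longest_len]
print(longest_len, longest)
```

As in the answer's code, the length is computed once up front, so it is not recomputed for each batch that arrives on the stream.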
You can try something like this:
dStream = ssc.queueStream([uniqueRDD, uniqueRDD, uniqueRDD])
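def maxOverRDD(input_rdd):
    # Empty batches have nothing to reduce, so pass them through unchanged
    if input_rdd.isEmpty():
        return input_rdd
    # reduce returns the single element with the longest list (not an RDD)
    longest = input_rdd.reduce(lambda acc, value: value if (len(acc[1]) < len(value[1])) else acc)
    max_len = len(longest[1])
    return input_rdd.filter(lambda x: len(x[1]) == max_len)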
result = dStream.transform(maxOverRDD)
print("Printing the finalStream")
result.foreachRDD(printResultStream)
The output looks like this (it is repeated because the same RDD is provided three times in the stream):
Printing the finalStream
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
Row(id='09', list=[25, 66, 67])
Row(id='17', list=[66, 67, 39])
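The reduce-then-filter logic in maxOverRDD can be checked without a cluster. The sketch below applies the same idea to plain Python lists standing in for the per-batch RDDs; `keep_longest` and `batches` are hypothetical names used only for illustration, and the batch contents are made up from the question's data to show that the maximum is recomputed for every batch.

```python
from functools import reduce

def keep_longest(batch):
    """Return the records of `batch` whose list is the longest in that batch."""
    if not batch:  # mirrors the isEmpty() guard: nothing to reduce
        return []
    # Pairwise comparison keeps the record with the longer list; no max() call.
    longest = reduce(
        lambda acc, value: value if len(acc[1]) < len(value[1]) else acc,
        batch,
    )
    return [x for x in batch if len(x[1]) == len(longest[1])]

# Three simulated batches, including an empty one.
batches = [
    [('09', [25, 66, 67]), ('04', [25]), ('17', [66, 67, 39])],
    [],
    [('08', [120, 122]), ('30', [122])],
]

results = [keep_longest(b) for b in batches]
for r in results:
    print(r)
```

In the last batch the longest list has only two elements, so ('08', [120, 122]) is kept. This differs from filtering against a fixed max_len computed before the stream starts.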