I'm exploring Spark Streaming through PySpark, and I'm hitting an error when I try to use the transform function with take.
I can successfully use sortBy against the DStream via transform and pprint the result:
author_counts_sorted_dstream = author_counts_dstream.transform\
(lambda foo:foo\
.sortBy(lambda x:x[0].lower())\
.sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()
But if I use take following the same pattern and try to pprint it:
top_five = author_counts_sorted_dstream.transform\
(lambda rdd:rdd.take(5))
top_five.pprint()
the job fails with:

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'
You can see the full code and output in the notebook here.
What am I doing wrong?
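The function you pass to transform must map an RDD to an RDD. take is an action: it returns a plain Python list, not an RDD, which is exactly what the error says ('list' object has no attribute '_jrdd'). The function passed to transform runs on the driver for every batch, so you can call an action there, but you have to convert the result back to an RDD: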
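from pyspark import SparkContext
sc: SparkContext = ...  # the SparkContext your StreamingContext was created with
top_five = author_counts_sorted_dstream.transform(
    # take() is an action that returns a Python list; parallelize() turns it back into an RDD
    lambda rdd: sc.parallelize(rdd.take(5))
)
top_five.pprint()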
In contrast, RDD.sortBy, which you used before, is a transformation (it returns an RDD), so no further parallelization is needed.
On a side note, the following function:
lambda foo: foo \
.sortBy(lambda x:x[0].lower()) \
.sortBy(lambda x:x[1], ascending=False)
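doesn't make much sense. Remember that Spark sorts by shuffling the data, so the sort is not stable: the second sortBy is not guaranteed to keep the order produced by the first one. If you want to sort by multiple fields, use a single sortBy with a composite key instead. Tuples compare element by element, so to order by count descending and then by author name, put the count first:
lambda x: (-x[1], x[0].lower())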
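Because the same key function works with Python's built-in sorted, the ordering can be checked locally before it is handed to RDD.sortBy. A minimal sketch with made-up (author, count) pairs; rank_key is a hypothetical name. It also shows that the chained two-sort version only gives the intended result when the sort is stable, and that swapping the tuple fields changes which field is primary.

```python
# Hypothetical sample data: (author, count) pairs like the ones in author_counts_dstream.
author_counts = [("carol", 5), ("alice", 3), ("Bob", 5), ("dave", 3), ("Eve", 1)]


def rank_key(pair):
    """Composite key: count descending first, then author name case-insensitively."""
    author, count = pair
    return (-count, author.lower())


# One sort with the composite key; tuples compare element by element.
ranked = sorted(author_counts, key=rank_key)

# Two chained sorts, mirroring the original sortBy chain. This only works because
# Python's sorted() is guaranteed stable; Spark's shuffle-based sortBy is not.
two_pass = sorted(
    sorted(author_counts, key=lambda x: x[0].lower()),
    key=lambda x: x[1],
    reverse=True,
)

# With the fields swapped, the name becomes the primary key and the count only breaks ties.
name_first = sorted(author_counts, key=lambda x: (x[0].lower(), -x[1]))
```

In the stream, the two chained calls would then collapse into a single transform(lambda rdd: rdd.sortBy(rank_key)).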