Search code examples
apache-sparkpysparkspark-streamingdstream

Transformed DStream in pyspark gives error when pprint called on it


I'm exploring Spark Streaming through PySpark, and hitting an error when I try to use the transform function with take.

I can successfully use sortBy against the DStream via transform and pprint the result.

author_counts_sorted_dstream = author_counts_dstream.transform\
  (lambda foo:foo\
   .sortBy(lambda x:x[0].lower())\
   .sortBy(lambda x:x[1],ascending=False))
author_counts_sorted_dstream.pprint()

But if I use take following the same pattern and try to pprint it:

top_five = author_counts_sorted_dstream.transform\
  (lambda rdd:rdd.take(5))
top_five.pprint()

the job fails with

Py4JJavaError: An error occurred while calling o25.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/local/spark/python/pyspark/streaming/util.py", line 67, in call
    return r._jrdd
AttributeError: 'list' object has no attribute '_jrdd'

You can see the full code and output in the notebook here.

What am I doing wrong?


Solution

  • Function you pass to transform should transform from RDD to RDD. If you use an action, like take, you have to convert the result back to RDD:

    sc: SparkContext = ...
    
    author_counts_sorted_dstream.transform(
      lambda rdd: sc.parallelize(rdd.take(5))
    )
    

    In contrast RDD.sortBy used is a transformation (returns an RDD) so there is no need for further parallelization.

    On a side note following function:

    lambda foo: foo \
        .sortBy(lambda x:x[0].lower()) \
        .sortBy(lambda x:x[1], ascending=False)
    

    doesn't make much sense. Remember that Spark sort by shuffle therefore it is not stable. If you want to sort by multiple fields you should use a composite key like:

    lambda x: (x[0].lower(), -x[1])