Tags: python, list, dictionary, pyspark, rdd

Pyspark: can slice list, but can't index


This works in PySpark, where r_parsed is an RDD:

 r_parsed = r_parsed.map(lambda x: ([k for k in x.keys()][:3]))
 x = r_parsed.collect()[666]
 print(x)

 ['is_quote_status', 'place', 'in_reply_to_user_id']

But then this fails:

 r_parsed = r_parsed.map(lambda x: ([k for k in x.keys()][1]))
 x = r_parsed.collect()[666]

 Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
 : org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 120.0 failed 1 times, most recent failure: Lost task 1.0 in stage 120.0 (TID 241, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
   File "/home/filipe/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 229, in main
     process()
   File "/home/filipe/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 224, in process
     serializer.dump_stream(func(split_index, iterator), outfile)
   File "/home/filipe/anaconda3/lib/python3.5/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 372, in dump_stream
     vs = list(itertools.islice(iterator, batch))
   File "", line 5, in
 IndexError: list index out of range

What sorcery is this? Why can I map a list slice, but not a list index?


Solution

  • This is because slicing tolerates a list that is shorter than the requested slice: it returns whatever elements exist instead of raising an error.

    For example:

    my_list = [1]
    print(my_list[:3])
    #[1]
    

    However, indexing is not as forgiving:

    print(my_list[1])
    IndexError                                Traceback (most recent call last)
    <ipython-input-2-f6bea31a1a9e> in <module>()
    ----> 1 print(my_list[1])
    
    IndexError: list index out of range
    

    Somewhere in your RDD there is at least one row with fewer than two keys, so index 1 does not exist for that row.

    You may want to first filter:

    r_parsed = r_parsed.filter(lambda x: len(x.keys()) > 1).map(lambda x: list(x.keys())[1])
    

    Or build the logic into your map function:

    r_parsed = r_parsed.map(lambda x: list(x.keys())[1] if len(x.keys()) > 1 else None)
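
Both fixes can be tried without Spark. The sketch below is only an illustration in plain Python: `records` is made-up sample data standing in for the rows of `r_parsed`, and `second_key` is a hypothetical helper name, not something from the question. It first lists the rows that would break `[1]`, then applies the filter-first and the guard-inside-map approaches.

```python
# Hypothetical sample rows standing in for the dicts inside r_parsed.
records = [
    {"is_quote_status": False, "place": None, "in_reply_to_user_id": 42},
    {"only_key": 1},  # a single key: indexing [1] on its keys raises IndexError
    {},               # no keys at all
]


def second_key(record):
    """Return the second key of a dict, or None if it has fewer than two keys."""
    keys = list(record)  # list() is required on Python 3, where keys() is a view
    return keys[1] if len(keys) > 1 else None


# Diagnose: which rows are too short to be indexed at position 1?
short_rows = [r for r in records if len(r) < 2]

# Fix 1: drop the short rows first, then index.
filtered = [list(r)[1] for r in records if len(r) > 1]

# Fix 2: keep every row and emit None for the short ones.
mapped = [second_key(r) for r in records]

print(short_rows)
print(filtered)
print(mapped)
```

On the RDD itself, the same check would presumably be `r_parsed.filter(lambda x: len(x) < 2).take(5)` to look at a few offending rows, and `r_parsed.map(second_key)` to apply the guarded helper. Note that the traceback shows Python 3.5, where dict key order is not guaranteed, so "the second key" may differ from row to row even after the fix.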