Tags: apache-spark, pyspark, apache-spark-sql, rdd

Pyspark reduce function causes StackOverflowError


I'm working with a fairly large dataframe (around 100 thousand rows, with the intent of scaling up to 10 million) that has the following structure:

+------+--------------------+--------+--------------------+-------------------+
|LineId|             Content| EventId|       EventTemplate|          Timestamp|
+------+--------------------+--------+--------------------+-------------------+
|     1|Receiving block b...|ef6f4915|Receiving block <...|2009-11-08 20:35:18|
|     2|BLOCK* NameSystem...|9bc09482|BLOCK* NameSystem...|2009-11-08 20:35:18|
|     3|Receiving block b...|9ca53bce|Receiving block <...|2009-11-08 20:35:19|
+------+--------------------+--------+--------------------+-------------------+

I'd like to add a label and I'm using the following function to do so:

from functools import reduce
label_condition = reduce(lambda a, b: a|b, (df['Content'].like('%'+pat+"%") for pat in blocks))

where blocks is a list containing the block tokens that define whether or not a row is anomalous. This function checks whether the Content field contains any value from the blocks list.
The size of this list is around 17k, which is what I think is causing the problem.

When I try to add this to the dataframe, or simply evaluate this operation, I get the following error:

Py4JJavaError: An error occurred while calling o50581.toString.
: java.lang.StackOverflowError
    at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
    at org.apache.spark.sql.catalyst.util.package$$anonfun$usePrettyExpression$1.applyOrElse(package.scala:128)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDown$1(TreeNode.scala:318)
...

Looking online, I saw that this might be caused by Spark executing an overly complex plan, and that checkpointing can help avoid this sort of thing, but I'm not sure how to go about it. I tried adding a checkpoint before evaluating this, and I also tried using a select to reduce the df to just the 'Content' column, but to no avail.
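
For reference, the checkpoint attempt looked roughly like this (a sketch; the checkpoint directory is just a placeholder path):

spark.sparkContext.setCheckpointDir('/tmp/spark-checkpoints')  # placeholder path

# truncate the lineage before applying the big OR condition
df = df.checkpoint()
df = df.withColumn('label', label_condition)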

I found this solution in Scala to optimize the reduce function, but I don't know how to translate it to Python.

Is there a way to optimize this, or at least make it run step by step or iteratively, to avoid the stack overflow?

Thanks in advance.


Solution

  • You could try the rlike method, which accepts a regex: pass the pattern as 'element1|element2|...'. (A sketch applying this to the question's dataframe follows the example below.)

    from pyspark.sql import functions as func

    data_sdf. \
        withColumn('label', func.col('log').rlike('|'.join(anomalous_blocks))). \
        show()
    
    # +---+---------------+-----+
    # | id|            log|label|
    # +---+---------------+-----+
    # |  1|Test logX blk_A| true|
    # |  2|Test logV blk_B|false|
    # |  3|Test logF blk_D| true|
    # |  4|Test logD blk_F|false|
    # |  5|Test logB blk_K|false|
    # |  6|Test logY blk_A| true|
    # |  7|Test logE blk_C| true|
    # +---+---------------+-----+