
CodeGen grows beyond 64 KB error when normalizing large PySpark dataframe


I have a PySpark dataframe with 13 million rows and 800 columns. I need to normalize this data, so I have been using the following code, which works with a smaller development data set.

import sys

from pyspark.sql import Window
from pyspark.sql.functions import avg, stddev_pop

def z_score_w(col, w):
    avg_ = avg(col).over(w)
    stddev_ = stddev_pop(col).over(w)
    return (col - avg_) / stddev_

w = Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)
norm_exprs = [z_score_w(signalsDF[x], w).alias(x) for x in signalsDF.columns]

normDF = signalsDF.select(norm_exprs)

However, when using the full data set I run into an exception with the codegen:

        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
        at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
        at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
        at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
        ... 44 more
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificMutableProjection" grows beyond 64 KB
        at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
        at org.codehaus.janino.CodeContext.write(CodeContext.java:836)
        at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:10251)
        at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:8933)
        at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4346)
        at org.codehaus.janino.UnitCompiler.access$7100(UnitCompiler.java:185)
        at org.codehaus.janino.UnitCompiler$10.visitBooleanLiteral(UnitCompiler.java:3267)

There are a few Spark JIRA issues that appear similar, but these are all marked resolved. There is also this relevant SO question, but its answer suggests an alternative technique.

I have my own workaround where I normalize batches of columns of the dataframe. This works, but I end up with multiple dataframes that I then have to join, which is slow.
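
Roughly, my workaround looks like the sketch below (the _row_id column and BATCH_SIZE are just illustrative names I use here):

from pyspark.sql.functions import monotonically_increasing_id

BATCH_SIZE = 100  # illustrative; small enough that codegen stays under the limit

base = signalsDF.withColumn("_row_id", monotonically_increasing_id())
col_batches = [signalsDF.columns[i:i + BATCH_SIZE]
               for i in range(0, len(signalsDF.columns), BATCH_SIZE)]

normDF = None
for cols in col_batches:
    # Normalize one batch of columns, keeping the join key
    batch = base.select("_row_id", *[z_score_w(base[c], w).alias(c) for c in cols])
    normDF = batch if normDF is None else normDF.join(batch, "_row_id")

normDF = normDF.drop("_row_id")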

So, my question is - is there an alternative technique for normalizing large dataframes that I'm missing?

I'm using spark-2.0.1.


Solution

  • One obvious problem is the way you use window functions. The following frame:

    Window().partitionBy().rowsBetween(-sys.maxsize, sys.maxsize)    
    

    is fairly useless in practice. Without a partition column it first reshuffles all of the data into a single partition. This kind of windowed scaling is useful only when you need to scale within groups.
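
    For comparison, a minimal sketch of the grouped case where such a window does make sense (the df and its group column here are hypothetical):

    from pyspark.sql import Window
    from pyspark.sql.functions import avg, stddev_pop

    w_group = Window.partitionBy("group")  # scale within each group, not globally
    df.select(
        "group",
        ((df["x"] - avg("x").over(w_group)) / stddev_pop("x").over(w_group)).alias("x_scaled")
    )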

    Spark provides two classes which can be used to scale features:

    • pyspark.ml.feature.StandardScaler
    • pyspark.mllib.feature.StandardScaler

    Unfortunately, both require Vector data as input. With ML:

    from pyspark.ml.feature import StandardScaler as MLScaler, VectorAssembler
    from pyspark.ml import Pipeline
    
    scaled = Pipeline(stages=[
        VectorAssembler(inputCols=df.columns, outputCol="features"), 
        MLScaler(withMean=True, inputCol="features", outputCol="scaled")
    ]).fit(df).transform(df).select("scaled")
    

    This requires the scaled column to be expanded again if you need the original shape.
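
    One way to expand it back is via the RDD API, sketched here (reusing df.columns for the output names):

    expanded = (scaled.rdd
        .map(lambda row: row.scaled.toArray().tolist())
        .toDF(df.columns))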

    With MLlib:

    from pyspark.mllib.feature import StandardScaler as MLLibScaler
    from pyspark.mllib.linalg import DenseVector
    
    rdd = df.rdd.map(DenseVector)
    scaler = MLLibScaler(withMean=True, withStd=True)
    
    scaler.fit(rdd).transform(rdd).map(lambda v: v.array.tolist()).toDF(df.columns)
    

    The latter method can be more useful if there are codegen issues related to the number of columns.

    Another way to approach this problem is to compute global statistics

    from pyspark.sql.functions import avg, col, stddev_pop, struct
    
    stats = df.agg(*[struct(avg(c), stddev_pop(c)) for c in df.columns]).first()
    

    and select:

    df.select(*[
        ((col(c) - mean) / std).alias(c)
        for (c, (mean, std)) in zip(df.columns, stats)
    ])
    

    Following your comments, the simplest solution can be expressed using NumPy and a few basic transformations:

    import numpy as np

    rdd = df.rdd.map(np.array)  # Convert to an RDD of NumPy arrays
    stats = rdd.stats()  # Compute mean and std
    scaled = rdd.map(lambda v: (v - stats.mean()) / stats.stdev())  # Normalize
    

    and convert back to a DataFrame:

    scaled.map(lambda x: x.tolist()).toDF(df.columns)
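
    As a quick sanity check, a small sketch that spot-checks just the first column (checking all 800 would be verbose): after scaling, its mean should be close to 0 and its population standard deviation close to 1.

    from pyspark.sql.functions import avg, stddev_pop

    scaled_df = scaled.map(lambda x: x.tolist()).toDF(df.columns)
    scaled_df.agg(avg(df.columns[0]), stddev_pop(df.columns[0])).show()  # ~0.0 and ~1.0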