apache-spark · pyspark · strong-typing

PySpark error: Method pow does not exist


This is a bit strange.

When I create a dataframe and then apply some transformations using the function pow, it works. But when I run the same thing in a real-world scenario, it does not. The column datatypes in my dummy dataframe and in the real-world scenario are the same.

Error

Method pow([class java.lang.Double, class java.lang.Double]) does not exist

This works (with made-up data)

from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

columns = ["CounterpartID","Year","Month","Day","churnprobability", "deadprobability"]
data = [(1234, 2021,5,12, 0.85,0.6),(1224, 2022,6,12, 0.75,0.6),(1345, 2022,5,13, 0.8,0.2),(234, 2021,7,12, 0.9,0.8), (1654, 2021,7,12, 1.40,20.0), (7548, 2021,7,12, -1.40,20.0),  (6582, 2021,7,12, -1.40,20.0)]



schema = StructType([ \
    StructField("CounterpartID",IntegerType(),False), \
    StructField("Year",IntegerType(),False), \
    StructField("Month",IntegerType(),False), \
    StructField("Day", IntegerType(), False), \
    StructField("churnprobability", DoubleType(), False), \
    StructField("deadprobability", DoubleType(), False) \
  ])
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show(truncate=False)

abc=df.withColumn("client_id", f.col("CounterpartID"))\
      .withColumn("year", f.col("Year"))\
      .withColumn("month", f.col("Month"))\
      .withColumn("day", f.col("Day"))\
      .withColumn("churn_probability_unit", f.col("churnprobability").cast(IntegerType()))\
      .withColumn("churn_probability_nanos", ((f.col("churnprobability") - f.col("churnprobability").cast(IntegerType())) * pow(10,9)).cast(IntegerType()))\
      .withColumn("dead_probability_unit", f.col("deadprobability").cast(IntegerType()))\
      .withColumn("dead_probability_nanos", (f.col("deadprobability") %1 * pow(10,9)).cast(IntegerType()))\
      .select("client_id", "year", "month", "day", "churn_probability_unit", "churn_probability_nanos", "dead_probability_unit","dead_probability_nanos")\

abc.show()

However, in the real-world scenario (a production job), instead of df I have a real dataframe (of course), and all of its columns have the same datatypes as my dummy dataframe above:

e.g. here: [screenshot: Production Dataframe]

However, when I do the same transformation, it complains that it cannot find a pow function with double parameters. Here's the stack trace (below). I have also looked at the docs for pow, but they don't say anything about datatypes, and some S.O. posts suggest Double should be fine. Where is it going wrong? I can of course change it to multiply by the actual number instead of using pow, but I would still like to understand this better. Any questions, and I can help answer.

----> 7       .withColumn("churn_probability_nanos", ((f.col("churnprobability") % 1.0) * pow(10,9)).cast(IntegerType()))\
      8       .withColumn("dead_probability_unit", f.col("deadprobability").cast(IntegerType()))\
      9       .withColumn("dead_probability_nanos", (f.col("deadprobability") %1 * pow(10,9)).cast(IntegerType()))\

/databricks/spark/python/pyspark/sql/functions.py in pow(col1, col2)
    737     Returns the value of the first argument raised to the power of the second argument.
    738     """
--> 739     return _invoke_binary_math_function("pow", col1, col2)
    740 
    741 

/databricks/spark/python/pyspark/sql/functions.py in _invoke_binary_math_function(name, col1, col2)
     73     and wraps the result with :class:`~pyspark.sql.Column`.
     74     """
---> 75     return _invoke_function(
     76         name,
     77         # For legacy reasons, the arguments here can be implicitly converted into floats,

/databricks/spark/python/pyspark/sql/functions.py in _invoke_function(name, *args)
     57     """
     58     jf = _get_get_jvm_function(name, SparkContext._active_spark_context)
---> 59     return Column(jf(*args))
     60 
     61 

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    108     def deco(*a, **kw):
    109         try:
--> 110             return f(*a, **kw)
    111         except py4j.protocol.Py4JJavaError as e:
    112             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    328                     format(target_id, ".", name), value)
    329             else:
--> 330                 raise Py4JError(
    331                     "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332                     format(target_id, ".", name, value))

Py4JError: An error occurred while calling z:org.apache.spark.sql.functions.pow. Trace:
py4j.Py4JException: Method pow([class java.lang.Double, class java.lang.Double]) does not exist
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:341)
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:362)
    at py4j.Gateway.invoke(Gateway.java:289)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

Solution

  • Functions in (py)Spark are developed in Scala and are therefore strongly typed. This means that a function's signature, including its parameter types, is part of the function. For example:

    def foo(bar:str):
       ...
    
    # AND
    
    def foo(bar:int):
       ...
    

    These two functions are the same in Python (the second definition simply replaces the first). In Scala they are different functions, because the input types differ.

    In your case, a pyspark pow that takes doubles as input does not exist, but a pow that accepts columns as input does. Your first example probably works because you are not using pyspark's pow there but the built-in Python pow instead.
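    To see the difference concretely, here is a minimal, self-contained sketch (an illustration, not part of the original post):

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(0.85,), (1.40,)], ["churnprobability"])

    # Built-in Python pow: evaluated eagerly to a plain int, so the column
    # expression below just multiplies by the literal 1000000000.
    print(pow(10, 9))  # 1000000000

    df.withColumn(
        "churn_probability_nanos",
        (F.col("churnprobability") % 1 * pow(10, 9)).cast("int"),
    ).show()

    # PySpark's pow called with two plain Python numbers (on the Spark version
    # shown in the traceback) sends two java.lang.Double values to the JVM,
    # and no pow(Double, Double) overload exists there -> the Py4JError above.
    # F.pow(10, 9)

    # Passing Columns (e.g. via lit) matches the existing pow(Column, Column) overload.
    df.withColumn(
        "churn_probability_nanos",
        (F.col("churnprobability") % 1 * F.pow(F.lit(10), F.lit(9))).cast("int"),
    ).show()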

    I'd advise you to always use F.function_name. Many built-in Python functions and pyspark functions share the same name, so importing the Spark functions directly (for example with a star import) shadows the Python built-ins.
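    As a quick illustration of that shadowing (an assumption about how the production job is set up, since its imports are not shown in the question):

    import builtins

    print(pow)            # <built-in function pow>

    from pyspark.sql.functions import *   # imports every Spark function, including pow
    print(pow)            # now pyspark.sql.functions.pow -- the built-in is shadowed
    print(builtins.pow(10, 9))            # the original built-in is still reachable explicitly

    # With a prefixed import there is nothing to shadow:
    # from pyspark.sql import functions as F
    # F.pow(...), F.col(...), ...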

    You should simply change your code to:

    from pyspark.sql import functions as F
    
    F.pow(F.lit(10), F.lit(9))
    

    NB: lit creates a Column from the given literal value.
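    Applied to the transformation from the question, the fix could look roughly like this (a sketch reusing the dummy df built in the question; column names follow the question):

    from pyspark.sql import functions as F
    from pyspark.sql.types import IntegerType

    NANOS = F.pow(F.lit(10), F.lit(9))   # a Column expression, not a Python float

    abc = (
        df.withColumn("client_id", F.col("CounterpartID"))
          .withColumn("churn_probability_unit", F.col("churnprobability").cast(IntegerType()))
          .withColumn("churn_probability_nanos",
                      ((F.col("churnprobability") % 1.0) * NANOS).cast(IntegerType()))
          .withColumn("dead_probability_unit", F.col("deadprobability").cast(IntegerType()))
          .withColumn("dead_probability_nanos",
                      ((F.col("deadprobability") % 1.0) * NANOS).cast(IntegerType()))
          .select("client_id", "churn_probability_unit", "churn_probability_nanos",
                  "dead_probability_unit", "dead_probability_nanos")
    )
    abc.show()

    Since the exponent is a constant here, multiplying by F.lit(10 ** 9) (or simply 10 ** 9) would work just as well; using F.pow only matters when the base or exponent comes from a column.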