pyspark

spark_partition_id takes 0 positional arguments but 1 was given error


I am trying to find a PySpark equivalent of pandas' merge_asof function, but I am running into an error.

Here is some example code:

from pyspark.sql.functions import spark_partition_id

# create two example DataFrames
df1 = spark.createDataFrame([(1, 'a', 1.0), (3, 'b', 2.0), (5, 'c', 3.0)], ['key', 'value1', 'data'])
df2 = spark.createDataFrame([(1, 'd', 4.0), (2, 'e', 5.0), (4, 'f', 6.0)], ['key', 'value2', 'data'])

# join the DataFrames on the nearest key using spark_partition_id
merged_df = df1.join(df2, (df2['key'] == df1['key']) | (spark_partition_id() == spark_partition_id(df2['key']) - 1), 'leftOuter')

merged_df.show()
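For reference, the pandas behaviour I am trying to reproduce looks roughly like this (a minimal sketch on the same example data):

import pandas as pd

# pandas reference: match each row of df1 with the closest preceding key in df2
pdf1 = pd.DataFrame({'key': [1, 3, 5], 'value1': ['a', 'b', 'c'], 'data': [1.0, 2.0, 3.0]})
pdf2 = pd.DataFrame({'key': [1, 2, 4], 'value2': ['d', 'e', 'f'], 'data': [4.0, 5.0, 6.0]})

# both frames must be sorted on the join key for merge_asof
merged = pd.merge_asof(pdf1.sort_values('key'), pdf2.sort_values('key'),
                       on='key', direction='backward', suffixes=('_1', '_2'))
print(merged)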

Solution

  • The error you are encountering comes from how spark_partition_id() is used in the join condition. spark_partition_id() takes no arguments and simply returns the ID of the partition the current row lives in, so calling it as spark_partition_id(df2['key']) raises the TypeError in your title. Beyond that, partition IDs say nothing about how close two keys are, so it cannot express a nearest-key join like this (a minimal sketch of its intended, zero-argument usage follows the code below).

    To perform a merge similar to pandas merge_asof in PySpark, you can use a window partitioned by the key column with a rangeBetween frame. Here's an example code snippet that achieves this.

    from pyspark.sql import SparkSession
    from pyspark.sql.window import Window
    import pyspark.sql.functions as func
    
    # create a SparkSession (the JDBC jar settings here come from the original
    # environment and are not needed for this example)
    spark = SparkSession.builder.config("driver-class-path", "/home/cdsw/ojdbc6-11.2.0.3.jar").config("jars", "/home/cdsw/ojdbc6-11.2.0.3.jar").appName("e2ereview").getOrCreate()
    
    # create two example DataFrames
    df1 = spark.createDataFrame([(1, 'a', 1.0), (3, 'b', 2.0), (5, 'c', 3.0)], ['key', 'value1', 'data'])
    df2 = spark.createDataFrame([(1, 'd', 4.0), (2, 'e', 5.0), (4, 'f', 6.0)], ['key', 'value2', 'data'])
    
    # define a window over df1's key, ordered by key and data, covering all rows
    # up to and including the current one
    w = Window.partitionBy(df1.key).orderBy(df1.key, df1.data).rangeBetween(Window.unboundedPreceding, 0)
    
    # join the DataFrames on key and take the first data value within the window
    merged_df = df1.join(df2, df1.key == df2.key, 'leftOuter') \
                   .withColumn('nearest_data', func.first(df1.data).over(w)) \
                   .orderBy(df1.key)
    
    merged_df.show()
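    
    As a side note on the original error: spark_partition_id takes no arguments. A minimal sketch of its intended usage (purely for inspecting which partition each row sits in, not for joining) looks like this:
    
    from pyspark.sql.functions import spark_partition_id
    
    # tag each row of df1 with the ID of the partition it currently lives in
    df1.withColumn('partition_id', spark_partition_id()).show()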