python apache-spark pyspark rdd azure-synapse

Why is my PySpark row_number column messed up when applying a schema?


I want to apply a schema to specific non-technical columns of a Spark DataFrame. Before doing so, I add an artificial ID using Window and row_number, so that I can later join other technical columns from the initial DataFrame to the new DataFrame. However, after applying the schema, the generated ID no longer matches the original rows. Below is a code sample. Can someone explain why this happens and how to resolve the issue?

from pyspark.sql.functions import row_number, lit, col, monotonically_increasing_id, sum
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# Sample DataFrame
data = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]
df = spark.createDataFrame(data, ["id", "name"])


# Schema to apply
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
])

# Create ID column
w = Window().orderBy(lit('A'))
df = df.withColumn('_special_surrogate_id', row_number().over(w))

# Improved method
surrogate_key_field = StructField("_special_surrogate_id", StringType(), False)
schema_with_surrogate = StructType(schema.fields + [surrogate_key_field])

# Loop because sometimes it works and sometimes it doesn't
for i in range(11):
    
    df_filtered = df.select("id", "name", "_special_surrogate_id")   
    df_filtered = spark.createDataFrame(df_filtered.rdd, schema_with_surrogate)

    combined_df = df.withColumnRenamed("id", "id1").join(df_filtered.withColumnRenamed("id", "id2"), on="_special_surrogate_id")

    print("Diffs in Iteration " + str(i) + ":")
    print(combined_df.withColumn("diff", (col("id1") != col("id2")).cast("integer")).agg(sum("diff")).collect()[0][0])


Solution

  • The issue:

    orderBy(lit('A')) determines the row_number() ordering, but it sorts every row by the same constant. All rows tie, so it is not a deterministic way to define the row number value.

    Behind the curtain of Spark:

    Spark spreads the rows across partitions that different executors process in parallel. A window with no partitionBy makes Spark move all of those rows into a single partition, sort them by the orderBy expression, and number them in the resulting order.

    Example scenario:

    The Alice row and the Bob row may start out in different partitions. Sorting by the constant 'A' treats them as equal, so their relative order depends on how they happen to arrive, which Spark does not guarantee.

    Spark still has to decide which row is ultimately assigned the row_number value of 1.

    However, your logic does not tell Spark how to make this decision.

    Shuffling, among other Spark mechanics, can result in Spark not making the same decision every time it assigns the row numbers. Because DataFrames are evaluated lazily, the window is computed once for df_filtered.rdd and again for df in the join, so the two sides of the join can carry different numberings.

    Recommendation:

    If you have a combination of columns whose values are unique for each row, use it in the orderBy clause. That gives Spark the information it needs to assign row numbers consistently and deterministically.

    e.g. Window.orderBy(col('id'), col('name'))
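The effect of ordering by a constant can be illustrated without a cluster. The following is a minimal plain-Python simulation, not Spark code: `number_rows` is a hypothetical helper that mimics row_number() by sorting and then numbering, and the two "arrival" lists are an assumed stand-in for rows reaching the window in different orders on two evaluations.

```python
def number_rows(rows, sort_key):
    """Mimic row_number(): sort the rows, then number them starting at 1."""
    ordered = sorted(rows, key=sort_key)
    return {row: n for n, row in enumerate(ordered, start=1)}


rows = [(1, "Alice"), (2, "Bob"), (3, "Charlie")]

# Two evaluations of the same data in which the rows arrive in different
# orders (an assumption standing in for a recomputed shuffle).
first_arrival = list(rows)
second_arrival = list(reversed(rows))

# Constant key: every row ties, so the numbering simply mirrors arrival order.
constant_a = number_rows(first_arrival, lambda row: "A")
constant_b = number_rows(second_arrival, lambda row: "A")
print(constant_a == constant_b)  # False

# Unique key (id, name): the sort fully determines the order, so the
# numbering is the same no matter how the rows arrived.
unique_a = number_rows(first_arrival, lambda row: (row[0], row[1]))
unique_b = number_rows(second_arrival, lambda row: (row[0], row[1]))
print(unique_a == unique_b)  # True
```

With the constant key, Alice is numbered 1 in one evaluation and 3 in the other, which is the kind of mismatch the join in the question counts as a diff. With the unique key, both evaluations agree.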