python, pandas, pyspark

How to create a Pandas DataFrame with dynamic values within a for loop


I'm new to data engineering, using Python, PySpark, and Pandas to create a data frame. I've been blocked on this for a long time and can't get my head around it. It's a simple problem, but I'm stuck.

Here is my code snippet, which works fine without any loop iteration (assuming there is only one primary key) and generates the DataFrame at the end.

        primary_keys = [primary_key]
        df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
        
        # Define the schema for the Spark DataFrame
        schema = T.StructType([
            T.StructField("primary_key", T.IntegerType(), True),  # Integer primary key
            T.StructField(mapped_column[0], T.StringType(), True)  # String value column
        ])
        
        self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
        # Create the Spark DataFrame from the Pandas DataFrame
        spark_df = spark.createDataFrame(df, schema)
        
        # (Optional) Verify the Spark DataFrame
        spark_df.printSchema()
        spark_df.show()

However, my requirement is to build primary_keys in a loop, where each primary key is generated dynamically on every iteration. I tried the following, but it keeps only the last object, and I ended up with the same values (for both primary_key and mapped_column) in the final data frame.

        primary_keys = []

        # Iterate over the list and access values directly
        for row in column_values:
            # ... some logic to generate the primary key for this iteration
            primary_keys.append(primary_key)
            df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
        
        
        self.logger.info("$$$$$$$$$$$$$$$ For Loop Completed $$$$$$$$$$$$$$$$$$$$$")
        # Define the schema for the Spark DataFrame
        schema = T.StructType([
            T.StructField("primary_key", T.IntegerType(), True),  # Integer primary key
            T.StructField(mapped_column[0], T.StringType(), True)  # String value column
        ])
        
        self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
        # Create the Spark DataFrame from the Pandas DataFrame
        spark_df = spark.createDataFrame(df, schema)
        
        # (Optional) Verify the Spark DataFrame
        spark_df.printSchema()
        spark_df.show()

I suspect the problem is in this statement: df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value}). It replaces df on every iteration rather than appending to the existing one.
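
For illustration, here is a stripped-down repro with made-up values that shows the behavior I'm seeing:

        import pandas as pd

        primary_keys = []
        for primary_key, value in [(1, "a"), (2, "b"), (3, "c")]:
            primary_keys.append(primary_key)
            # df is rebound on every pass, and the scalar `value` is
            # broadcast over the whole list, so earlier values are lost
            df = pd.DataFrame({'primary_key': primary_keys, 'col': value})

        print(df)
        #    primary_key col
        # 0            1   c
        # 1            2   c
        # 2            3   c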

I would really appreciate it if someone could assist me here. Thank you!


Solution

  • Your suspicion is correct. The issue is that df is rebound on every iteration of the loop, so only the last assignment survives; on top of that, the scalar value is broadcast across all collected primary keys, which is why every row ends up with the same value. Instead of building a DataFrame inside the loop, collect the rows in a list and construct the DataFrame once afterwards. Here's how you can address this problem:

    import pandas as pd
    from pyspark.sql import types as T
    # assumes an active SparkSession is available as `spark`
    
    # Initialize an empty list to store dictionaries
    data = []
    
    # Iterate over the list and access values directly
    for row in column_values:
        # Generate primary key and value dynamically
        primary_key = generate_primary_key()  # replace generate_primary_key() with your logic
        value = generate_value()  # replace generate_value() with your logic
        
        # Append data to the list as a dictionary
        data.append({'primary_key': primary_key, mapped_column[0]: value})
    
    # Create a DataFrame from the list of dictionaries
    df = pd.DataFrame(data)
    
    # Define the schema for the Spark DataFrame
    schema = T.StructType([
        T.StructField("primary_key", T.IntegerType(), True),  # Integer primary key
        T.StructField(mapped_column[0], T.StringType(), True)  # String value column
    ])
    
    # Create the Spark DataFrame from the Pandas DataFrame
    spark_df = spark.createDataFrame(df, schema)
    
    # (Optional) Verify the Spark DataFrame
    spark_df.printSchema()
    spark_df.show()
    

    This way every iteration contributes its own row, and the DataFrame is built once at the end, which is also faster than growing a DataFrame inside the loop.
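
    As a side note, if the Pandas DataFrame isn't needed for anything else, you could skip it entirely: spark.createDataFrame also accepts a plain list of tuples together with an explicit schema. A minimal sketch, reusing the same hypothetical generate_primary_key() / generate_value() placeholders:

        # Collect rows as tuples in the same order as the schema fields
        rows = []
        for row in column_values:
            primary_key = generate_primary_key()  # placeholder, replace with your logic
            value = generate_value()              # placeholder, replace with your logic
            rows.append((primary_key, value))
        
        # Build the Spark DataFrame directly, without the Pandas detour
        spark_df = spark.createDataFrame(rows, schema)
        spark_df.show()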