I'm new to data engineering, working with Python, PySpark, and pandas to create a data frame, and I've been blocked on this for a long time and can't get my head around it. It's a simple problem, but I'm stuck.
Here is my code snippet, which works fine (assuming there is only one primary key) without any loop iteration and generates the dataframe at the end:
import pandas as pd
from pyspark.sql import types as T

primary_keys = [primary_key]
df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
# Define the schema for the Spark DataFrame
schema = T.StructType([
    T.StructField("primary_key", T.IntegerType(), True),   # Integer primary key
    T.StructField(mapped_column[0], T.StringType(), True)  # String value column
])
self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
However, my requirement is to build primary_keys inside a loop, where each primary key is generated dynamically per iteration. I tried the following, but it keeps only the last object, and I ended up with the same values (for both primary_key and the mapped column) in the final data frame:
primary_keys = []
# Iterate over the list and access values directly
for row in column_values:
    ### some logic to generate the primary key from the loop iteration
    primary_keys.append(primary_key)
    df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value})
self.logger.info("$$$$$$$$$$$$$$$ For Loop Completed $$$$$$$$$$$$$$$$$$$$$")
# Define the schema for the Spark DataFrame
schema = T.StructType([
    T.StructField("primary_key", T.IntegerType(), True),   # Integer primary key
    T.StructField(mapped_column[0], T.StringType(), True)  # String value column
])
self.logger.info("$$$$$$$$$$$$$$$ Creating DF $$$$$$$$$$$$$$$$$$$$$")
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
I suspect the problem is with the df object in this statement: df = pd.DataFrame({'primary_key': primary_keys, mapped_column[0]: value}). It seems to replace the DataFrame on every iteration rather than appending to the existing one.
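To illustrate what I mean, here is a stripped-down sketch (with made-up values and a hypothetical column name mapped_col) of the behaviour I think I'm seeing:

import pandas as pd

# Minimal sketch: df is rebuilt from scratch on every pass,
# so only the assignment from the last iteration survives the loop.
for pk, val in [(1, 'a'), (2, 'b'), (3, 'c')]:
    df = pd.DataFrame({'primary_key': [pk], 'mapped_col': [val]})

print(df)  # only the row (3, 'c') remains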
I would really appreciate it if someone could assist me here. Thank you!
Your suspicion is correct. The issue is that df is redefined on each iteration of the loop, so only the values from the last iteration are retained. To keep the data from every iteration, accumulate the rows in a list and create the DataFrame once, after the loop. Here's how you can address this problem:
# Initialize an empty list to store dictionaries
data = []
# Iterate over the list and access values directly
for row in column_values:
    # Generate primary key and value dynamically
    primary_key = generate_primary_key()  # replace generate_primary_key() with your logic
    value = generate_value()              # replace generate_value() with your logic
    # Append one row per iteration to the list as a dictionary
    data.append({'primary_key': primary_key, mapped_column[0]: value})
# Create a DataFrame from the list of dictionaries, once, after the loop
df = pd.DataFrame(data)
# Define the schema for the Spark DataFrame
schema = T.StructType([
    T.StructField("primary_key", T.IntegerType(), True),   # Integer primary key
    T.StructField(mapped_column[0], T.StringType(), True)  # String value column
])
# Create the Spark DataFrame from the Pandas DataFrame
spark_df = spark.createDataFrame(df, schema)
# (Optional) Verify the Spark DataFrame
spark_df.printSchema()
spark_df.show()
This ensures that all data from each iteration is retained in the DataFrame.
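As a side note, collecting everything in a list and building the DataFrame once is also much faster than growing a pandas DataFrame row by row, since each concatenation copies all existing data. And if the pandas DataFrame exists only to feed spark.createDataFrame, you can skip it entirely and pass the collected rows straight to Spark. A minimal sketch, assuming the same loop and the same placeholder helpers (generate_primary_key returning an int, generate_value returning a str):

# Collect rows as plain tuples matching the schema's column order
rows = []
for row in column_values:
    primary_key = generate_primary_key()  # your logic, assumed to return an int
    value = generate_value()              # your logic, assumed to return a str
    rows.append((primary_key, value))

# Create the Spark DataFrame directly from the list of tuples
spark_df = spark.createDataFrame(rows, schema)
spark_df.show()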