I am trying to copy data from an iceberg table to a postgres table using a glue job. I have this code:
def execute_job(spark, factory: DependencyFactory, environment, logger):
    """Copy the Iceberg transactions table into PostgreSQL with per-key versioning.

    Reads {DATABASE_NAME}.transactions, assigns a version number per
    unique_identifier (ordered by in_date), derives start_date/end_date
    columns, then bulk-inserts each Spark partition into
    transaction.transactions_master from the executors.

    Args:
        spark: active SparkSession.
        factory: dependency factory (unused here; kept for the job interface).
        environment: environment name used to build the Config.
        logger: job logger (note: inside foreachPartition it logs on the
            executors, so its output appears in executor logs, not driver logs).
    """
    print("Starting job")
    sql = f"SELECT * FROM {DATABASE_NAME}.transactions ORDER BY unique_identifier, in_date"
    df = spark.sql(sql)
    df.show(10)

    config = Config(environment)

    # Version rows within each unique_identifier, ordered by in_date (1 = oldest).
    window_spec = Window.partitionBy("unique_identifier").orderBy("in_date")
    df = df.withColumn("version", row_number().over(window_spec))
    df = df.withColumn("start_date", col("in_date"))
    df = df.withColumn("end_date", lit("9999-12-31").cast("string"))
    df.show(10)

    # NOTE(review): this gives the OLDEST version (version == 1) the open-ended
    # 9999-12-31 end_date and later versions the PREVIOUS row's start_date.
    # Typical SCD2 does the opposite (lead() for end_date, latest row left
    # open-ended) — confirm this is the intended semantics.
    df = df.withColumn("prev_start_date", lag("start_date").over(window_spec))
    df = df.withColumn(
        "end_date",
        when(col("version") == 1, col("end_date")).otherwise(col("prev_start_date")),
    )

    # BUG FIX: `+` on string columns performs numeric addition in Spark SQL and
    # produces NULL; build "<unique_identifier>:<version>" with concat() instead.
    from pyspark.sql.functions import concat

    df = df.withColumn(
        "id",
        concat(col("unique_identifier").cast("string"), lit(":"), col("version").cast("string")),
    )
    df.show(10)

    total_count = df.count()
    print(f"Total records: {total_count}")
    if total_count == 0:
        print("No records to insert. Exiting.")
        return

    # Aim for roughly 100k rows per partition so each executor task issues
    # one execute_values batch.
    num_partitions = max(1, total_count // 100000)
    processed_df = df.repartition(num_partitions)
    print(f"Number of partitions: {processed_df.rdd.getNumPartitions()}")
    processed_df.show(10)

    def process_partition(partition):
        # Runs on the EXECUTORS: print/logger output lands in the executor
        # logs, not the driver log — which is why nothing appears in the
        # driver output even though the code is executing.
        logger.info("Processing partition")
        print("Processing partition")
        partition_list = list(partition)
        if not partition_list:
            print("Skipping empty partition")
            return
        insert_query = """
        INSERT INTO transaction.transactions_master (start_date, end_date, in_date, out_date, update_time, id, version, unique_identifier, id_tr, alias, update_correlation_id, correlation_id)
        VALUES %s
        """
        # Build the DB client on the executor (connections are not
        # serializable) and only after we know the partition is non-empty,
        # so empty partitions don't open a connection for nothing.
        db_client = PostgresDbClient(
            DbConfigBuilder(config, config['vault']['service_type'], ssl_cert_file=None)
        )
        batch_size = 100000
        try:
            values = [tuple(row) for row in partition_list]
            db_client.execute_values_query(insert_query, values, page_size=batch_size)
            print(f"Inserted {len(values)} records successfully")
        except Exception as e:
            print(f"Error inserting partition: {e}")
            # Re-raise so the Spark task fails visibly instead of silently
            # swallowing the error while the driver reports success.
            raise

    try:
        print("Starting foreachPartition")
        processed_df.foreachPartition(process_partition)
        print("Finished foreachPartition")
    except Exception as e:
        print(f"Error during foreachPartition: {e}")
        # Propagate the failure: otherwise the success message below prints
        # even when the distributed insert failed.
        raise

    print("Data successfully copied to transaction.transactions_master PostgreSQL table")
everything seems to be working (it's printing results in the logs, also tested db connection and it seems fine). However, when it gets to processed_df.foreachPartition(process_partition) it's like the code inside process_partition is not getting executed. I'm not seeing anything in the logs. Last entries are just "Starting foreachPartition" and "Data successfully copied to transaction.transactions_master PostgreSQL table"
What could be the issue? Am I using foreachPartition wrong?
The foreachPartition function runs at the executor level, so its detailed logs appear in the executor logs rather than the driver logs. It's worth checking the executor logs. Also, do you see the data actually getting updated in the PostgreSQL table?