I'm trying to insert the rows of a DataFrame into a Postgres database and write the generated primary keys back into that DataFrame. This is what I'm doing:
import pg8000
import pyspark.sql.functions as sf

def createConnexionRds():
    host = "..."
    database = "..."
    conn = pg8000.connect(
        user="...",
        host=host,
        database=database,
        password="...",
        ssl_context=True)
    return conn

def insertProcess(r):
    # Open a connection, run this row's INSERT ... RETURNING query
    # and give back the generated primary key
    conn = createConnexionRds()
    insertResults = conn.run(r["tmp_query"])
    insertResult = "NOT_INSERTED"
    if len(insertResults) > 0:
        insertResult = insertResults[0][0]
    conn.commit()
    conn.close()
    return insertResult

def insertPerQuery(myDataframe):
    query = sf.lit("insert into tabAAA (colBBB) values ('valueCCC') returning idAAA")
    myDataframe = myDataframe.withColumn("tmp_query", query)
    myDataframe = myDataframe.drop("idAAA")
    rdd = myDataframe.rdd.map(
        lambda x: (*x, insertProcess(x))
    )
    myDataframe = myDataframe.withColumn("idAAA", sf.lit(""))
    myDataframe = sqlContext.createDataFrame(rdd, myDataframe.schema)
    myDataframe = myDataframe.drop("tmp_query")
    return myDataframe

df = insertPerQuery(df)
# df.show(100, False)
The issue is that when I comment out df.show(...) (the last line), the inserts are not executed. And if I call df.show() a second time, the inserts are duplicated.
This is for an AWS Glue job.
Thanks.
This is due to the lazy evaluation nature of Spark. The code only gets executed on the executors once you call an action, in this case .show(). Because each action re-evaluates the whole lineage, calling .show() twice runs the map (and therefore the inserts) twice.
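One common way to make the inserts run exactly once, no matter how many actions follow, is to persist the resulting DataFrame and trigger a single action inside insertPerQuery itself. The following is a minimal sketch based on the code from the question; it assumes the same insertProcess function, sqlContext, table and column names, so treat it as an illustration rather than a drop-in fix:

    import pyspark.sql.functions as sf

    def insertPerQuery(myDataframe):
        query = sf.lit("insert into tabAAA (colBBB) values ('valueCCC') returning idAAA")
        myDataframe = myDataframe.withColumn("tmp_query", query).drop("idAAA")

        # Run the inserts on the executors and append the returned primary key
        rdd = myDataframe.rdd.map(lambda x: (*x, insertProcess(x)))
        schema = myDataframe.withColumn("idAAA", sf.lit("")).schema

        result = sqlContext.createDataFrame(rdd, schema).drop("tmp_query")

        # Persist and force a single evaluation here, so that later actions
        # (show, write, ...) reuse the cached rows instead of re-running the inserts.
        result.persist()
        result.count()
        return result

Note that persist/cache is best effort: if cached partitions get evicted, Spark recomputes them and the inserts run again. If you need strict exactly-once behaviour, it is safer to write the result out immediately (for example to S3 or a staging table) and read it back for any further processing.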