Tags: pyspark, apache-spark-sql, aws-glue

pg8000 get inserted id into dataframe


I'm trying to insert the rows of a DataFrame into a Postgres database and write the generated primary keys back into that DataFrame. This is what I'm doing:

import pg8000
import pyspark.sql.functions as sf

def createConnexionRds():
    host = "..."
    database = "..."
    conn = pg8000.connect(
        user="...",
        host=host,
        database=database,
        password="...",
        ssl_context=True)
    return conn

def insertProcess(r):
    # run the INSERT ... RETURNING query for one row and return the generated id
    conn = createConnexionRds()
    insertResults = conn.run(r["tmp_query"])
    insertResult = "NOT_INSERTED"
    if len(insertResults) > 0:
        insertResult = insertResults[0][0]
    conn.commit()
    conn.close()
    return insertResult

def insertPerQuery(myDataframe):
    query = sf.lit("insert into tabAAA (colBBB) values ('valueCCC') returning idAAA")

    myDataframe = myDataframe.withColumn("tmp_query", query)
    myDataframe = myDataframe.drop("idAAA")

    # append the id returned by insertProcess to each row
    rdd = myDataframe.rdd.map(
        lambda x: (*x, insertProcess(x))
    )

    myDataframe = myDataframe.withColumn("idAAA", sf.lit(""))
    # sqlContext comes from the Glue job setup
    myDataframe = sqlContext.createDataFrame(rdd, myDataframe.schema)
    myDataframe = myDataframe.drop("tmp_query")
    return myDataframe
    
df = insertPerQuery(df)
# df.show(100, False)

The issue is that when I comment out df.show(...) (the last line), the inserts are not processed. And if I call df.show() a second time, the inserts are duplicated.

This is for an AWS Glue job.

Thanks.


Solution

  • This is due to Spark's lazy evaluation: the code only runs on the executors once you call an action, in this case .show(). Every action re-triggers the computation from scratch, which is why a second .show() runs the inserts a second time.
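
One way to address this, building on the question's own insertProcess and sf import, is to persist the mapped RDD and force its evaluation with a single action, so later actions reuse the cached results instead of re-running the inserts. A minimal sketch of that idea (the StorageLevel choice and the count() trigger are my assumptions, not something stated in the answer):

from pyspark import StorageLevel

def insertPerQuery(myDataframe):
    query = sf.lit("insert into tabAAA (colBBB) values ('valueCCC') returning idAAA")

    myDataframe = myDataframe.withColumn("tmp_query", query)
    myDataframe = myDataframe.drop("idAAA")

    rdd = myDataframe.rdd.map(
        lambda x: (*x, insertProcess(x))
    )
    # cache the mapped rows so the inserts run only once,
    # no matter how many actions (show, count, write) follow
    rdd = rdd.persist(StorageLevel.MEMORY_AND_DISK)
    rdd.count()  # single action that materializes the cache here

    myDataframe = myDataframe.withColumn("idAAA", sf.lit(""))
    myDataframe = sqlContext.createDataFrame(rdd, myDataframe.schema)
    myDataframe = myDataframe.drop("tmp_query")
    return myDataframe

Note that persisting makes re-execution unlikely but not impossible: if an executor is lost, Spark can recompute a partition from its lineage, which would run those inserts again. For strict exactly-once semantics the insert itself would need to be idempotent, e.g. via an ON CONFLICT clause on the Postgres side.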