Tags: pyspark, apache-spark-sql, aws-glue

PySpark SQL concat with an empty (null) column


With the PySpark SQL functions, I'm trying to do this:

from pyspark.sql import functions as sf
query = sf.concat(
    sf.lit("UPDATE abc"), sf.lit(" SET col1= '"), sf.col("col1"), sf.lit("'"),
    sf.lit(", col2= '"), sf.col("col2"), sf.lit("'"), sf.lit(" WHERE col3 = 1"),
)
myDataframe = myDataframe.withColumn("query", query)
query_collect = myDataframe.collect()
conn = createConnexion(args, username, password)
try:
    for row in query_collect:
        print(row["query"])
        conn.run(row["query"])
        conn.commit()

But it doesn't work. It works with just col1, but col2 causes an error because col2 is sometimes empty (null). In that case the whole query column is null, and conn.run(row["query"]) throws this exception: None 'NoneType' object has no attribute 'encode'

I tried using pyspark.sql.functions.when like this, but I get the same issue:

myDataframe = myDataframe.fillna(value="NO_SQL")
query = sf.concat(
    sf.lit("UPDATE abc"),
    sf.lit(" SET col1= '"),
    sf.col("col1"),
    sf.lit("'"),
    sf.when(sf.col("col2") != "NO_SQL", sf.concat(sf.lit(", col2= '"), sf.col("col2"), sf.lit("'"))),
    sf.lit(" WHERE col3 = 1"),
)

Edit for @Linus: I'm trying this:

@udf(returnType=StringType())
def sql_worker(col1, col2, colWhere):
    col2_setting = f", {col2} = '{col2}'" if col2 is not None else ""
    return f" UPDATE entreprise SET {col1} = '{col1}'{col2_setting} WHERE abc = {colWhere} "

def aaa(dynToInsert, colonne, args, username, password, forLog):
    dfToInsert = dynToInsert.toDF()
    dfToInsert = dfToInsert.withColumn("query", sql_worker(sf.col('col1'), sf.col('col2'), sf.col('col3')))

But I get this exception: Invalid returnType: returnType should be DataType or str but is StringType({})
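
A note on the exception above: the message says the return type must be a pyspark DataType or a string, so one possible cause (an assumption, since the imports are not shown) is that StringType was imported from somewhere other than pyspark.sql.types, for instance from the Glue libraries. The sketch below keeps the row logic in a plain Python function so it can be tested without Spark. The name build_update and the literal column names col1 and col2 are hypothetical, chosen for illustration.

```python
from typing import Optional


def build_update(col1: str, col2: Optional[str], col_where: int) -> str:
    """Build the UPDATE statement for one row (plain Python, no Spark needed)."""
    # Only emit the col2 assignment when a value is actually present.
    col2_setting = f", col2 = '{col2}'" if col2 is not None else ""
    return f"UPDATE entreprise SET col1 = '{col1}'{col2_setting} WHERE abc = {col_where}"


# In a Spark job the function could then be wrapped as a UDF, for example:
#   from pyspark.sql.functions import udf
#   from pyspark.sql.types import StringType   # pyspark's own StringType
#   sql_worker = udf(build_update, returnType=StringType())
# Passing the type as a string, udf(build_update, returnType="string"),
# avoids depending on which StringType is in scope.
```

Note that the function interpolates values directly into the SQL text, as the original does, so it is only suitable when the column values are trusted.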

Thanks


Solution

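  • It works with when().otherwise(). At first I tried it without the otherwise, which caused the error: when() without otherwise() returns null for rows that don't match, and concat returns null as soon as any of its inputs is null. Thanks.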

    query = sf.concat(
        sf.lit("UPDATE def"),
        sf.lit(" SET " + colonne + " = "), sf.col(colonne),
        sf.when(sf.col("abc").isNull(), "").otherwise(sf.concat(sf.lit(" , abc = '"), sf.col("abc"), sf.lit("'"))),
        sf.lit(" WHERE " + colonne + " = "), sf.col(colonne)
        )
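
To make the null propagation behind this fix easier to see, here is a minimal plain-Python model of it. This is not Spark code: sql_concat, when and build_query are hypothetical stand-ins that only mimic the null rules of concat and when().otherwise(), and the literal "UPDATE def SET c = 1" is a placeholder for the real statement.

```python
from typing import Optional


def sql_concat(*parts: Optional[str]) -> Optional[str]:
    # Mimics Spark's concat: a single null (None) input makes the result null.
    if any(p is None for p in parts):
        return None
    return "".join(parts)


def when(cond: bool, value: Optional[str], otherwise: Optional[str] = None) -> Optional[str]:
    # Mimics when(cond, value): with no otherwise, unmatched rows yield null.
    return value if cond else otherwise


def build_query(abc: Optional[str], use_otherwise: bool) -> Optional[str]:
    # Emit the "abc" clause only when abc is set; fall back to "" or to null.
    fallback = "" if use_otherwise else None
    clause = when(abc is not None, sql_concat(" , abc = '", abc, "'"), fallback)
    return sql_concat("UPDATE def SET c = 1", clause, " WHERE c = 1")


print(build_query(None, use_otherwise=False))  # None: the null clause nulls the whole query
print(build_query(None, use_otherwise=True))   # UPDATE def SET c = 1 WHERE c = 1
print(build_query("v", use_otherwise=True))    # UPDATE def SET c = 1 , abc = 'v' WHERE c = 1
```

In Spark itself, wrapping the optional part in sf.coalesce(..., sf.lit("")) is another way to supply a non-null fallback.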