Tags: python, apache-spark, pyspark, microsoft-fabric

Can I manipulate a table directly in pyspark?


I am using pyspark in MS Fabric. I have a logging table, and I am trying to delete older entries from it.

That's easy enough, given:

  • a table whose name is stored in TABLE_LOG

  • a column in that table whose name is stored in LOG_TIME, of type TimestampType

The following example would keep only the last two days of logs (very roughly, because of the integer cast):

from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import IntegerType

RUNTIME = current_timestamp()
PERSISTENCE = 2              # days of logs to keep
SECONDS_IN_DAY = 24 * 60 * 60

# Read the table, compute each row's age in days, keep only recent rows,
# then rewrite the whole table.
log = spark.read.format('delta').load(f'Tables/{TABLE_LOG}')
log = log.withColumn('age', (RUNTIME.cast(IntegerType()) - log[LOG_TIME].cast(IntegerType())) / SECONDS_IN_DAY)
log = log.where(log.age < PERSISTENCE)
log.write.mode('overwrite').format('delta').save(f'Tables/{TABLE_LOG}')

In SQL, you would delete the data in place, partition by partition, which should parallelize neatly and thus be more efficient than rewriting the whole table.
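
For reference, this is the kind of in-place delete I mean; it could presumably be issued from the notebook via spark.sql, assuming the table is registered in the lakehouse catalog under the name stored in TABLE_LOG (a sketch, not something I have verified):

spark.sql(f"""
    DELETE FROM {TABLE_LOG}
    WHERE {LOG_TIME} < current_timestamp() - INTERVAL {PERSISTENCE} DAYS
""")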

But I want to keep using pyspark, because the whole environment (including the constants and variables) is in Python.

Is it possible to do that using pyspark?


Solution

  • Okay, answering myself here. It works, at least for delta tables:

    from delta.tables import DeltaTable
    from pyspark.sql.functions import col, date_add
    
    # Open the Delta table in place and delete rows older than PERSISTENCE days.
    # Only files containing matching rows are rewritten; note that date_add works
    # at day granularity, so the cutoff falls on a date boundary.
    dt = DeltaTable.forPath(spark, f'Tables/{TABLE_LOG}')
    dt.delete(col(LOG_TIME) < date_add(RUNTIME, -PERSISTENCE))
    

    ...and that's it. Thank you for listening :-)
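
    As a usage note, here is a minimal sketch of how this could be wrapped into a reusable helper; the name prune_log and its parameters are purely illustrative, not part of the code above:

    from delta.tables import DeltaTable
    from pyspark.sql.functions import col, current_timestamp, date_add
    
    def prune_log(spark, table_name, time_column, keep_days):
        """Delete rows older than keep_days from a Delta log table (sketch)."""
        dt = DeltaTable.forPath(spark, f'Tables/{table_name}')
        dt.delete(col(time_column) < date_add(current_timestamp(), -keep_days))
    
    # For example: prune_log(spark, TABLE_LOG, LOG_TIME, PERSISTENCE)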