Tags: python-3.x, pyspark

PySpark: fill data with previous value every 100 milliseconds


For each node, I want to fill in the previous value at every 100 ms step between consecutive timestamps.

input dataframe:

node value timestamp
node1 7777 '2023-10-28 14:22:41.9'
node1 8888 '2023-10-28 14:22:42.5'
node1 1111 '2023-10-28 14:22:42.7'
node2 2222 '2023-10-28 14:22:41.2'
node2 6666 '2023-10-28 14:22:41.5'

output dataframe:

node value timestamp
node1 7777 '2023-10-28 14:22:41.9'
node1 7777 '2023-10-28 14:22:42.0'
node1 7777 '2023-10-28 14:22:42.1'
node1 7777 '2023-10-28 14:22:42.2'
node1 7777 '2023-10-28 14:22:42.3'
node1 7777 '2023-10-28 14:22:42.4'
node1 8888 '2023-10-28 14:22:42.5'
node1 8888 '2023-10-28 14:22:42.6'
node1 1111 '2023-10-28 14:22:42.7'
node2 2222 '2023-10-28 14:22:41.2'
node2 2222 '2023-10-28 14:22:41.3'
node2 2222 '2023-10-28 14:22:41.4'
node2 6666 '2023-10-28 14:22:41.5'

What I tried didn't work with the millisecond part of the timestamps.


Solution

  • I have one solution which works for your sample data. I am not sure if it's going to handle all edge cases, but it should be a good starting point.

    I am using the Spark SQL sequence function together with explode to generate new rows between the current and the next timestamp. I am using an interval of 100 milliseconds to match your use case; it can be adjusted.

    I am dropping the overlapping records because I was unable to handle this case inside the explode generator (in Spark we can't use conditional expressions within generators).
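
    As a minimal, standalone sketch of the sequence + explode building block (assuming Spark 3.0+ for the timestamp literal syntax and a running SparkSession named spark):

    # Generate one timestamp every 100 ms between two fixed bounds.
    spark.sql("""
        SELECT explode(sequence(
            timestamp'2023-10-28 14:22:41.9',
            timestamp'2023-10-28 14:22:42.5',
            interval 100 milliseconds
        )) AS ts
    """).show(truncate=False)

    The full solution applies the same expression per row, bounded by each row's own timestamp and the next one: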

    from pyspark.sql import Window
    from pyspark.sql.functions import expr, lead, col, to_timestamp, when
    
    data = [
        ("node1", 7777, "2023-10-28 14:22:41.9"),
        ("node1", 8888, "2023-10-28 14:22:42.5"),
        ("node1", 1111, "2023-10-28 14:22:42.7"),
        ("node2", 2222, "2023-10-28 14:22:41.2"),
        ("node2", 6666, "2023-10-28 14:22:41.5"),
    ]
    columns = ["node", "value", "timestamp"]
    df = spark.createDataFrame(data, columns)
    
    # For each node, look ahead to the timestamp of the following row.
    window_spec = Window.partitionBy("node").orderBy("timestamp")
    
    df = df.withColumn(
        "next_timestamp", to_timestamp(lead(col("timestamp")).over(window_spec))
    )
    
    # Explode a 100 ms sequence from each row's timestamp up to the next one.
    # The last row per node has no successor, so nvl falls back to the row's
    # own timestamp and the sequence yields a single element.
    result_df = df.withColumn(
        "timestamp",
        expr(
            "explode(sequence(to_timestamp(timestamp), nvl(next_timestamp, to_timestamp(timestamp)), interval 100 milliseconds))"
        ),
    ).withColumn(
        # The generated row that lands exactly on next_timestamp duplicates
        # the next original row, so flag it for removal.
        "to_drop",
        when(col("timestamp") == col("next_timestamp"), True).otherwise(False),
    )
    
    result_df.filter(~col("to_drop")).drop("next_timestamp", "to_drop").show(truncate=False)
    

    My output:

    +-----+-----+---------------------+
    |node |value|timestamp            |
    +-----+-----+---------------------+
    |node1|7777 |2023-10-28 14:22:41.9|
    |node1|7777 |2023-10-28 14:22:42  |
    |node1|7777 |2023-10-28 14:22:42.1|
    |node1|7777 |2023-10-28 14:22:42.2|
    |node1|7777 |2023-10-28 14:22:42.3|
    |node1|7777 |2023-10-28 14:22:42.4|
    |node1|8888 |2023-10-28 14:22:42.5|
    |node1|8888 |2023-10-28 14:22:42.6|
    |node1|1111 |2023-10-28 14:22:42.7|
    |node2|2222 |2023-10-28 14:22:41.2|
    |node2|2222 |2023-10-28 14:22:41.3|
    |node2|2222 |2023-10-28 14:22:41.4|
    |node2|6666 |2023-10-28 14:22:41.5|
    +-----+-----+---------------------+
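
    If you truly need a 1 ms step rather than 100 ms, the interval literal can be parameterized. This is only a sketch building on the df with next_timestamp from above (the step variable is my own naming); note that a finer step multiplies the generated row count accordingly:

    # Hypothetical knob: splice the step size into the SQL expression.
    step = "100 milliseconds"  # e.g. "1 milliseconds" for per-millisecond filling
    filled = df.withColumn(
        "timestamp",
        expr(
            f"explode(sequence(to_timestamp(timestamp), "
            f"nvl(next_timestamp, to_timestamp(timestamp)), interval {step}))"
        ),
    )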