I want to forward-fill the previous value for every 100 ms step between timestamps, per node, as in the example below.
Input dataframe:

| node  | value | timestamp               |
|-------|-------|-------------------------|
| node1 | 7777  | '2023-10-28 14:22:41.9' |
| node1 | 8888  | '2023-10-28 14:22:42.5' |
| node1 | 1111  | '2023-10-28 14:22:42.7' |
| node2 | 2222  | '2023-10-28 14:22:41.2' |
| node2 | 6666  | '2023-10-28 14:22:41.5' |
Output dataframe:

| node  | value | timestamp               |
|-------|-------|-------------------------|
| node1 | 7777  | '2023-10-28 14:22:41.9' |
| node1 | 7777  | '2023-10-28 14:22:42.0' |
| node1 | 7777  | '2023-10-28 14:22:42.1' |
| node1 | 7777  | '2023-10-28 14:22:42.2' |
| node1 | 7777  | '2023-10-28 14:22:42.3' |
| node1 | 7777  | '2023-10-28 14:22:42.4' |
| node1 | 8888  | '2023-10-28 14:22:42.5' |
| node1 | 8888  | '2023-10-28 14:22:42.6' |
| node1 | 1111  | '2023-10-28 14:22:42.7' |
| node2 | 2222  | '2023-10-28 14:22:41.2' |
| node2 | 2222  | '2023-10-28 14:22:41.3' |
| node2 | 2222  | '2023-10-28 14:22:41.4' |
| node2 | 6666  | '2023-10-28 14:22:41.5' |
It didn't work for me with the milliseconds.
I have a solution that works for your sample data. I am not sure it will handle all edge cases, but it should be a good starting point.
I am using the Spark SQL `sequence` function together with `explode` to generate new rows between the current and next timestamps. I use an interval of 100 ms to match your use case; it can be adjusted.
I then drop the overlapping records, because I was unable to handle that case inside the `explode` generator (in Spark, conditional expressions are not allowed inside generators).
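For intuition, here is a minimal standalone call (assuming an active SparkSession bound to `spark`) showing what `sequence` with a 100 ms step produces; it returns an array of the three timestamps 41.9, 42.0, and 42.1:

spark.sql(
    "SELECT sequence(to_timestamp('2023-10-28 14:22:41.9'), "
    "to_timestamp('2023-10-28 14:22:42.1'), "
    "interval 100 milliseconds) AS ts"
).show(truncate=False)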
from pyspark.sql import Window
from pyspark.sql.functions import col, expr, lead, to_timestamp, when

data = [
    ("node1", 7777, "2023-10-28 14:22:41.9"),
    ("node1", 8888, "2023-10-28 14:22:42.5"),
    ("node1", 1111, "2023-10-28 14:22:42.7"),
    ("node2", 2222, "2023-10-28 14:22:41.2"),
    ("node2", 6666, "2023-10-28 14:22:41.5"),
]
columns = ["node", "value", "timestamp"]
df = spark.createDataFrame(data, columns)

# For each row, look up the timestamp of the next row within the same node
window_spec = Window.partitionBy("node").orderBy("timestamp")
df = df.withColumn(
    "next_timestamp", to_timestamp(lead(col("timestamp")).over(window_spec))
)

# Explode a sequence of timestamps from the current row up to (and including)
# the next row's timestamp, stepping by 100 ms; the last row of each node has
# no successor, so nvl falls back to a single-element sequence
result_df = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), "
        "nvl(next_timestamp, to_timestamp(timestamp)), "
        "interval 100 milliseconds))"
    ),
)

# The generated row that lands exactly on next_timestamp duplicates the next
# original row, so mark it and filter it out afterwards
result_df = result_df.withColumn(
    "to_drop", when(col("timestamp") == col("next_timestamp"), True).otherwise(False)
)

result_df.filter(col("to_drop") == False).drop("next_timestamp", "to_drop").show(
    truncate=False
)
My output:
+-----+-----+---------------------+
|node |value|timestamp |
+-----+-----+---------------------+
|node1|7777 |2023-10-28 14:22:41.9|
|node1|7777 |2023-10-28 14:22:42 |
|node1|7777 |2023-10-28 14:22:42.1|
|node1|7777 |2023-10-28 14:22:42.2|
|node1|7777 |2023-10-28 14:22:42.3|
|node1|7777 |2023-10-28 14:22:42.4|
|node1|8888 |2023-10-28 14:22:42.5|
|node1|8888 |2023-10-28 14:22:42.6|
|node1|1111 |2023-10-28 14:22:42.7|
|node2|2222 |2023-10-28 14:22:41.2|
|node2|2222 |2023-10-28 14:22:41.3|
|node2|2222 |2023-10-28 14:22:41.4|
|node2|6666 |2023-10-28 14:22:41.5|
+-----+-----+---------------------+
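If you want to avoid the `to_drop` bookkeeping, one possible refinement (an untested sketch under the same setup, reusing the `df` with `next_timestamp` from above) is to end each sequence one step early by subtracting the interval from `next_timestamp`. Note this assumes consecutive timestamps within a node are at least 100 ms apart; otherwise `sequence` would receive a stop earlier than its start and fail.

# Sketch: end each sequence 100 ms before the next row's timestamp so no
# overlapping row is generated. Assumes gaps of at least 100 ms per node.
alt_df = df.withColumn(
    "timestamp",
    expr(
        "explode(sequence(to_timestamp(timestamp), "
        "nvl(next_timestamp - interval 100 milliseconds, to_timestamp(timestamp)), "
        "interval 100 milliseconds))"
    ),
).drop("next_timestamp")
alt_df.show(truncate=False)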