I have a DataFrame like this:
+-----------------+------------------+-----------+--------+---+
| conversation_id | message_body | timestamp | sender | |
+-----------------+------------------+-----------+--------+---+
| A | hi | 9:00 | John | |
| A | how are you? | 10:00 | John | |
| A | can we meet? | 10:05 | John | * |
| A | not bad | 10:30 | Steven | * |
| A | great | 10:40 | John | |
| A | yeah, let's meet | 10:35 | Steven | |
| B | Hi | 12:00 | Anna | * |
| B | Hello | 12:05 | Ken | * |
+-----------------+------------------+-----------+--------+---+
For each conversation I would like to get the last message in the 1st sender's first block and the first message from the 2nd sender. I have marked them with an asterisk.
One idea I had was to assign 0s to the first user and 1s to the second user.
Ideally, I would like to end up with something like this:
+-----------------+---------+------------+--------------+---------+------------+----------+
| conversation_id | sender1 | timestamp1 | message1 | sender2 | timestamp2 | message2 |
+-----------------+---------+------------+--------------+---------+------------+----------+
| A | John | 10:05 | can we meet? | Steven | 10:30 | not bad |
| B | Anna | 12:00 | Hi | Ken | 12:05 | Hello |
+-----------------+---------+------------+--------------+---------+------------+----------+
How could I do that in Spark?
Interesting issues arose along the way. The following was done in a Databricks notebook.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._
val df =
Seq(("A", "hi", "09:00", "John"), ("A", "how are you?", "10:00", "John"),
("A", "can we meet?", "10:05", "John"), ("A", "not bad", "10:30", "Steven"),
("A", "great", "10:40", "John"), ("A", "yeah, let's meet", "10:45", "Steven"),
("B", "Hi", "12:00", "Anna"), ("B", "Hello", "12:05", "Ken")
).toDF("conversation_id", "message_body", "timestampX", "sender")
// Rank the messages by time within each conversation: rank 1 is the sender who initiates the conversation; the other values can be used to infer relationships
// Note no @Transient required here with Window
val df2 = df.withColumn("rankC", row_number().over(Window.partitionBy($"conversation_id").orderBy($"timestampX".asc)))
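To sanity-check the ranking (purely an inspection step, not needed for the result), something like this should show each conversation in time order with its rank:
df2.orderBy($"conversation_id", $"rankC").show(false)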
// A minimum rank value <> 1 per sender marks the first message of the 2nd sender.
// The row one rank before it is the last message of the 1st sender's first "block".
val df3 = df2.select('conversation_id as "df3_conversation_id", 'sender as "df3_sender", 'rankC as "df3_rank")
// The columns are renamed to avoid ambiguous-column issues when joining back to df2 later
val df3a = df3.groupBy("df3_conversation_id", "df3_sender").agg(min("df3_rank") as "rankC2").filter("rankC2 != 1")
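For the sample data, df3a should boil down to one row per conversation for the 2nd sender (worked out by hand from the input, not pasted from a run):
// df3a: (A, Steven, rankC2 = 4) and (B, Ken, rankC2 = 2)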
// JOIN the values back with some smarts. Some odd errors came up in Spark through pipelining; the pipelined row_number/ranking columns need to be dropped after the joins.
// df4: the 1st sender's last message in the first block (rankC = rankC2 - 1)
val df4 = df3a.join(df2, (df3a("df3_conversation_id") === df2("conversation_id")) && (df3a("rankC2") === df2("rankC") + 1)).drop("rankC").drop("rankC2")
// df4a: the 2nd sender's first message (rankC = rankC2)
val df4a = df3a.join(df2, (df3a("df3_conversation_id") === df2("conversation_id")) && (df3a("rankC2") === df2("rankC"))).drop("rankC").drop("rankC2")
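// Quick hand check on the sample data (not from a run): for conversation A, rankC2 = 4, so df4 picks
// the rank-3 row ("can we meet?", John, 10:05) and df4a picks the rank-4 row ("not bad", Steven, 10:30),
// i.e. exactly the two asterisked rows in the question.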
// To get the other missing data. This could all have been combined, but it is done in steps for simplicity. Just a simple final JOIN and you have the answer.
val df5 = df4.join(df4a, (df4("df3_conversation_id") === df4a("df3_conversation_id")))
df5.show(false)
returns:
The output does not format fully here; the column headings are df3a's columns followed by df2's, once from df4 and once from df4a (df3_conversation_id, df3_sender, conversation_id, message_body, timestampX, sender, twice over). Run it in the REPL to see them.
|B |Ken |B |Hi |12:00 |Anna |B |Ken |B |Hello |12:05 |Ken |
|A |Steven |A |can we meet?|10:05 |John |A |Steven |A |not bad |10:30 |Steven|
You can further manipulate the data from here; the heavy lifting is done. The Catalyst Optimizer had some issues compiling the more direct formulations, which is why I worked around it in this fashion.
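For example, a final tidy-up along these lines should get close to the layout asked for; this is only a sketch (with throwaway names left, right and result), renaming before the last join to sidestep ambiguous column names:
val left = df4.select($"conversation_id", $"sender" as "sender1", $"timestampX" as "timestamp1", $"message_body" as "message1")
val right = df4a.select($"conversation_id" as "conversation_id2", $"sender" as "sender2", $"timestampX" as "timestamp2", $"message_body" as "message2")
// Join on the conversation id and drop the duplicated key column
val result = left.join(right, left("conversation_id") === right("conversation_id2")).drop("conversation_id2")
result.show(false)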