Suppose I have a table with three columns: user, time, place. I want to create a graph of the place transitions for each user, keeping only transitions where the time between them is below a certain threshold. That is, after grouping by user and ordering by time, create a directed graph with an edge place_i to place_j for each pair of consecutive rows (i, j), incrementing the weight of the edge for each occurrence of (place_i, place_j). The source table's rows are in no particular order. Is this possible with the Python API? If not, how can I do it in Scala?
Sample table:
user,time,place
joe,1,A
jack,1,B
joe,2,B
jack,3,C
joe,4,D
jane,5,A
jane,1,B
If we ignore the time threshold constraint, the graph should have four vertices (A, B, C, D) and the edges {(A,B), (B,C), (B,D), (B,A)}, each with weight 1.
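I used groupByKey followed by flatMapGroups. Inside the group function I materialized the iterator into a list in order to sort it by time. Then I iterated over consecutive pairs using sliding(2) and emitted an edge for each pair whose time gap is below the threshold. Finally, grouping the edges by (src, dst) and counting gives the edge weights.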
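```scala
import spark.implicits._ // encoders; assumes a SparkSession named spark

// Assumes MySchema/AggSchema expose src and dst columns, and Schema(src, dst, count)
ds.groupByKey(_.user).flatMapGroups { (uid, iter) =>
  // Materialize the group so it can be sorted by time
  iter.toList.sortBy(_.time).sliding(2).collect {
    // Keep only consecutive pairs within the threshold; one-row groups yield nothing
    case List(x, y) if y.time - x.time < Threshold => MySchema(uid, x.place, y.place)
  }.toList
}.as[AggSchema].groupBy($"src", $"dst").count().as[Schema]
```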
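To check the expected graph from the sample table without a Spark cluster, the same group / sort / sliding / count logic can be sketched with plain Scala collections. This is a minimal sketch, not Spark code: Visit, transitionCounts and maxGap are hypothetical names, time is assumed to be a Long, and a pair is kept only when its gap is strictly below the threshold (matching the < comparison above).

```scala
object PlaceTransitions {
  // One row of the source table (hypothetical case class; time assumed to be a Long)
  final case class Visit(user: String, time: Long, place: String)

  /** Counts transitions place_i -> place_j between time-consecutive visits of the
    * same user. With maxGap = Some(t), a pair is kept only if its time gap is < t;
    * with None, the threshold is ignored. */
  def transitionCounts(rows: Seq[Visit], maxGap: Option[Long]): Map[(String, String), Int] =
    rows
      .groupBy(_.user) // plays the role of groupByKey(_.user)
      .values
      .flatMap { visits =>
        visits.sortBy(_.time).sliding(2).collect {
          // sliding(2) yields a one-element window for one-row groups; this case skips it
          case Seq(x, y) if maxGap.forall(t => y.time - x.time < t) => (x.place, y.place)
        }
      }
      .groupBy(identity) // plays the role of groupBy($"src", $"dst")
      .map { case (edge, occurrences) => edge -> occurrences.size } // edge weight

  // The sample table from the question, in its original (unsorted) order
  val sample: Seq[Visit] = Seq(
    Visit("joe", 1, "A"), Visit("jack", 1, "B"), Visit("joe", 2, "B"),
    Visit("jack", 3, "C"), Visit("joe", 4, "D"), Visit("jane", 5, "A"),
    Visit("jane", 1, "B"))

  def main(args: Array[String]): Unit = {
    println(transitionCounts(sample, None))     // threshold ignored
    println(transitionCounts(sample, Some(2L))) // only gaps < 2 are kept
  }
}
```

Ignoring the threshold, this reproduces the four weight-1 edges listed in the question; with a threshold of 2, only joe's A to B transition (gap 1) survives, since the other consecutive pairs have gaps of 2 or more.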