Tags: apache-spark, spark-graphx

Creating a graph in Spark from a time series table


Suppose I have a table with three columns: user, time, place. I want to create a graph of the place transitions for each user when the time between them is below a certain threshold; i.e., after grouping by user and ordering by time, create a directed graph with an edge place_i to place_j for each pair of contiguous rows (i, j), incrementing the weight of the edge for each occurrence of (place_i, place_j). The source table's rows are in no particular order. Is this possible with the Python API? If not, how can I do it in Scala?

Sample table:

user,time,place
joe,1,A
jack,1,B
joe,2,B
jack,3,C
joe,4,D
jane,5,A
jane,1,B

If we ignore the time threshold constraint, the graph should have four vertices (A, B, C, D) and the edge set {(A,B), (B,C), (B,D), (B,A)}, each with weight 1.


Solution

  • I used groupByKey followed by flatMapGroups. Inside the map I materialized the iterator into a list so it could be sorted by time, then iterated over the sorted rows in pairs using sliding and emitted the edges.

    ds.groupByKey(_.user).flatMapGroups { (uid, iter) =>
      // Materialize the iterator so the rows can be sorted by time.
      val result = ListBuffer[AggSchema]()
      iter.toList.sortBy(_.time).sliding(2).foreach {
        // Emit an edge only when the pair is within the time threshold.
        case List(x, y) if y.time - x.time < Threshold =>
          result += AggSchema(uid, x.place, y.place)
        case _ => // single row, or gap at/above the threshold: no edge
      }
      result.toList
    }.groupBy($"src", $"dst").count.as[Schema]
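For reference, the same group-sort-slide logic can be checked without Spark on the sample rows from the question. This is a minimal sketch; `Row`, `transitions`, and the threshold values are illustrative names, not part of the original answer.

```scala
// Sample rows from the question (order deliberately scrambled).
case class Row(user: String, time: Long, place: String)

val rows = List(
  Row("joe", 1, "A"), Row("jack", 1, "B"), Row("joe", 2, "B"),
  Row("jack", 3, "C"), Row("joe", 4, "D"), Row("jane", 5, "A"),
  Row("jane", 1, "B")
)

// Group by user, sort each group by time, slide over contiguous pairs,
// keep pairs within the threshold, then count occurrences per edge.
def transitions(rows: List[Row], threshold: Long): Map[(String, String), Int] =
  rows.groupBy(_.user).values.toList
    .flatMap { group =>
      group.sortBy(_.time).sliding(2).collect {
        case List(x, y) if y.time - x.time < threshold => (x.place, y.place)
      }
    }
    .groupBy(identity).map { case (edge, hits) => edge -> hits.size }

// With a large threshold, every contiguous pair becomes an edge:
// Map((A,B) -> 1, (B,D) -> 1, (B,C) -> 1, (B,A) -> 1)
```

With a tight threshold (e.g. 2), only joe's (A,B) transition survives, since every other contiguous pair has a time gap of 2 or more.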