I have a Graphframe object: g and a RDD object: candidate:
g = GraphFrame(v,e)
candidates_rdd.collect()
# [Row(source=u'a', target=u'b'),
# Row(source=u'a', target=u'c'),
# Row(source=u'e', target=u'a')]
I want to compute a path from "source" to "target" in candidates_rdd and generate a result rdd with key, value pairs ((source, target), path_list) using graphframe's breadth first search, where path_list is a list of paths from source to target.
Example outputs:
(('a','b'),['a-c-b','a-d-e-b']),
(('f','c'),[]),
(('a',d'),['a-b-e-d']
I wrote the below function:
def bfs_(row):
arg1 = "id = '" + row.source + "'"
arg2 = "id = '" + row.target + "'"
return ((row.source, row.target), g.bfs(arg1,arg2).rdd)
results = candidates_rdd.map(bfs_)
I got this error:
Py4JError: An error occurred while calling o274.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
I have tried to make the graph global or broadcast it, neither works.
Could anyone help me on this?
Thanks very much!!
TL;DR It is not possible.
Spark doesn't support nested operations like this. Outer loop has to be not-distributed:
>>> [g.bfs(arg1, arg2) for arg1, arg2 in candidates_rdd.collect()]