Search code examples
apache-sparkpysparkrddgraphframes

How to write a transformation function to transform RDD with reference to a Graphframe object?


I have a Graphframe object: g and a RDD object: candidate:

g = GraphFrame(v,e)
candidates_rdd.collect() 
#  [Row(source=u'a', target=u'b'),
#   Row(source=u'a', target=u'c'),
#   Row(source=u'e', target=u'a')]

I want to compute a path from "source" to "target" in candidates_rdd and generate a result rdd with key, value pairs ((source, target), path_list) using graphframe's breadth first search, where path_list is a list of paths from source to target.

Example outputs:

(('a','b'),['a-c-b','a-d-e-b']), 
(('f','c'),[]),
(('a',d'),['a-b-e-d']

I wrote the below function:

def bfs_(row):    
    arg1 = "id = '" + row.source + "'"
    arg2 = "id = '" + row.target + "'"        
    return ((row.source, row.target), g.bfs(arg1,arg2).rdd)

results = candidates_rdd.map(bfs_)

I got this error:

Py4JError: An error occurred while calling o274.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist

I have tried to make the graph global or broadcast it, neither works.

Could anyone help me on this?

Thanks very much!!


Solution

  • TL;DR It is not possible.

    Spark doesn't support nested operations like this. Outer loop has to be not-distributed:

    >>> [g.bfs(arg1, arg2) for arg1, arg2 in candidates_rdd.collect()]