I have the following pyspark application that generates sequences of child/parent processes from a csv of child/parent process id's. Considering the problem as a tree, I'm using an iterative depth-first search starting at leaf nodes (a process that has no children) and iterating through my file to create these closures where process 1 is the parent to process 2 which is the parent of process 3 so on and so forth.
In other words, given a csv as shown below, is it possible to implement a depth-first search (iteratively or recursively) using pyspark dataframes & appropriate pyspark-isms to generate said closures without having to use the .collect() function (which is incredibly expensive)?
from pyspark.sql.functions import monotonically_increasing_id
import copy
from pyspark.sql import SQLContext
from pyspark import SparkContext
class Test:
    """Container for the process chains collected by the traversal."""

    def __init__(self):
        # One entry per discovered chain; each entry is a list of process ids.
        self.process_list = []
def main():
    """Print every root-to-leaf process chain found in the parent/child CSV.

    The CSV must start with a header row naming the columns
    ``doc_process_pid`` and ``doc_parent_pid``. Rows whose
    ``doc_process_pid`` is -1 are leaf markers (their ``doc_parent_pid`` is
    the last real process of a chain); rows whose ``doc_parent_pid`` is -1
    are roots.

    Starting from each leaf marker the chain is walked upwards one parent at
    a time until a root is reached. Every step runs a Spark query and
    collects its result on the driver, so the number of Spark jobs grows
    with (number of leaves) x (chain length).
    """
    test = Test()
    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)

    # Hand-rolled CSV parsing: after the split every value is a string.
    lines = sc.textFile("<path to csv>").map(lambda line: line.split(","))
    header = lines.first()
    flat = lines.filter(lambda row: row != header).toDF(header)
    flat.createOrReplaceTempView("flat")

    data = sqlContext.sql(
        """
        select doc_process_pid, doc_parent_pid from flat
        where doc_parent_pid is not null AND
              doc_process_pid is not null
        """
    )
    data.createOrReplaceTempView("data")

    # Leaf markers: one collected row per chain to rebuild.
    leaf_rows = sqlContext.sql(
        """
        select doc_process_pid, doc_parent_pid from data
        where doc_parent_pid != -1 AND
              doc_process_pid == -1
        """
    ).collect()

    # Real child -> parent edges. They are registered under their own view
    # name so that no view is ever defined in terms of itself.
    edges = sqlContext.sql(
        """
        select doc_process_pid, doc_parent_pid from data
        where doc_process_pid != -1
        """
    )
    edges.createOrReplaceTempView("edges")

    for row in leaf_rows:
        # Rows have exactly two columns; the parent column of a leaf marker
        # is the deepest real process of the chain.
        current_pid = int(row["doc_parent_pid"])
        path = [current_pid]
        while True:
            matches = sqlContext.sql(
                "select doc_process_pid, doc_parent_pid from edges "
                "where doc_process_pid == " + str(current_pid)
            ).collect()
            if not matches:
                # No row describes this process, so there is nothing above
                # it to follow; keep the part of the chain found so far.
                break
            parent_pid = int(matches[0]["doc_parent_pid"])
            print("parent: ", parent_pid)
            if parent_pid == -1:
                break
            path.append(parent_pid)
            current_pid = parent_pid
        # The walk went leaf -> root; store the chain root -> leaf.
        test.process_list.append(path[::-1])

    print("final: ", test.process_list)


if __name__ == "__main__":
    main()
Here is my csv:
doc_process_pid doc_parent_pid
1 -1
2 1
6 -1
7 6
8 7
9 8
21 -1
22 21
24 -1
25 24
26 25
27 26
28 27
29 28
99 6
107 99
108 -1
109 108
222 109
1000 7
1001 1000
-1 9
-1 22
-1 29
-1 107
-1 1001
-1 222
-1 2
It represents child/parent process relationships. If we consider this as a tree, then leaf nodes are defined by doc_process_pid == -1 and root nodes are processes where doc_parent_pid == -1.
The code above generates two data frames:
Leaf Nodes:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| -1| 9|
| -1| 22|
| -1| 29|
| -1| 107|
| -1| 1001|
| -1| 222|
| -1| 2|
+---------------+--------------+
The remaining child/parent processes sans leaf nodes:
+---------------+--------------+
|doc_process_pid|doc_parent_pid|
+---------------+--------------+
| 1| -1|
| 2| 1|
| 6| -1|
| 7| 6|
| 8| 7|
| 9| 8|
| 21| -1|
| 22| 21|
| 24| -1|
| 25| 24|
| 26| 25|
| 27| 26|
| 28| 27|
| 29| 28|
| 99| 6|
| 107| 99|
| 108| -1|
| 109| 108|
| 222| 109|
| 1000| 7|
+---------------+--------------+
The output would be:
[[1, 2],
[6, 99, 107],
[6, 99, 7, 1000, 1001],
[6, 7, 1000, 8, 9],
[21, 22],
[24, 25, 26, 27, 28, 29],
[108, 109, 222]]
Thoughts? While this is a bit specific, I want to emphasize the generalized question of performing depth-first searches to generate closures of sequences represented in this DataFrame format.
Thanks in advance for the help!
I don't think PySpark is the best tool for this.
A solution would be to iterate through the tree's node levels, joining the dataframe with itself at every level.
Let's create our dataframe, no need to split it into leaf and other nodes, we'll just keep the original dataframe:
# Every [doc_process_pid, doc_parent_pid] pair from the question's CSV, in
# the same order, including the -1 leaf-marker rows at the end.
data = spark.createDataFrame(
    sc.parallelize([
        [1, -1], [2, 1], [6, -1], [7, 6],
        [8, 7], [9, 8], [21, -1], [22, 21],
        [24, -1], [25, 24], [26, 25], [27, 26],
        [28, 27], [29, 28], [99, 6], [107, 99],
        [108, -1], [109, 108], [222, 109], [1000, 7],
        [1001, 1000], [-1, 9], [-1, 22], [-1, 29],
        [-1, 107], [-1, 1001], [-1, 222], [-1, 2],
    ]),
    schema=["doc_process_pid", "doc_parent_pid"],
)
We'll now create two dataframes from this tree, one will be our building base and the other one will be our construction bricks:
# Building base: the roots, i.e. processes whose parent is -1.
df1 = data.where("doc_parent_pid = -1").select(
    data["doc_process_pid"].alias("node")
)

# Construction bricks: every (parent -> child) edge, keyed by the parent as
# "node". Leaf markers are kept here (as son = -1) so that a finished branch
# can be recognised later.
df2 = data.select(
    data["doc_process_pid"].alias("son"),
    data["doc_parent_pid"].alias("node"),
).where("node != -1")
Let's define a function for step i of the construction:
def add_node(df, i):
    """Extend every unfinished branch in ``df`` by one level.

    ``df`` must have a ``node`` column holding the current tip of each
    branch. Tips are inner-joined with the module-level ``df2`` edges on
    ``node``; the old tip is then renamed to ``node<i>`` and the joined
    ``son`` becomes the new ``node``.
    """
    open_branches = df.filter("node != -1")
    extended = open_branches.join(df2, "node", "inner")
    # Archive the old tip under its level number, then promote the child.
    archived = extended.withColumnRenamed("node", "node" + str(i))
    return archived.withColumnRenamed("son", "node")
Let's define our initial state:
from pyspark.sql.types import *
# Loop state: `df` holds the branches still being extended (initially just
# the roots) and `i` counts how many levels have been added so far.
df, i = df1, 0

# Accumulator for finished branches: starts empty, with a single
# array-of-long column named "branch".
df_end = spark.createDataFrame(
    sc.emptyRDD(),
    schema=StructType([
        StructField("branch", ArrayType(LongType()), nullable=True),
    ]),
)
When a branch is fully constructed, we take it out of df and put it in df_end:
import pyspark.sql.functions as psf
# Add one level per pass until no unfinished branch is left.
while df.count() > 0:
    i += 1
    df = add_node(df, i)

    # add_node leaves the newest level first, so reversing the column list
    # (minus the "node" tip) yields the branch in root-to-leaf order.
    level_columns = [name for name in reversed(df.columns) if name != "node"]

    # A tip of -1 means the branch just hit a leaf marker: pack its levels
    # into one array and move it to the results.
    finished = df.filter("node = -1").select(
        psf.array(*level_columns).alias("branch")
    )
    df_end = finished.unionAll(df_end)

    # Only branches with a real tip go on to the next pass.
    df = df.filter("node != -1")
At the end, df is empty and we have
df_end.show(truncate=False)
+------------------------+
|branch |
+------------------------+
|[24, 25, 26, 27, 28, 29]|
|[6, 7, 8, 9] |
|[6, 7, 1000, 1001] |
|[108, 109, 222] |
|[6, 99, 107] |
|[21, 22] |
|[1, 2] |
+------------------------+
The worst case for this algorithm is as many joins as the maximum branch length.