Introduction
I am not sure the title is clear. I am not a native English speaker, so if someone has a better summary of what this post is about, please edit!
Environment
python 3.5.2
pyspark 2.3.0
The context
I have a Spark DataFrame. Before being written to Elasticsearch, this data gets transformed. In my case there are two transformations, implemented as map functions on the DataFrame's underlying RDD.
However, instead of hard-coding them, I want my writer function to accept an arbitrary number of transformation functions and apply them one by one: the first to the DataFrame's RDD, and each subsequent one to the result of the previous transformation.
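For reference, the transformation functions themselves are ordinary Python callables suitable for RDD.map. The real transfo1 and transfo2 are not shown in this post; the following is only a hypothetical sketch of what such functions could look like:

def transfo1(row):
    # hypothetical: convert the Row to a plain dict so it can be serialized
    return row.asDict()

def transfo2(doc):
    # hypothetical: drop None-valued fields before indexing
    return {key: value for key, value in doc.items() if value is not None}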
Initial work
This is the previous, hard-coded state that I want to move away from:
df.rdd.map(transfo1) \
      .map(transfo2) \
      .saveAsNewAPIHadoopFile(...)
What I have so far
def write_to_index(self, transformation_functions: list, dataframe):
    # stuff
    for transfo in transformation_functions:
        dataframe = dataframe.rdd.map(transfo)
    dataframe.saveAsNewAPIHadoopFile(...)
However, this has a problem: the result of .map() is an RDD, not a DataFrame, so the second iteration of the loop fails because the resulting object does not have an rdd attribute.
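To make the failure concrete, this is roughly what happens on the second pass through the loop (assuming at least two functions are given):

result = dataframe.rdd.map(transformation_functions[0])
# result is now an RDD (a PipelinedRDD), not a DataFrame, so the next
# iteration's result.rdd access raises an AttributeError
result.rdd.map(transformation_functions[1])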
Working solution
object_to_process = dataframe.rdd
for transfo in transformation_functions:
    object_to_process = object_to_process.map(transfo)
object_to_process.saveAsNewAPIHadoopFile(...)
The solution above seems to work (it doesn't throw any error, at least). But I would like to know if there is a more elegant solution or any built-in Python solution for this.
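For completeness, here is the working loop placed back inside the method from above (the saveAsNewAPIHadoopFile arguments are omitted here, as in the snippets above):

def write_to_index(self, transformation_functions: list, dataframe):
    # ... other setup code ...
    object_to_process = dataframe.rdd
    for transfo in transformation_functions:
        object_to_process = object_to_process.map(transfo)
    object_to_process.saveAsNewAPIHadoopFile(...)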
You can use this one-liner:
from functools import reduce

def write_to_index(self, transformation_functions: list, dataframe):
    reduce(lambda x, y: x.map(y), transformation_functions, dataframe.rdd).saveAsNewAPIHadoopFile(...)
which, written out verbosely, is equivalent to:
dataframe.rdd.map(transformation_functions[0]) \
             .map(transformation_functions[1]) \
             .map(...) \
             .saveAsNewAPIHadoopFile(...)
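If the functools.reduce pattern is unfamiliar, the same idea can be shown on plain Python values, independent of Spark (apply_all and the lambdas below are just illustrative names, not part of the original code):

from functools import reduce

def apply_all(value, funcs):
    # fold each function over the running result, left to right
    return reduce(lambda acc, f: f(acc), funcs, value)

print(apply_all(3, [lambda x: x + 1, lambda x: x * 2]))  # prints 8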