I'm trying to understand the optimal way in Kedro to convert a Spark DataFrame coming out of one node into the pandas DataFrame required as input for another node, without creating a redundant conversion step.
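For context, this is the kind of standalone conversion node I would like to avoid (the function and dataset names here are placeholders, not from any real project):

def spark_to_pandas_node(df):
    # An extra node whose only purpose is the conversion;
    # this is the redundant step in question.
    return df.toPandas()

# node(spark_to_pandas_node, "my_spark_output", "my_pandas_input")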
Kedro currently supports two strategies for that: transcoding the dataset in the catalog, or converting inside the node with decorators.
The first strategy, transcoding, requires you to define two DataCatalog entries for the same dataset, working with the same file in a common format (Parquet, JSON, CSV, etc.), in your catalog.yml:
my_dataframe@spark:
  type: kedro.contrib.io.pyspark.SparkDataSet
  filepath: data/02_intermediate/data.parquet

my_dataframe@pandas:
  type: ParquetLocalDataSet
  filepath: data/02_intermediate/data.parquet
And then use them in the pipeline like this:
Pipeline([
    node(my_func1, "spark_input", "my_dataframe@spark"),
    node(my_func2, "my_dataframe@pandas", "output"),
])
In this case, Kedro understands that my_dataframe is the same dataset in both cases and resolves the node execution order properly. At the same time, Kedro uses the SparkDataSet implementation for saving and ParquetLocalDataSet for loading, so the first node should output a pyspark.sql.DataFrame, while the second node receives a pandas.DataFrame.
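To make that contract explicit, here is roughly what the two node functions could look like (my_func1 and my_func2 are the placeholder functions from the pipeline above; the operations inside them are made up for illustration):

import pandas as pd
from pyspark.sql import DataFrame as SparkDataFrame

def my_func1(spark_input: SparkDataFrame) -> SparkDataFrame:
    # The returned Spark DataFrame is saved by SparkDataSet
    # to data/02_intermediate/data.parquet.
    return spark_input.dropna()

def my_func2(my_dataframe: pd.DataFrame) -> pd.DataFrame:
    # The same parquet file is re-loaded by ParquetLocalDataSet,
    # so this node receives a pandas DataFrame.
    return my_dataframe.head(10)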
The second strategy is to convert inside the node using the decorators from kedro.contrib.decorators.

Note: Spark <-> Pandas in-memory conversion is notorious for its memory demands, so this is a viable option only if the dataframe is known to be small.

You can decorate an individual node as per the docs:
from spark import get_spark  # project helper that returns the SparkSession
from kedro.contrib.decorators import pandas_to_spark

spark_session = get_spark()

@pandas_to_spark(spark_session)
def my_func3(data):
    data.show()  # data is a pyspark.sql.DataFrame
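As a quick, purely illustrative check that the conversion happens, you can call the decorated function directly with a pandas DataFrame (my_func3 and spark_session come from the snippet above):

import pandas as pd

# The decorator converts the pandas input to a Spark DataFrame
# before my_func3 runs, so .show() works without errors.
pdf = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
my_func3(pdf)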
You can also decorate the whole pipeline:
Pipeline([
    node(my_func4, "pandas_input", "some_output"),
    ...
]).decorate(pandas_to_spark(spark_session))  # same factory call as for the single node above
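For intuition on the memory warning above: a conversion decorator of this kind essentially wraps the node function and calls spark.createDataFrame() on each pandas input (or .toPandas() in the opposite direction), which materialises the whole dataframe in driver memory. A rough sketch of the idea, not the actual kedro.contrib implementation:

import functools
import pandas as pd

def pandas_to_spark_sketch(spark):
    # Simplified stand-in for a pandas -> Spark conversion decorator:
    # convert every pandas DataFrame argument to a Spark DataFrame
    # before calling the wrapped node function.
    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            new_args = [
                spark.createDataFrame(arg) if isinstance(arg, pd.DataFrame) else arg
                for arg in args
            ]
            return func(*new_args, **kwargs)
        return wrapper
    return decorate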