Tags: python, pandas, pyspark, kedro

How to convert Spark data frame to Pandas and back in Kedro?


I'm trying to understand the optimal way in Kedro to convert a Spark DataFrame produced by one node into the pandas DataFrame required as input by another node, without creating a redundant conversion step.


Solution

  • Kedro currently supports two strategies for this:

    Using the Transcoding feature

    This requires defining two DataCatalog entries for the same dataset, both pointing to the same file in a common format (Parquet, JSON, CSV, etc.), in your catalog.yml:

    my_dataframe@spark:
      type: kedro.contrib.io.pyspark.SparkDataSet
      filepath: data/02_intermediate/data.parquet
    
    my_dataframe@pandas:
      type: ParquetLocalDataSet
      filepath: data/02_intermediate/data.parquet
    

    And then use them in the pipeline like this:

    from kedro.pipeline import Pipeline, node

    Pipeline([
        node(my_func1, "spark_input", "my_dataframe@spark"),
        node(my_func2, "my_dataframe@pandas", "output"),
    ])
    

    In this case, Kedro understands that my_dataframe is the same dataset in both entries and resolves the node execution order accordingly. At the same time, Kedro uses the SparkDataSet implementation for saving and ParquetLocalDataSet for loading, so the first node should output a pyspark.sql.DataFrame, while the second node receives a pandas.DataFrame.
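
    For illustration, node functions that satisfy this contract could look like the following sketch (my_func1, my_func2 and the transformations inside them are placeholders, not part of Kedro):

    import pandas as pd
    from pyspark.sql import DataFrame as SparkDataFrame

    def my_func1(spark_input: SparkDataFrame) -> SparkDataFrame:
        # The returned Spark DataFrame is saved by SparkDataSet
        # to data/02_intermediate/data.parquet
        return spark_input.dropna()

    def my_func2(my_dataframe: pd.DataFrame) -> pd.DataFrame:
        # The same Parquet file is loaded by ParquetLocalDataSet,
        # so the data arrives here as a pandas DataFrame
        return my_dataframe.describe()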

    Using Pandas to Spark and Spark to Pandas node decorators

    Note: Spark <-> Pandas in-memory conversion is notorious for its memory demands, so this is a viable option only if the dataframe is known to be small.

    One can decorate the node as per the docs:

    from spark import get_spark  # project-level helper that returns the SparkSession
    from kedro.contrib.decorators import pandas_to_spark

    spark_session = get_spark()

    @pandas_to_spark(spark_session)
    def my_func3(data):
        data.show()  # data is a pyspark.sql.DataFrame
    

    Or even the whole pipeline:

    Pipeline([
        node(my_func4, "pandas_input", "some_output"),
        ...
    ]).decorate(pandas_to_spark(spark_session))
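
    Under the hood, such a decorator simply converts any pandas DataFrame arguments with spark.createDataFrame() before calling the wrapped function. A minimal hand-rolled sketch of the idea (to_spark_inputs is a hypothetical name, not the Kedro API):

    from functools import wraps

    import pandas as pd

    def to_spark_inputs(spark):
        """Convert any pandas DataFrame positional arguments to Spark DataFrames."""
        def _decorate(func):
            @wraps(func)
            def _wrapper(*args, **kwargs):
                args = [
                    spark.createDataFrame(a) if isinstance(a, pd.DataFrame) else a
                    for a in args
                ]
                return func(*args, **kwargs)
            return _wrapper
        return _decorate

    The opposite direction works the same way, except the wrapper calls .toPandas() on Spark DataFrame arguments instead, which is where the memory caveat above comes from: the whole dataset is collected to the driver.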