Tags: python, scala, apache-spark, jep

Pass DataFrame from Scala to Python using Jep


Here is what I am trying to do:

  1. I read the data into Scala.
  2. Extract a few columns.
  3. Pass the created DataFrame to a Python script using Jep.
  4. The Python script converts the DataFrame to pandas, performs some operations, and returns it.

However, I am not sure how to pass the DataFrame to the Python script. Here is the Python script (this is just a sample, not the actual one):

import findspark
findspark.init()
import pandas as pd
# from pyspark.sql.types import *
from pyspark.sql import DataFrame  # line 5 of testpy.py: the import the traceback points at

def tes(df: DataFrame):
    pdf = df.toPandas()  # convert the Spark DataFrame to pandas
    pdf['concatenate'] = pdf['country'] + pdf['datasourceProvidedCountry']
    return pdf  # return the pandas frame; wrapping it back into a Spark DataFrame would need spark.createDataFrame
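
On the Scala side I call it roughly like this (a simplified sketch; the input path and format are illustrative, and the Jep calls assume the 3.8 API):

import jep.{Jep, JepConfig}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// 1-2. Read the data and extract a few columns.
val df = spark.read.parquet("/path/to/input")
  .select("country", "datasourceProvidedCountry")

// 3. Hand the DataFrame to the Python script via Jep.
val jep = new Jep(new JepConfig())
jep.runScript("/home/hadoop/testpy.py")
val result = jep.invoke("tes", df)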

This keeps failing with the following error:

jep.JepException: <class 'ImportError'>: py4j.protocol
  at /usr/local/lib64/python3.6/site-packages/jep/java_import_hook.__getattr__(java_import_hook.py:57)
  at /home/hadoop/testpy.<module>(testpy.py:5)
  at jep.Jep.run(Native Method)
  at jep.Jep.runScript(Jep.java:359)
  at jep.Jep.runScript(Jep.java:335)
  ... 49 elided
Caused by: java.lang.ClassNotFoundException: py4j.protocol
  at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
  ... 52 more
I am launching spark-shell like this:

spark-shell --conf spark.driver.extraLibraryPath=:/usr/local/lib64/python3.6/site-packages/jep:/usr/local/lib/python3.6/site-packages/py4j/ --jars /home/hadoop/jep-3.8.2.jar

Can anyone advise how I can pass the DataFrame from Scala to PySpark using Jep? (If this is a duplicate, please point me to the right thread; I was not able to find one.)


Solution

  • I had the same requirement and tried Jep as well. Unfortunately, Jep does not work for this use case.

    The "py4j.protocol not found" error is caused by Jep's ClassEnquirer: when both Python and Java have a package with the same name, Jep resolves the import to the Java one. You can fix this either by excluding py4j from the Spark package in your Java application, or by creating a customized ClassEnquirer that prefers the Python py4j, as sketched below.
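
    A minimal sketch of such a ClassEnquirer (assuming the Jep 3.x API, where the default implementation is ClassList; the class name PreferPythonPy4j is illustrative):

    import jep.{ClassEnquirer, ClassList}

    // Delegates to Jep's default ClassList, but reports py4j as NOT a
    // Java package so the import falls through to the Python py4j.
    class PreferPythonPy4j extends ClassEnquirer {
      private val defaults = ClassList.getInstance()

      override def isJavaPackage(name: String): Boolean =
        if (name == "py4j" || name.startsWith("py4j.")) false
        else defaults.isJavaPackage(name)

      override def getClassNames(pkgName: String): Array[String] =
        defaults.getClassNames(pkgName)

      override def getSubPackages(pkgName: String): Array[String] =
        defaults.getSubPackages(pkgName)
    }

    You would then pass it in with new JepConfig().setClassEnquirer(new PreferPythonPy4j()).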

    You also need to update the Jep constructor to set the useSubInterpreter value to false, and rebuild Jep:

    public Jep(JepConfig config) throws JepException {
        this(config, false);
    }
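
    Running with a shared interpreter (useSubInterpreter = false) also sidesteps the known problems Jep's sub-interpreters have with CPython extension modules such as numpy and pandas.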
    

    Now the error should be resolved. However, the object passed to the Python function is a PyObject that wraps a Java reference; it is not a PySpark DataFrame object, so it does not have the toPandas() function.

    An alternative is to use gRPC or Apache Thrift; see their documentation for more details.