Search code examples
pythonpysparkapache-spark-sqlattributeerrorrlike

AttributeError: 'NoneType' object has no attribute '_jvm' in Pyspark


I'm trying to print a dataframe by looping through each row of that dataframe. I then used the map() transformation to the dataframe's RDD to apply lambda function and tried converting it back into a dataframe. I'm running this program on Jupyter Notebook through conda env. My guess is there is some problem in applying the rlike() function because the mapping works fine without that. Here's the code below :

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window




spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate();



data = [("Sunday",20000), ("734",1000), ("Fruday",5001),("Tuesday",17000)]
rdd=spark.sparkContext.parallelize(data)
columns = ["day","users_count"]



df = spark.createDataFrame(data=data, schema=columns)




def func1(x):
    day = x.day
    users_count = x.users_count
    cond1 = F.when(F.col(day).rlike('^(Sun|Mon|Tues|Wednes|Thurs|Fri|Satur)day$'),"Success").otherwise("Error")
    return (day, users_count, id, cond1)



rdd2 = df.rdd.map(lambda x: func1(x))
Columns = ["day","users_count","cond1"]



dffinal = rdd2.toDF(Columns)
dffinal.show()

Here is the error :

Py4JJavaError                             Traceback (most recent call last)
Input In [7], in <cell line: 1>()
----> 1 dffinal = rdd2.toDF(Columns)
      2 dffinal.show()



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:66, in _monkey_patch_RDD.<locals>.toDF(self, schema, sampleRatio)
     39 def toDF(self, schema=None, sampleRatio=None):
     40     """
     41     Converts current :class:`RDD` into a :class:`DataFrame`
     42
   (...)
     64     [Row(name='Alice', age=1)]
     65     """
---> 66     return sparkSession.createDataFrame(self, schema, sampleRatio)



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:675, in SparkSession.createDataFrame(self, data, schema, samplingRatio, verifySchema)
    671 if has_pandas and isinstance(data, pandas.DataFrame):
    672     # Create a DataFrame from pandas DataFrame.
    673     return super(SparkSession, self).createDataFrame(
    674         data, schema, samplingRatio, verifySchema)
--> 675 return self._create_dataframe(data, schema, samplingRatio, verifySchema)



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:698, in SparkSession._create_dataframe(self, data, schema, samplingRatio, verifySchema)
    695     prepare = lambda obj: obj
    697 if isinstance(data, RDD):
--> 698     rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
    699 else:
    700     rdd, schema = self._createFromLocal(map(prepare, data), schema)



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:486, in SparkSession._createFromRDD(self, rdd, schema, samplingRatio)
    482 """
    483 Create an RDD for DataFrame from an existing RDD, returns the RDD and schema.
    484 """
    485 if schema is None or isinstance(schema, (list, tuple)):
--> 486     struct = self._inferSchema(rdd, samplingRatio, names=schema)
    487     converter = _create_converter(struct)
    488     rdd = rdd.map(converter)



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\session.py:460, in SparkSession._inferSchema(self, rdd, samplingRatio, names)
    444 def _inferSchema(self, rdd, samplingRatio=None, names=None):
    445     """
    446     Infer schema from an RDD of Row, dict, or tuple.
    447
   (...)
    458     :class:`pyspark.sql.types.StructType`
    459     """
--> 460     first = rdd.first()
    461     if not first:
    462         raise ValueError("The first row in RDD is empty, "
    463                          "can not infer schema")



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\rdd.py:1586, in RDD.first(self)
   1573 def first(self):
   1574     """
   1575     Return the first element in this RDD.
   1576
   (...)
   1584     ValueError: RDD is empty
   1585     """
-> 1586     rs = self.take(1)
   1587     if rs:
   1588         return rs[0]



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\rdd.py:1566, in RDD.take(self, num)
   1563         taken += 1
   1565 p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1566 res = self.context.runJob(self, takeUpToNumLeft, p)
   1568 items += res
   1569 partsScanned += numPartsToTry



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\context.py:1233, in SparkContext.runJob(self, rdd, partitionFunc, partitions, allowLocal)
   1229 # Implementation note: This is implemented as a mapPartitions followed
   1230 # by runJob() in order to avoid having to pass a Python lambda into
   1231 # SparkContext#runJob.
   1232 mappedRDD = rdd.mapPartitions(partitionFunc)
-> 1233 sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
   1234 return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer))



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\py4j\java_gateway.py:1309, in JavaMember.__call__(self, *args)
   1303 command = proto.CALL_COMMAND_NAME +\
   1304     self.command_header +\
   1305     args_command +\
   1306     proto.END_COMMAND_PART
   1308 answer = self.gateway_client.send_command(command)
-> 1309 return_value = get_return_value(
   1310     answer, self.gateway_client, self.target_id, self.name)
   1312 for temp_arg in temp_args:
   1313     temp_arg._detach()



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\sql\utils.py:111, in capture_sql_exception.<locals>.deco(*a, **kw)
    109 def deco(*a, **kw):
    110     try:
--> 111         return f(*a, **kw)
    112     except py4j.protocol.Py4JJavaError as e:
    113         converted = convert_exception(e.java_exception)



File ~\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))



Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (skdjsjkdksad.com executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 604, in main
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\worker.py", line 596, in process
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py", line 259, in dump_stream
    vs = list(itertools.islice(iterator, batch))
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\rdd.py", line 1561, in takeUpToNumLeft
    except StopIteration:
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\util.py", line 73, in wrapper
    return f(*args, **kwargs)
  File "D:\Users\AJ\AppData\Local\Temp\1\ipykernel_4312\679190413.py", line -1, in <lambda>
  File "D:\Users\AJ\AppData\Local\Temp\1\ipykernel_4312\679190413.py", line 5, in func1
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 106, in col
    return _invoke_function("col", col)
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 57, in _invoke_function
    jf = _get_get_jvm_function(name, SparkContext._active_spark_context)
  File "D:\Users\AJ\Anaconda3\envs\pyspark_codegeneration\lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\sql\functions.py", line 49, in _get_get_jvm_function
    return getattr(sc._jvm.functions, name)
AttributeError: 'NoneType' object has no attribute '_jvm'

NOTE: The map() and RDD transformation are a must. If the problem is in rlike(), please suggest an alternative to check RegEx and print results accordingly. Please help me in mitigating this error, thanks in advance!!


Solution

  • AFAIK, one can't use pyspark sql function within an rdd.map(), it should be pure python implementation, as the sql functions work on dataframes.

    As you want to use RDD transformation, you can solve your problem using python's re module.

    Here's an example.

    def func1(x):
        import re
        # only keeping `day` for this example
        day = x.day
        if re.match('^(Sun|Mon|Tues|Wednes|Thurs|Fri|Satur)day$', day):
            cond1 = 'Success'
        else:
            cond1 = 'Error'
        
        return (day, cond1)
    
    spark.sparkContext.parallelize([('Sunday',), ('Sun',)]).toDF(['day']). \
        rdd. \
        map(lambda x: func1(x)). \
        toDF(['day', 'cond1']). \
        show()
    
    # +------+-------+
    # |   day|  cond1|
    # +------+-------+
    # |Sunday|Success|
    # |   Sun|  Error|
    # +------+-------+