apache-spark, pyspark, rdd, case-class, python-dataclasses

Creating an Apache Spark RDD of a Class in PySpark


I have to convert some Scala code to Python.

The Scala code converts an RDD of strings into an RDD of a case class. The code is as follows:

case class Stock(
                  stockName: String,
                  dt: String,
                  openPrice: Double,
                  highPrice: Double,
                  lowPrice: Double,
                  closePrice: Double,
                  adjClosePrice: Double,
                  volume: Double
                )


  def parseStock(inputRecord: String, stockName: String): Stock = {
    val column = inputRecord.split(",")
    Stock(
      stockName,
      column(0),
      column(1).toDouble,
      column(2).toDouble,
      column(3).toDouble,
      column(4).toDouble,
      column(5).toDouble,
      column(6).toDouble)
  }

  def parseRDD(rdd: RDD[String], stockName: String): RDD[Stock] = {
    val header = rdd.first
    rdd.filter((data) => {
      data(0) != header(0) && !data.contains("null")
    })
      .map(data => parseStock(data, stockName))
  }  

Is it possible to implement this in PySpark? I tried the following code and it gave an error:

from dataclasses import dataclass

@dataclass(eq=True,frozen=True)
class Stock:
    stockName : str
    dt: str
    openPrice: float
    highPrice: float
    lowPrice: float
    closePrice: float
    adjClosePrice: float
    volume: float


 

def parseStock(inputRecord, stockName):
  column = inputRecord.split(",")
  # convert the numeric columns explicitly, mirroring .toDouble in the Scala version
  return Stock(stockName,
               column[0],
               float(column[1]),
               float(column[2]),
               float(column[3]),
               float(column[4]),
               float(column[5]),
               float(column[6]))

def parseRDD(rdd, stockName):
  header = rdd.first()
  res = rdd.filter(lambda data : data != header).map(lambda data : parseStock(data, stockName))
  return res
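
For context, here is a minimal driver sketch of how these functions might be invoked; the SparkContext setup, CSV path, and stock symbol are assumptions (not part of the original question), and running an action such as take is what surfaces the error shown below.

from pyspark import SparkContext

# Hypothetical driver code: the input path and stock symbol are placeholders.
sc = SparkContext.getOrCreate()
lines = sc.textFile("data/AAPL.csv")   # assumed input: one CSV record per line
stocks = parseRDD(lines, "AAPL")       # RDD of Stock dataclass instances
print(stocks.take(3))                  # action that triggers the error below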

Error:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 31, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 364, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/content/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 587, in loads
    return pickle.loads(obj, encoding=encoding)
AttributeError: Can't get attribute 'main' on <module 'builtins' (built-in)>


Solution

  • The Dataset API is not available in Python; for the row-by-name access the documentation describes instead, see the sketch after the quotation below.

    "A Dataset is a distributed collection of data. Dataset is a new interface added in Spark 1.6 that provides the benefits of RDDs (strong typing, ability to use powerful lambda functions) with the benefits of Spark SQL’s optimized execution engine. A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, flatMap, filter, etc.). The Dataset API is available in Scala and Java. Python does not have the support for the Dataset API. But due to Python’s dynamic nature, many of the benefits of the Dataset API are already available (i.e. you can access the field of a row by name naturally row.columnName). The case for R is similar."

    https://spark.apache.org/docs/latest/sql-programming-guide.html
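
To illustrate the quoted point in PySpark, here is a minimal, non-authoritative sketch that loads the same kind of CSV into a DataFrame and accesses fields by name; the input path and column names are assumptions, not part of the original question.

from pyspark.sql import SparkSession

# Illustrative sketch only: the path and column names are assumed.
spark = SparkSession.builder.getOrCreate()

df = (spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("data/AAPL.csv")
      .toDF("dt", "openPrice", "highPrice", "lowPrice",
            "closePrice", "adjClosePrice", "volume"))

# Rows expose their fields by name (row.columnName), which is the
# "dynamic" benefit the documentation describes in place of a typed Dataset.
for row in df.take(3):
    print(row.dt, row.closePrice)

This keeps the named-field access that the Scala case class provides while staying within the APIs that PySpark actually exposes.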