Tags: apache-spark, serialization, rdd, bufferedimage

Spark cannot serialize the BufferedImage class


I am getting a NotSerializableException in Spark 2.2.0. This is the procedure I am trying to implement in Scala:

  1. Read a set of JPEG images from HDFS.
  2. Build an array of java.awt.image.BufferedImage objects.
  3. For each image, extract the java.awt.image.BufferedImage buffer into a two-dimensional array, producing an array of 2D arrays with the image buffer information: Array[Array[Int]].
  4. Transform that collection into an org.apache.spark.rdd.RDD[Array[Array[Int]]] using the sc.parallelize method.
  5. Perform image-processing operations in a distributed manner by transforming the initial org.apache.spark.rdd.RDD[Array[Array[Int]]].

This is the code:

import org.apache.spark.sql.SparkSession
import javax.imageio.ImageIO
import java.io.ByteArrayInputStream

// Binarize a grayscale image: pixels <= threshold become 0, all others 255.
def binarize(image: Array[Array[Int]], threshold: Int): Array[Array[Int]] = {
    val height = image.size
    val width = image(0).size
    val result = Array.ofDim[Int](height, width)
    for (i <- 0 until height) {
        for (j <- 0 until width){
            result(i)(j) = if (image(i)(j) <= threshold)  0 else 255
        }
    }
    result
}

object imageTestObj {
    def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
        val sc = spark.sparkContext
        val saveToHDFS = false
        val threshold: Int = 128
        val partitions = 32
        val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
        val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"

        val files = sc.binaryFiles(inPathStr).collect

        val AWTImageArray = files.map { binFile =>
            val input = binFile._2.open()
            val name = binFile._1
            var buffer: Array[Byte] = Array.fill(input.available)(0)
            input.readFully(buffer)
            ImageIO.read(new ByteArrayInputStream(buffer))
        }

        val ImgBuffers = AWTImageArray.map { image =>
            val height = image.getHeight
            val width = image.getWidth
            val buffer = Array.ofDim[Int](height, width)
            for (i <- 0 until height) {
                for (j <- 0 until width){
                    buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
                }
            }
            buffer
        }

        val inputImages = sc.parallelize(ImgBuffers, partitions).cache()

        val op1 = inputImages.map(image => binarize(image, threshold))
    }
}

This code throws the well-known exception:

org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: java.awt.image.BufferedImage
Serialization stack:
- object not serializable (class: java.awt.image.BufferedImage, ...

I do not understand why Spark attempts to serialize the BufferedImage class when it is only used before the first RDD in the application is created. Shouldn't BufferedImage have to be serialized only if I tried to create an RDD[BufferedImage]?

Can somebody explain to me what is going on?

Thank you in advance...


Solution

  • What Spark actually serializes is the function you pass to it (more precisely, its closure). That closure cannot contain references to non-serializable classes. You may instantiate non-serializable classes inside the function (that is OK), but you must NOT refer to instances of non-serializable classes created outside of it.

    Most probably, one of the functions you pass to Spark references a BufferedImage instance.

    Check your code and make sure no function references a BufferedImage object from its closure.
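
    To see the difference concretely, here is a minimal, untested sketch (rdd: RDD[Int] and local.jpg are hypothetical stand-ins) contrasting a closure that captures a BufferedImage with one that creates it inside the task:

    import java.io.File
    import javax.imageio.ImageIO

    val img = ImageIO.read(new File("local.jpg"))  // BufferedImage held on the driver
    rdd.map(x => img.getHeight + x)                // FAILS: the closure captures img,
                                                   // forcing Spark to serialize it

    rdd.map { x =>
      // OK: the BufferedImage is created inside the task and never serialized
      // (assumes local.jpg is readable on every executor)
      val local = ImageIO.read(new File("local.jpg"))
      local.getHeight + x
    }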

    By inlining some code so that no BufferedImage object needs to be serialized, I think you can overcome the exception. Can you try out this code (I did not execute it myself)?:

    import org.apache.spark.sql.SparkSession
    import javax.imageio.ImageIO
    import java.io.ByteArrayInputStream

    object imageTestObj {
      def main(args: Array[String]) {
        val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
        val sc = spark.sparkContext
        val saveToHDFS = false
        val threshold: Int = 128
        val partitions = 32
        val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
        val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"

        val ImgBuffers = sc.binaryFiles(inPathStr).collect.map { binFile =>
          val input = binFile._2.open()
          val name = binFile._1
          val bytes: Array[Byte] = Array.fill(input.available)(0)
          input.readFully(bytes)
          val image = ImageIO.read(new ByteArrayInputStream(bytes))
          // The inlining must happen here, so that no BufferedImage is serialized:
          // the image never leaves this (driver-local) map; only the Int buffer does.
          val height = image.getHeight
          val width = image.getWidth
          val buffer = Array.ofDim[Int](height, width)
          for (i <- 0 until height) {
            for (j <- 0 until width) {
              buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
            }
          }
          buffer
        }

        val inputImages = sc.parallelize(ImgBuffers, partitions).cache()

        // binarize is the same function defined in the question.
        val op1 = inputImages.map(image => binarize(image, threshold))
      }
    }
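
    Note that this version still reads and decodes all the images on the driver (binaryFiles(...).collect runs before parallelize); only the binarize step executes in a distributed manner. The key difference is that each BufferedImage now lives and dies inside the driver-local map, so no closure shipped to the executors ever references one.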