I have a Not Serializable Class exception in Spark 2.2.0. The following procedure is what I am trying to do in Scala:
java.awt.image.BufferedImageS
.java.awt.image.BufferedImage
buffer and store it in a 2D array for each image, by building an array of two-dimensional arrays containing the image buffer information Array[Array[Int]]
.Array[Array[Int]]
into an org.apache.spark.rdd.RDD[Array[Array[Int]]]
by using sc.parallelize
method.org.apache.spark.rdd.RDD[Array[Array[Int]]]
.This is the code:
import org.apache.spark.sql.SparkSession
import javax.imageio.ImageIO
import java.io.ByteArrayInputStream
def binarize(image: Array[Array[Int]], threshold: Int) : Array[Array[Int]] = {
val height = image.size
val width = image(0).size
val result = Array.ofDim[Int](height, width)
for (i <- 0 until height) {
for (j <- 0 until width){
result(i)(j) = if (image(i)(j) <= threshold) 0 else 255
}
}
result
}
object imageTestObj {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
val sc = spark.sparkContext
val saveToHDFS = false
val threshold: Int = 128
val partitions = 32
val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"
val files = sc.binaryFiles(inPathStr).collect
val AWTImageArray = files.map { binFile =>
val input = binFile._2.open()
val name = binFile._1
var buffer: Array[Byte] = Array.fill(input.available)(0)
input.readFully(buffer)
ImageIO.read(new ByteArrayInputStream(buffer))
}
val ImgBuffers = AWTImageArray.map { image =>
val height = image.getHeight
val width = image.getWidth
val buffer = Array.ofDim[Int](height, width)
for (i <- 0 until height) {
for (j <- 0 until width){
buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
}
}
buffer
}
val inputImages = sc.parallelize(ImgBuffers, partitions).cache()
val op1 = inputImages.map(image => binarize(image, threshold))
}
}
This algorithm gets a very well-known exception:
org.apache.spark.SparkException: Task not serializable
...
Caused by: java.io.NotSerializableException: java.awt.image.BufferedImage
Serialization stack:
- object not serializable (class: java.awt.image.BufferedImage, ...
I do not understand why Spark attempts to serialize the BufferedImage
class when it is used before creating the first RDD
in the application. Isn't it supposed that the BufferedImage
class should be serialized if I try to create an RDD[BufferedImage]
?
Can somebody explain me what is going on?
Thank you in advance...
Actually you are serializing a function in Spark. This function cannot contain references to non serializable classes. You can instantiate in the function non-serializable classes (OK), but NOT refer to instances of non serializable classes in the function.
Most probably you are referencing in one of the functions you use to an instance of a BufferedImage.
Check your code and see if you are not referencing from a function a BufferedImage object.
By inlining some code and not serializing BufferedImage objects, I guess you can overcome the exception. Can you try out this code (did not execute it myself)?:
object imageTestObj {
def main(args: Array[String]) {
val spark = SparkSession.builder().appName("imageTest2").getOrCreate()
val sc = spark.sparkContext
val saveToHDFS = false
val threshold: Int = 128
val partitions = 32
val inPathStr = "hdfs://192.168.239.218:9000/vitrion/input"
val outPathStr = if (saveToHDFS) "hdfs://192.168.239.54:9000/vitrion/output/" else "/home/vitrion/IdeaProjects/imageTest2/output/"
val ImgBuffers = sc.binaryFiles(inPathStr).collect.map { binFile =>
val input = binFile._2.open()
val name = binFile._1
var buffer: Array[Byte] = Array.fill(input.available)(0)
input.readFully(buffer)
val image = ImageIO.read(new ByteArrayInputStream(buffer))
// Inlining must be here, so that BufferedImage is not serialized.
val height = image.getHeight
val width = image.getWidth
val buffer = Array.ofDim[Int](height, width)
for (i <- 0 until height) {
for (j <- 0 until width){
buffer(i)(j) = image.getRaster.getDataBuffer.getElem(0, i * width + j)
}
}
buffer
}
val inputImages = sc.parallelize(ImgBuffers, partitions).cache()
val op1 = inputImages.map(image => binarize(image, threshold))
}
}