Scala: How to get the content of PortableDataStream instance from an RDD


I want to extract data from binary files. I read the files using val dataRDD = sc.binaryFiles("Path") and get a result of type org.apache.spark.rdd.RDD[(String, org.apache.spark.input.PortableDataStream)].

I want to extract the content of my files, which is in the form of a PortableDataStream.

To do that, I tried val data = dataRDD.map(x => x._2.open()).collect(), but I get the following error:

java.io.NotSerializableException: org.apache.hadoop.hdfs.client.HdfsDataInputStream

If you have any idea how I can solve this, please help!

Many thanks in advance.


Solution

  • Actually, PortableDataStream itself is Serializable; that is what it is meant for. However, open() returns a plain DataInputStream (an HdfsDataInputStream in your case, because your file is on HDFS), which is not Serializable. Calling collect() on the mapped RDD asks Spark to send those opened streams back to the driver, hence the error you get.
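
    The difference can be checked outside Spark. The sketch below is only an illustration, assuming Spark's default Java serializer: isJavaSerializable is a hypothetical helper (not a Spark API), and an in-memory DataInputStream stands in for the HdfsDataInputStream.

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream,
      NotSerializableException, ObjectOutputStream}

    // Hypothetical helper: true if plain Java serialization accepts the object.
    def isJavaSerializable(obj: AnyRef): Boolean = {
      val oos = new ObjectOutputStream(new ByteArrayOutputStream())
      try { oos.writeObject(obj); true }
      catch { case _: NotSerializableException => false }
      finally oos.close()
    }

    // In-memory stand-in for the stream returned by open()
    val stream = new DataInputStream(new ByteArrayInputStream("a\nb".getBytes("UTF-8")))

    val streamOk = isJavaSerializable(stream)          // an open stream
    val linesOk  = isJavaSerializable(Array("a", "b")) // content that was already read
    ```

    The open stream is rejected, while the data read from it is accepted, which is why the content has to be read on the executor before anything is collected.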

    So when you open the PortableDataStream, read the data right away, inside the same map. In Scala, you can use scala.io.Source.fromInputStream:

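    import org.apache.spark.rdd.RDD

    val data: RDD[Array[String]] = sc
        .binaryFiles("path/.../")
        .map { case (fileName, pds) =>
            val in = pds.open()
            // Read all lines on the executor, then close the stream
            try scala.io.Source.fromInputStream(in).getLines().toArray
            finally in.close()
        }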
    

    This code assumes that the data is textual. If it is not, you can adapt it to read any kind of binary data. Here is an example that builds a sequence of bytes, which you can then process however you want.

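    import org.apache.spark.rdd.RDD

    val rdd: RDD[Seq[Byte]] = sc.binaryFiles("...")
        .map { case (file, pds) =>
            val dis = pds.open()
            val bytes = Array.ofDim[Byte](1024)
            val all = scala.collection.mutable.ArrayBuffer[Byte]()
            try {
                // read returns the number of bytes actually read, or -1 at end of stream
                var n = dis.read(bytes)
                while (n != -1) {
                    all ++= bytes.take(n) // keep only the bytes read in this pass
                    n = dis.read(bytes)
                }
            } finally dis.close()
            all.toSeq
        }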
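
    One detail matters when reading in chunks: read may fill only part of the buffer (typically on the last chunk), so only the bytes it reports should be appended. The sketch below isolates that loop in a hypothetical helper, readAll, and runs it on an in-memory stream so it can be tried without Spark; the name and the chunkSize parameter are illustrative, not part of any library.

    ```scala
    import java.io.{ByteArrayInputStream, InputStream}
    import scala.collection.mutable.ArrayBuffer

    // Hypothetical helper: drain a stream, keeping only the bytes each read() returned.
    def readAll(in: InputStream, chunkSize: Int = 1024): Array[Byte] = {
      val chunk = new Array[Byte](chunkSize)
      val all = ArrayBuffer[Byte]()
      var n = in.read(chunk)
      while (n != -1) {
        all ++= chunk.take(n) // the last chunk is usually shorter than chunkSize
        n = in.read(chunk)
      }
      all.toArray
    }

    // 2500 bytes: two full chunks of 1024 plus a partial chunk of 452
    val input = Array.tabulate[Byte](2500)(i => (i % 100).toByte)
    val result = readAll(new ByteArrayInputStream(input))
    ```

    If all you need is the whole file as bytes, PortableDataStream also has a toArray() method that returns an Array[Byte] directly.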
    

    See the javadoc of DataInputStream for more possibilities. For instance, it provides methods such as readLong and readDouble for reading typed values directly.
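
    As a rough illustration of those typed reads, here is a minimal sketch that assumes a made-up record layout (one Long followed by one Double). readRecord is a hypothetical helper, and the sample bytes are built in memory in place of a real file.

    ```scala
    import java.io.{ByteArrayInputStream, ByteArrayOutputStream,
      DataInputStream, DataOutputStream}

    // Hypothetical record layout: one Long id followed by one Double value.
    def readRecord(dis: DataInputStream): (Long, Double) = {
      val id = dis.readLong()      // 8 bytes, big-endian
      val value = dis.readDouble() // 8 bytes, IEEE 754
      (id, value)
    }

    // Build sample bytes in memory instead of reading a real file
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeLong(42L)
    out.writeDouble(3.5)
    out.close()

    val record = readRecord(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))
    ```

    In Spark, the same helper could be called on the stream returned by pds.open() inside the map, closing the stream once the record has been read.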