Tags: scala, apache-spark, rdd

How to read PDF files and XML files in Apache Spark (Scala)?


My sample code for reading a text file is:

    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
    import org.apache.spark.rdd.HadoopRDD

    // Pair every line with the path of the file it came from
    val text = sc.hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
    val rddWithPath = text.asInstanceOf[HadoopRDD[LongWritable, Text]].mapPartitionsWithInputSplit { (inputSplit, iterator) =>
      val file = inputSplit.asInstanceOf[FileSplit]
      iterator.map { tpl => (file.getPath.toString, tpl._2.toString) }
    }.reduceByKey((a, b) => a) // keep a single line per file path

How can I read PDF and XML files in the same way?


Solution

  • PDF & XML can be parsed using Tika:

    Look at Apache Tika - a content analysis toolkit:
    - https://tika.apache.org/1.9/api/org/apache/tika/parser/xml/
    - http://tika.apache.org/0.7/api/org/apache/tika/parser/pdf/PDFParser.html
    - https://tika.apache.org/1.9/api/org/apache/tika/parser/AutoDetectParser.html
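
    To compile and run the example that follows, Spark and Tika both need to be on the classpath. A minimal build.sbt sketch (the version numbers here are assumptions; pick the ones matching your cluster and Scala version):

    // build.sbt - illustrative versions only, adjust to your environment
    name := "TikaFileParser"
    scalaVersion := "2.11.12"
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core"   % "2.4.8" % "provided",
      "org.apache.tika"  %  "tika-parsers" % "1.28.5"
    )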
    Below is an example integration of Spark with Tika:

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    import org.apache.spark.input.PortableDataStream
    import org.apache.tika.metadata._
    import org.apache.tika.parser._
    import org.apache.tika.sax.WriteOutContentHandler
    import java.io._
    
    object TikaFileParser {
    
      def tikaFunc (a: (String, PortableDataStream)) = {

        // a._1 is the file URI (e.g. "file:/home/user/doc.pdf"); drop(5)
        // strips the "file:" scheme, so this only works for local paths
        val file : File = new File(a._1.drop(5))
        val myparser : AutoDetectParser = new AutoDetectParser()
        val stream : InputStream = new FileInputStream(file)
        // -1 disables the handler's write limit so long documents are not truncated
        val handler : WriteOutContentHandler = new WriteOutContentHandler(-1)
        val metadata : Metadata = new Metadata()
        val context : ParseContext = new ParseContext()
    
        // Tika auto-detects the file type (PDF, XML, ...) and writes the
        // extracted text into the handler
        myparser.parse(stream, handler, metadata, context)
    
        stream.close
    
        println(handler.toString()) // the extracted text content
        println("------------------------------------------------")
      }
    
    
      def main(args: Array[String]) {

        val filesPath = "/home/user/documents/*"
        val conf = new SparkConf().setAppName("TikaFileParser")
        val sc = new SparkContext(conf)
        // binaryFiles yields (path, PortableDataStream) pairs, one per file
        val fileData = sc.binaryFiles(filesPath)
        fileData.foreach( x => tikaFunc(x))
        sc.stop()
      }
    }
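
    Note that tikaFunc above rebuilds a java.io.File from the URI, which only works when the files live on the local filesystem of each executor. A variant (a sketch, not part of the original answer) can parse straight from the PortableDataStream that binaryFiles already provides, so hdfs:// or s3:// paths work as well:

    // Hypothetical alternative to tikaFunc: read from the PortableDataStream
    // instead of reopening a local File
    def tikaFuncStream(a: (String, PortableDataStream)): Unit = {
      val stream = a._2.open() // DataInputStream over the file's bytes
      val handler = new WriteOutContentHandler(-1)
      val metadata = new Metadata()
      try {
        new AutoDetectParser().parse(stream, handler, metadata, new ParseContext())
      } finally {
        stream.close()
      }
      println(a._1 + ":\n" + handler.toString())
    }

    With this version the driver code stays the same except for fileData.foreach(x => tikaFuncStream(x)).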