Tags: scala, hadoop, apache-spark, data-science, hadoop-partitioning

How to read multi-line log records in Spark, where each record starts with a yyyy-MM-dd date and can span several lines?


So far I have implemented the following logic in Scala:

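    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.io.{LongWritable, Text}
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    //hadoopConf.set("textinputformat.record.delimiter", "2016-")
    // Attempt: use a regex as the record delimiter (this is the part that does not work)
    hadoopConf.set("textinputformat.record.delimiter", "^([0-9]{4}.*)")

    // Read the file and keep only the record text
    val accessLogs = sc.newAPIHadoopFile("/user/root/sample.log", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text], hadoopConf).map(x => x._2.toString)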

I want to use a regex to recognize whether a line starts with a date: if it does, treat it as the start of a new record; otherwise, append the line to the current record.
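The record-grouping rule described above (a line that starts with a date opens a new record; any other line is appended to the current one) can be sketched in plain Scala, without Spark. This is a minimal sketch that assumes each record begins with a yyyy-MM-dd date at the very start of the line; the object and method names are hypothetical.

```scala
// Hypothetical helper: group log lines into multi-line records.
object GroupLogLines {
  // Assumption: a record starts with yyyy-MM-dd at the beginning of a line.
  private val DateStart = """^\d{4}-\d{2}-\d{2}""".r

  def groupRecords(lines: Seq[String]): Vector[String] =
    lines.foldLeft(Vector.empty[String]) { (records, line) =>
      if (records.isEmpty || DateStart.findPrefixOf(line).isDefined)
        records :+ line                               // date line (or first line): open a new record
      else
        records.init :+ (records.last + "\n" + line)  // continuation line: append to the current record
    }
}
```

Because it folds over the lines in order, it needs all of a file's lines in sequence; running it separately on each partition of sc.textFile would break any record that straddles a partition boundary.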

But this is not working. If I pass a literal date prefix instead, it works fine. This is the line that works, where I would like to use the regex instead:

//hadoopConf.set("textinputformat.record.delimiter", "2016-")

Please help. Thanks in advance.

Here is a sample of the log format:

2016-12-23 07:00:09,693 [jetty-51 - /app/service] INFO  org.apache.cxf.interceptor.LoggingOutInterceptor S:METHOD_NAME=METHNAME : WebAppSessionId= : ChannelSessionId=web-xxx-xxx-xxx : ClientIp=xxxxxxx :  - Outbound Message

---------------------------
    ID: 1978
    Address: https://sample.domain.com/SampleService.xxx/basic
    Encoding: UTF-8
    Content-Type: text/xml
    Headers: {Accept=[*/*], SOAPAction=["WebDomain.Service/app"]}
    Payload: <soap:Envelope>
    </soap:Envelope>
2016-12-26 08:00:01,514 [jetty-1195 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip :  - ActiveSpaceCacheHandler:getServiceResponseFromCache(); exception: java.lang.Exception: getServiceResponseData: com.tibco.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey:Request.US
2016-12-26 08:00:01,624 [jetty-979 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip :  - ActiveSpaceCacheHandler:setServiceResponseInCache(); exception: com.test.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey

Solution

  • I couldn't get it working with a regex, because Hadoop treats textinputformat.record.delimiter as a literal string rather than a regular expression. The best I could do was hadoopConf.set("textinputformat.record.delimiter", "\n20"), which may work for you as long as a newline followed by "20" never occurs in the middle of a log entry. This approach also gives you some future-proofing, since it matches any year from 2000 to 2099.

    If you need a regex, you could try http://dronamk.blogspot.co.uk/2013/03/regex-custom-input-format-for-hadoop.html

    My code:

    // Create some dummy data
    val s = """2016-12-23 07:00:09,693 [jetty-51 - /app/service] INFO  org.apache.cxf.interceptor.LoggingOutInterceptor S:METHOD_NAME=METHNAME : WebAppSessionId= : ChannelSessionId=web-xxx-xxx-xxx : ClientIp=xxxxxxx :  - Outbound Message
              |---------------------------
              |    ID: 1978
              |    Address: https://sample.domain.com/SampleService.xxx/basic
              |    Encoding: UTF-8
              |    Content-Type: text/xml
              |    Headers: {Accept=[*/*], SOAPAction=["WebDomain.Service/app"]}
              |    Payload: <soap:Envelope>
              |    </soap:Envelope>
              |2016-12-26 08:00:01,514 [jetty-1195 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip :  - ActiveSpaceCacheHandler:getServiceResponseFromCache(); exception: java.lang.Exception: getServiceResponseData: com.tibco.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey:Request.US
              |2016-12-26 08:00:01,624 [jetty-979 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip :  - ActiveSpaceCacheHandler:setServiceResponseInCache(); exception: com.test.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey
    """.stripMargin
    
    // Write the sample data to a local file
    import java.io._
    val pw = new PrintWriter(new File("log.txt"))
    pw.write(s)
    pw.close()
    
    // Now process the data
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
    import org.apache.hadoop.io.Text
    import org.apache.hadoop.io.LongWritable
    import org.apache.spark.{SparkContext, SparkConf}
    
    // Restart the SparkContext with LongWritable registered for Kryo serialization
    val conf = sc.getConf
    sc.stop()
    conf.registerKryoClasses(Array(classOf[org.apache.hadoop.io.LongWritable]))
    val sc = new SparkContext(conf)

    // Use the literal string "\n20" (a newline followed by "20") as the record delimiter
    val hadoopConf = new Configuration(sc.hadoopConfiguration)
    hadoopConf.set("textinputformat.record.delimiter", "\n20")
    
    val accessLogs = sc.newAPIHadoopFile("log.txt", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], hadoopConf)
    // Convert each Text value to a String; zipWithIndex numbers the records for debugging
    accessLogs.map(x => x._2.toString).zipWithIndex().collect().foreach(println)
    

    Note that I'm using zipWithIndex just for debugging purposes. The output is:

        (2016-12-23 07:00:09,693 [jetty-51 - /app/service] INFO  org.apache.cxf.interceptor.LoggingOutInterceptor S:METHOD_NAME=METHNAME : WebAppSessionId= : ChannelSessionId=web-xxx-xxx-xxx : ClientIp=xxxxxxx :  - Outbound Message
        ---------------------------
            ID: 1978
            Address: https://sample.domain.com/SampleService.xxx/basic
            Encoding: UTF-8
            Content-Type: text/xml
            Headers: {Accept=[*/*], SOAPAction=["WebDomain.Service/app"]}
            Payload: <soap:Envelope>
            </soap:Envelope>,0)
        (16-12-26 08:00:01,514 [jetty-1195 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip :  - ActiveSpaceCacheHandler:getServiceResponseFromCache(); exception: java.lang.Exception: getServiceResponseData: com.tibco.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey:Request.US,1)
        (16-12-26 08:00:01,624 [jetty-979 - /app/service/serviceName] ERROR com.testservices.cache.impl.ActiveSpaceCacheHandler S:METHOD_NAME=ServiceInquiryWithBands : WebAppSessionId= : ChannelSessionId=SERVICE : ClientIp=client-ip :  - ActiveSpaceCacheHandler:setServiceResponseInCache(); exception: com.test.as.space.RuntimeASException: field key is not nullable and is missing in tuple for cachekey
        ,2)
    

    Note the index is the second field in the output.

    I ran this code in an IBM Data Science Experience notebook with Scala 2.10 and Spark 1.6.
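In the output of the "\n20" approach, every record after the first starts with "16-12-26" rather than "2016-12-26", because the delimiter is consumed when the file is split into records. A small helper could put the prefix back. This is a sketch that assumes every genuine record starts with a full yyyy-MM-dd date; the object and method names are hypothetical.

```scala
// Hypothetical helper: restore the "20" that the literal "\n20" delimiter consumes.
object RestoreDelimiterPrefix {
  // Assumption: every complete record starts with a yyyy-MM-dd date.
  private val FullDate = """\d{4}-\d{2}-\d{2}""".r

  def restore(record: String): String =
    if (FullDate.findPrefixOf(record).isDefined) record // already complete, e.g. the first record of the file
    else "20" + record                                  // "16-12-26 ..." becomes "2016-12-26 ..."
}
```

Applied to the answer's code, this could look like accessLogs.map(x => RestoreDelimiterPrefix.restore(x._2.toString)). The check is based on content rather than on record position, so it does not matter which input split a record came from.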
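If a regex is needed and each log file is small enough to be read whole, another route (different from the custom input format linked in the answer) is to split each file's full content with a regex that matches the start of a record instead of the delimiter. A zero-width lookahead keeps the date inside each record, and because the pattern requires a full date followed by a space, a body line such as "2000 items" does not start a new record. This is a sketch under those assumptions; the object and method names are hypothetical and it has not been tested against real logs.

```scala
// Hypothetical helper: split the full text of one log file into records.
object RegexRecordSplit {
  // (?m) lets ^ match at the start of every line; (?=...) is a zero-width
  // lookahead, so the split happens just before each date and the date is kept.
  // Assumption: a record starts with "yyyy-MM-dd " at the beginning of a line.
  private val RecordStart = """(?m)(?=^\d{4}-\d{2}-\d{2} )"""

  def splitRecords(content: String): Array[String] =
    content
      .split(RecordStart)      // java.lang.String.split with the lookahead regex
      .map(_.stripLineEnd)     // drop the newline that preceded the next record
      .filter(_.trim.nonEmpty) // guard against an empty leading piece on older JDKs
}
```

With Spark this could be applied per file, for example sc.wholeTextFiles("/user/root/sample.log").flatMap { case (_, content) => RegexRecordSplit.splitRecords(content) }. Since wholeTextFiles loads each file as a single string, very large files would still call for a custom input format.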