Tags: scala, hadoop, apache-spark, hl7

Transforming a collection of strings in an RDD


I am trying to parse HL7 values. For that I am using a few jar files that parse HL7 values from a string. I executed the code and it works fine in plain Scala, but now I want the same thing to run in Spark. For that I am using the code below with the sample, but I am getting the error below. To solve the issue I want to convert an org.apache.spark.rdd.RDD[String] to a String.

Code:

object ExampleUseTerser {
  def main(args: Array[String]): Unit = {
    val textfile = sc.textFile("/user/cloudera/xxxx/File")
    val context: HapiContext = new DefaultHapiContext()
    val p = context.getGenericParser()
    val hapiMsg = p.parse(textfile)
    val terser = new Terser(hapiMsg)
    val secondAllergyType: String = terser.get("/PID-7")
    println(secondAllergyType)
  }
}

Update

Sample :

PID|1|5897450M|58974650M|58977650M|CSTO^TES||19320722|F|||745 - 81 ST^^IAMI BEH^FL^341|||||||332165520
ORC||5033220|503320||||||20150202|||1689659096^HAM^MISH^^^^PI
OBR|1||64564|DF DEFAULT|||20150202|2015002||||||||16096^^^^^I|||||||||
HHH|1|NM|6690-2^^LN^0^^L||7|XE/UL|3.4-18||||F|||20150202| ^^L
HHH|9|NM|777-3^LOINC^LN^015172^PLATELETS^L||185|X10E3/UL|150-379||||F|||201202|TA   ^^L
HHH|10|NM|770-8^^LN^015107^^L||65|%|||||F|||20150202|TA   ^^L
HHH|11|NM|736-9^LOINC^LN^015123^^L||26|%|||||F|||20150202|TA   ^^L
HHH|12|NM|5905-5^LOINC^LN^015131^^L||8|%|||||F|||20150202|TA   ^^L
HHH|13|NM|713-8^LOINC^LN^015149^^L||1|%|||||F|||20150202|TA   ^^L

Error:

error: type mismatch;
 found   : org.apache.spark.rdd.RDD[String]
 required: String
           val hapiMsg  = p.parse(textfile);

Solution

  • When you're working with an RDD, you need to remember that the abstraction is over a collection of values (it's actually a bit more complex, but let's leave it at that for the moment), not a single value.

    Given your example, we need to map over all the elements in the collection and extract the PID-7 field from each one. We can use mapPartitions so we don't allocate a new HapiContext for each value:

    import ca.uhn.hl7v2.{DefaultHapiContext, HapiContext}
    import ca.uhn.hl7v2.util.Terser
    import org.apache.spark.{SparkConf, SparkContext}

    object ExampleUseTerser {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("ExampleUseTerser"))
        val textfile = sc.textFile("/user/cloudera/xxxx/File")
        val parsedData = textfile.mapPartitions { it =>
          // Create the (non-serializable) parser once per partition,
          // not once per record and not on the driver.
          val context: HapiContext = new DefaultHapiContext()
          val parser = context.getGenericParser()

          it.map { message =>
            val hapiMsg = parser.parse(message)
            val terser = new Terser(hapiMsg)
            terser.get("/PID-7")
          }
        }
      }
    }
    

    If you're testing this out and want to view the parsed values, you can use RDD.collect as I've mentioned in my previous answer (but of course don't do that in any kind of production environment).
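    The per-partition pattern above can be sketched without a Spark cluster: the snippet below mimics partitions with plain Scala's Iterator.grouped and stands in for the HAPI parser with a simple field split (PID-7 is the field at index 7 of a pipe-delimited PID segment). PartitionPatternSketch, pid7, and parseAll are hypothetical names for illustration, not part of any library.

    ```scala
    object PartitionPatternSketch {
      // Stand-in for terser.get("/PID-7"): field 7 of a pipe-delimited segment.
      def pid7(line: String): String =
        line.split('|').lift(7).getOrElse("")

      // Mimics RDD.mapPartitions: each chunk plays the role of a partition,
      // so any expensive setup (like a HapiContext) would happen once per
      // chunk rather than once per record.
      def parseAll(lines: Seq[String], chunkSize: Int): List[String] =
        lines.grouped(chunkSize).flatMap { chunk =>
          chunk.map(pid7)
        }.toList

      def main(args: Array[String]): Unit = {
        val sample = Seq(
          "PID|1|5897450M|58974650M|58977650M|CSTO^TES||19320722|F"
        )
        parseAll(sample, chunkSize = 2).foreach(println) // prints 19320722
      }
    }
    ```

    This is only a sketch of the iteration structure; the real job still needs Spark and the HAPI jars on the classpath.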