spark rdd: grouping and filtering

I have an RDD "labResults" of objects:

case class LabResult(patientID: String, date: Long, labName: String, value: String)

I want to transform this RDD so that it contains only one row per combination of patientID and labName. That row should be the one with the latest date for the combination (I am only interested in the most recent date on which a patient had a given lab). I do it this way:

// group rows by patient and lab and take only the latest one
val cleanLab = labResults.groupBy(x => (x.patientID, x.labName)).map(_._2).map { events =>
  val latest_date = events.maxBy(_.date).date
  val lab = events.filter(x => x.date == latest_date)
  lab
}

Later I want to create the edges from this RDD:

val edgePatientLab: RDD[Edge[EdgeProperty]] = cleanLab
  .map({ lab =>
    Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
  })

and I am getting an error:

value patientID is not a member of Iterable[edu.gatech.cse6250.model.LabResult]

[error] Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
[error] ^
[error] /hw4/stu_code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:53: value labName is not a member of Iterable[edu.gatech.cse6250.model.LabResult]
[error] Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])
[error] ^
[error] /hw4/stu_code/src/main/scala/edu/gatech/cse6250/graphconstruct/GraphLoader.scala:94:86: type mismatch;
[error] found : Iterable[edu.gatech.cse6250.model.LabResult]
[error] required: edu.gatech.cse6250.model.LabResult
[error] Edge(lab.patientID.toLong, lab2VertexId(lab.labName), PatientLabEdgeProperty(lab).asInstanceOf[EdgeProperty])

So it looks like the problem is that "cleanLab" is not an RDD of LabResult, as I expected, but an RDD of Iterable[edu.gatech.cse6250.model.LabResult].

How could I fix it?


  • Here's my approach for the first part. I can't help with Edge and the other classes, as I don't know where they come from.

    scala> val ds = List(("1", 1, "A", "value 1"), ("1", 3, "A", "value 3"), ("1", 3, "B", "value 3"), ("1", 2, "A", "value 2"), ("1", 3, "B", "value 3"), ("1", 5, "B", "value 5") ).toDF("patientID", "date", "labName", "value").as[LabResult]
    ds: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, date: int ... 2 more fields]
    +---------+----+-------+-------+
    |patientID|date|labName|  value|
    +---------+----+-------+-------+
    |        1|   1|      A|value 1|
    |        1|   3|      A|value 3|
    |        1|   3|      B|value 3|
    |        1|   2|      A|value 2|
    |        1|   3|      B|value 3|
    |        1|   5|      B|value 5|
    +---------+----+-------+-------+
    scala> val grouped = ds.groupBy("patientID", "labName").agg(max("date") as "date")
    grouped: org.apache.spark.sql.DataFrame = [patientID: string, labName: string ... 1 more field]
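    +---------+-------+----+
    |patientID|labName|date|
    +---------+-------+----+
    |        1|      A|   3|
    |        1|      B|   5|
    +---------+-------+----+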
    scala> val cleanLab = ds.join(grouped, Seq("patientID", "labName", "date")).as[LabResult]
    cleanLab: org.apache.spark.sql.Dataset[LabResult] = [patientID: string, labName: string ... 2 more fields]
    +---------+-------+----+-------+
    |patientID|labName|date|  value|
    +---------+-------+----+-------+
    |        1|      A|   3|value 3|
    |        1|      B|   5|value 5|
    +---------+-------+----+-------+
    scala> cleanLab.head
    res45: LabResult = LabResult(1,3,A,value 3)
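
If you would rather stay at the RDD level, as in the question, the same result can probably be reached without groupBy. The following is only a minimal sketch, assuming Spark is on the classpath and that LabResult is the case class from the question. The names LatestLab and latestPerPatientLab are made up for illustration. It keys every record by (patientID, labName) and uses reduceByKey to keep the record with the larger date, so each key ends up with a single LabResult instead of an Iterable[LabResult]:

```scala
import org.apache.spark.rdd.RDD

// Same shape as the case class in the question.
case class LabResult(patientID: String, date: Long, labName: String, value: String)

// Hypothetical helper object, not part of the original code.
object LatestLab {
  // For each (patientID, labName) pair, keep the single record with the largest date.
  def latestPerPatientLab(labResults: RDD[LabResult]): RDD[LabResult] =
    labResults
      .map(r => ((r.patientID, r.labName), r))                // key each record by patient and lab
      .reduceByKey((a, b) => if (a.date >= b.date) a else b)  // pairwise, keep the later record
      .values                                                 // drop the key, leaving RDD[LabResult]
}
```

With `val cleanLab = LatestLab.latestPerPatientLab(labResults)`, each element is a single LabResult, so `lab.patientID` and `lab.labName` in the Edge mapping from the question should type-check. If two records for the same key share the latest date, this sketch keeps just one of them, and which one is not guaranteed. If you use the Dataset version above instead, `cleanLab.rdd` gives you an RDD[LabResult] to map over.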