apache-spark json4s

Error calling `JValue.extract` from distributed operations in spark-shell


I am trying to use the case class extraction feature of json4s in Spark, i.e. calling jvalue.extract[MyCaseClass]. It works fine if I bring the JValue objects into the master and do the extraction there, but the same calls fail on the workers:

import org.json4s._
import org.json4s.jackson.JsonMethods._
import scala.util.{Try, Success, Failure}

val sqx = sqlContext

val data = sc.textFile(inpath).coalesce(2000)

case class PageView(
 client:  Option[String]
)

def extract(json: JValue) = {
  implicit def formats = org.json4s.DefaultFormats
  Try(json.extract[PageView]).toOption
}

val json = data.map(parse(_)).sample(false, 1e-6).cache()

// count initial inputs
val raw = json.count 


// count successful extractions locally -- same value as above
val loc = json.toLocalIterator.flatMap(extract).size

// distributed count -- always zero
val dist = json.flatMap(extract).count // always returns zero

// this throws  "org.json4s.package$MappingException: Parsed JSON values do not match with class constructor"
json.map(x => {implicit def formats = org.json4s.DefaultFormats; x.extract[PageView]}).count

The implicit for Formats is defined locally in the extract function since DefaultFormats is not serializable, and defining it at top level caused it to be serialized for transmission to the workers rather than constructed there. I think the problem still has something to do with the remote initialization of DefaultFormats, but I am not sure what it is.
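
A common alternative (a sketch of a standard Spark pattern, not something from the original post; the JsonSupport and extractView names are made up here) is to keep the Formats in a top-level object, so each executor JVM initializes it when the class is loaded instead of receiving it through closure serialization:

import org.json4s.{DefaultFormats, Formats, JValue}
import scala.util.Try

object JsonSupport {
  // Initialized on each JVM the first time the object is referenced,
  // so nothing non-serializable travels with the task closure.
  implicit val formats: Formats = DefaultFormats
}

def extractView(json: JValue): Option[PageView] = {
  import JsonSupport._  // brings the implicit Formats into scope
  Try(json.extract[PageView]).toOption
}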

When I call the extract method directly, instead of going through my extract function, as in the last example, it no longer complains about serialization but just throws an error that the JSON does not match the expected structure.
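
As a sanity check (a debugging sketch, not part of the original post), the parsed values can be rendered back to strings on the executors and inspected on the driver, which confirms that the JSON itself reaches the workers intact:

import org.json4s.jackson.JsonMethods.{compact, render}

// render a few parsed values back to JSON strings on the executors
// and bring them to the driver for inspection
json.map(j => compact(render(j))).take(5).foreach(println)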

How can I get the extraction to work when distributed to the workers?


Edit

@WesleyMiao has reproduced the problem and found that it is specific to spark-shell. He reports that this code works as a standalone application.


Solution

  • I got the same exception as yours when running your code in spark-shell. However, when I turned your code into a real spark app and submitted it to a standalone spark cluster, I got the expected results with no exception.

    Below is the code I put in a simple spark app.

    val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))
    
    val json = data.map(parse(_))
    
    val dist = json.mapPartitions { jsons =>
      implicit val formats = org.json4s.DefaultFormats
      jsons.map(_.extract[PageView])
    }
    
    dist.collect() foreach println
    

    And when I run it using spark-submit, I got the following result.

    PageView(Some(Michael))                                                                                                                                       
    PageView(Some(Wesley))
    

    And I am also sure that it is not running in "local[*]" mode.

    Now I suspect the reason we got exceptions while running in spark-shell has something to do with how the case class PageView is defined in spark-shell and how spark-shell serializes / distributes it to the executors.
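
For completeness, the answer shows only the transformation itself; the surrounding scaffolding of such a standalone app might look like the sketch below (the object name, app name, master URL, and jar name are placeholders, not taken from the original answer). The point is that PageView is a top-level class compiled into the application jar rather than a class defined inside the REPL.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.json4s._
    import org.json4s.jackson.JsonMethods._

    // Top-level case class, compiled into the application jar.
    case class PageView(client: Option[String])

    object ExtractApp {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("json4s-extract"))

        val data = sc.parallelize(Seq("""{"client":"Michael"}""", """{"client":"Wesley"}"""))
        val json = data.map(parse(_))

        val views = json.mapPartitions { jsons =>
          implicit val formats = org.json4s.DefaultFormats
          jsons.map(_.extract[PageView])
        }

        views.collect() foreach println
        sc.stop()
      }
    }

It could then be submitted to a cluster with something along the lines of:

    spark-submit --class ExtractApp --master spark://master-host:7077 extract-app.jar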