I've got an RDD of text files which I want to parse. I achieve this by mapping a function over them which returns Either[String, Book], where Book is the structured type resulting from parsing and String is the text which could not be parsed. The result of this is an RDD[Either[String, Book]]. I would like to have an RDD[String] and an RDD[Book], because the former should be logged and discarded, and the latter should be processed further.
My splitter is:
    implicit class EitherRDDOps[L, R](rdd: RDD[Either[L, R]]) {
      def split(): (RDD[L], RDD[R]) = {
        // toSeq on Either provides a one-element Seq for Right and an empty Seq for Left,
        // so swap first to extract the Left values
        val left: RDD[L] = rdd.flatMap(_.swap.toSeq)
        val right: RDD[R] = rdd.flatMap(_.toSeq)
        (left, right)
      }
    }
The splitter is called as input.map(parseBook).cache.split, where input is an RDD[String] and parseBook is a (String) => Either[String, Book].
I get the following compilation errors:
    value toSeq is not a member of Product with Serializable with scala.util.Either
        val left: RDD[L] = rdd.flatMap(_.swap.toSeq)
        ^
    value toSeq is not a member of Either[L,R]
        val right: RDD[R] = rdd.flatMap(_.toSeq)
        ^
    type mismatch;
     found   : org.apache.spark.rdd.RDD[Nothing]
     required: org.apache.spark.rdd.RDD[L]
    Note: Nothing <: L, but class RDD is invariant in type T.
    You may wish to define T as +T instead. (SLS 4.5)
        (left, right)
        ^
     found   : org.apache.spark.rdd.RDD[Nothing]
     required: org.apache.spark.rdd.RDD[R]
    Note: Nothing <: R, but class RDD is invariant in type T.
    You may wish to define T as +T instead. (SLS 4.5)
        (left, right)
        ^
But the documentation clearly lists a toSeq method on Either. Any idea? Should I be going about this differently?
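It looks like you are compiling against an older version of Scala, probably 2.11.x. Either was reworked in Scala 2.12: it became right-biased and gained methods such as toSeq directly on the class. In older versions Either itself has no toSeq, only its left and right projections do: link to 2.11.8 documentation.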
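To illustrate the version difference, here is a minimal sketch on plain Scala collections rather than RDDs. It assumes only the standard library; the object name and the sample data are hypothetical. The projections returned by .left and .right define toSeq in 2.11 as well as in later versions (note that .right is deprecated from 2.13 on, where it compiles with a warning).

```scala
object EitherProjections {
  // Hypothetical sample data standing in for parse results.
  val parsed: List[Either[String, Int]] =
    List(Right(1), Left("bad line"), Right(2))

  // .left gives a LeftProjection: toSeq is one element for a Left, empty for a Right.
  val failures: List[String] = parsed.flatMap(_.left.toSeq)

  // .right gives a RightProjection: toSeq is one element for a Right, empty for a Left.
  val successes: List[Int] = parsed.flatMap(_.right.toSeq)
}
```

The same two flatMap calls should carry over to an RDD, since the function passed to flatMap only needs to return a collection.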
Try filtering on each side and extracting the value instead, which works on 2.11:
    val left = rdd.filter(_.isLeft).map(_.left.get)
    val right = rdd.filter(_.isRight).map(_.right.get)
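For completeness, here is a sketch of the whole splitter written so that it does not depend on Either.toSeq at all. It is one possible variant, not the only way to do it: it swaps filter plus get for RDD.collect with a partial function, which does the filtering and extraction in one step. The ClassTag context bounds and the wrapper object name EitherRDDSyntax are my additions; the bounds are there because RDD transformations that change the element type (map, flatMap, collect) take an implicit ClassTag, which a bare type parameter cannot supply. Spark is assumed to be on the classpath.

```scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical wrapper object, so the implicit class can be imported where needed.
object EitherRDDSyntax {
  // The context bounds supply the implicit ClassTags that RDD.collect requires
  // for its result element type.
  implicit class EitherRDDOps[L: ClassTag, R: ClassTag](rdd: RDD[Either[L, R]]) {
    def split(): (RDD[L], RDD[R]) = {
      // Keep only the Left values, unwrapped.
      val left: RDD[L] = rdd.collect { case Left(l) => l }
      // Keep only the Right values, unwrapped.
      val right: RDD[R] = rdd.collect { case Right(r) => r }
      (left, right)
    }
  }
}
```

With import EitherRDDSyntax._ in scope, the call from the question, input.map(parseBook).cache.split, should then compile. Caching before the split is worth keeping, because each of the two resulting RDDs traverses the parsed input separately.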