
Generic method which works with RDD and Seq


I would like to write a method which would accept RDD and Seq without having to duplicate my code.

def myMethod[F[_]](input: F[InputClass]): F[OutputClass] = {
    // do something here, e.g.
    input.map { i =>
       // transform each input into an OutputClass
    }
}

F could be either Seq or RDD, since both implement a map method.

For more specific methods like count or cache, can I make Seq do nothing for cache and use length for count?


Solution

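  • What you want is a type class. The compiler cannot assume that an arbitrary F[_] has a map method just because Seq and RDD each happen to define one, so you need something that provides those operations for each concrete F. If you only need map and flatMap, an existing abstraction such as the Cats Functor or Monad may look like the obvious choice, but RDD does not fit them well: RDD.map requires an implicit ClassTag for the result type, which Functor.map does not supply, and a Monad's flatMap takes a function returning an RDD, whereas RDD.flatMap expects one returning a local collection.

    If you need more methods, such as cache and count, you can implement your own type class: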

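    import scala.language.higherKinds
    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    
    // Type class describing the operations generic code needs from a collection-like F.
    trait DataCollection[F[_]] {
      // RDD.map needs a ClassTag for the result type, so the type class requires one as well.
      def map[A, B: ClassTag](col: F[A])(f: A => B): F[B]
      def cache[A](col: F[A]): F[A]
      def count[A](col: F[A]): Long
    }
    
    object DataCollection {
      // RDD instance: delegate directly to Spark.
      implicit val RddDataCollection: DataCollection[RDD] = new DataCollection[RDD] {
        override def map[A, B: ClassTag](rdd: RDD[A])(f: A => B): RDD[B] = rdd.map(f)
        override def cache[A](rdd: RDD[A]): RDD[A] = rdd.cache()
        override def count[A](rdd: RDD[A]): Long = rdd.count()
      }
    
      // Seq instance: cache is a no-op and count is the length.
      implicit val SeqDataCollection: DataCollection[Seq] = new DataCollection[Seq] {
        override def map[A, B: ClassTag](seq: Seq[A])(f: A => B): Seq[B] = seq.map(f)
        override def cache[A](seq: Seq[A]): Seq[A] = seq
        override def count[A](seq: Seq[A]): Long = seq.length
      }
    
      // Extension methods so generic code can write col.map(...).cache().count().
      // A context bound cannot be combined with an explicit implicit list, so the
      // ClassTag is requested alongside the type class instance.
      implicit class Ops[F[_], A](val col: F[A]) extends AnyVal {
        @inline
        def map[B](f: A => B)(implicit DC: DataCollection[F], ct: ClassTag[B]): F[B] = DC.map(col)(f)
    
        @inline
        def cache()(implicit DC: DataCollection[F]): F[A] = DC.cache(col)
    
        @inline
        def count()(implicit DC: DataCollection[F]): Long = DC.count(col)
      }
    }
    
    // Works for any F with a DataCollection instance, e.g. Seq or RDD.
    def myGenericMethod[F[_]: DataCollection, T: ClassTag](col: F[T]): Long = {
      import DataCollection.Ops
      col.map(x => x).cache().count()
    }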
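The RDD instance above needs Spark on the classpath. As a hedged, Spark-free sketch of the same pattern, the following keeps only the Seq instance and exercises it from generic code. The names DataCollectionDemo, countAfterMap and doubled are hypothetical, chosen only for illustration. It also shows one pitfall: DataCollection is invariant in F, so passing a List infers F = List and finds no instance; the argument has to be widened to Seq first.

```scala
import scala.language.higherKinds
import scala.reflect.ClassTag

object DataCollectionDemo {

  // Same shape as the DataCollection type class in the answer, minus the RDD instance.
  trait DataCollection[F[_]] {
    def map[A, B: ClassTag](col: F[A])(f: A => B): F[B]
    def cache[A](col: F[A]): F[A]
    def count[A](col: F[A]): Long
  }

  object DataCollection {
    implicit val SeqDataCollection: DataCollection[Seq] = new DataCollection[Seq] {
      override def map[A, B: ClassTag](seq: Seq[A])(f: A => B): Seq[B] = seq.map(f)
      override def cache[A](seq: Seq[A]): Seq[A] = seq // nothing to cache for an in-memory Seq
      override def count[A](seq: Seq[A]): Long = seq.length
    }

    implicit class Ops[F[_], A](val col: F[A]) extends AnyVal {
      def map[B](f: A => B)(implicit DC: DataCollection[F], ct: ClassTag[B]): F[B] = DC.map(col)(f)
      def cache()(implicit DC: DataCollection[F]): F[A] = DC.cache(col)
      def count()(implicit DC: DataCollection[F]): Long = DC.count(col)
    }
  }

  // Hypothetical generic function: map, cache, then count, written once for any F.
  def countAfterMap[F[_]: DataCollection, T, U: ClassTag](col: F[T])(f: T => U): Long = {
    import DataCollection.Ops
    col.map(f).cache().count()
  }

  // Hypothetical generic function returning F[Int], to show that F is preserved.
  def doubled[F[_]: DataCollection](col: F[Int]): F[Int] = {
    import DataCollection.Ops
    col.map(_ * 2)
  }

  def main(args: Array[String]): Unit = {
    println(countAfterMap(Seq(1, 2, 3))(_.toString)) // 3
    println(doubled(Seq(1, 2, 3)))                   // List(2, 4, 6)
    // doubled(List(4, 5)) would not compile: F = List has no DataCollection instance.
    println(doubled(List(4, 5): Seq[Int]))           // List(8, 10)
  }
}
```

To use this with Spark, the RDD instance from the answer can be added back to the DataCollection companion; the generic functions themselves do not change.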