apache-spark

What is stringWritableConverter used for?


In SparkContext.scala there are a couple of implicit conversions defined from basic types such as Int, Long, and String to their Writable counterparts.

Take String for example; there are two definitions, shown below:

  implicit def stringWritableConverter(): WritableConverter[String] =
    simpleWritableConverter[String, Text](_.toString)

And

  implicit val stringWritableConverterFn: () => WritableConverter[String] =
    () => simpleWritableConverter[String, Text](_.toString)

I would like to ask how this method and variable are used. They exist for implicit type conversion, but neither of them takes an input argument (stringWritableConverter has no parameters, and neither does stringWritableConverterFn).

How do these implicits get applied to produce a WritableConverter?

Since they take no arguments, I just can't see how or when these conversions are used.

Thanks.

UPDATE

To better illustrate the problem with code, I wrote the following simple program to mimic the Spark setup:

import scala.reflect.{ClassTag, classTag}

trait Writable

class IntWritable(value: Int) extends Writable

class Text(value: String) extends Writable

class WritableConverter[T](
    val writableClass: ClassTag[T] => Class[_ <: Writable],
    val convert: Writable => T)

object implicit_test {

  private def simpleWritableConverter[T, W <: Writable : ClassTag](convert: W => T)
  : WritableConverter[T] = {
    println("Hello, simpleWritableConverter")
    val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
    new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
  }

  implicit val stringWritableConverterFn: () => WritableConverter[String] = {
    println("Hello  stringWritableConverterFn")
    () => simpleWritableConverter[String, Text](_.toString)
  }

  implicit def stringWritableConverter(): WritableConverter[String] = {
    println("Hello  stringWritableConverter")
    simpleWritableConverter[String, Text](_.toString)
  }


  def do_convert(a: String) = println(s"a is $a")

  def main(args: Array[String]): Unit = {

    //Compile Error: Required String, Found Text
    do_convert(new Text("abc"))
  }

}

When I call do_convert(new Text("abc")), the compiler complains Required: String, Found: Text, which means the implicit conversion doesn't take effect.


Solution

  • I don't understand 100% of the story here, but I feel I might have some useful insights to offer. I'll be using the Spark 3.4.1 source code (and the corresponding Hadoop version, 3.3.4) for this explanation.

    The full story looks a bit more like this:

    /**
     * A class encapsulating how to convert some type `T` from `Writable`. It stores both the `Writable`
     * class corresponding to `T` (e.g. `IntWritable` for `Int`) and a function for doing the
     * conversion.
     * The getter for the writable class takes a `ClassTag[T]` in case this is a generic object
     * that doesn't know the type of `T` when it is created. This sounds strange but is necessary to
     * support converting subclasses of `Writable` to themselves (`writableWritableConverter()`).
     */
    private[spark] class WritableConverter[T](
        val writableClass: ClassTag[T] => Class[_ <: Writable],
        val convert: Writable => T)
      extends Serializable
    
    object WritableConverter {
    
      // Helper objects for converting common types to Writable
      private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
      : WritableConverter[T] = {
        val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
        new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
      }
    
      // The following implicit functions were in SparkContext before 1.3 and users had to
      // `import SparkContext._` to enable them. Now we move them here to make the compiler find
      // them automatically. However, we still keep the old functions in SparkContext for backward
      // compatibility and forward to the following functions directly.
    
      // The following implicit declarations have been added on top of the very similar ones
      // below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates eta
      // expansion of zero-arg methods and thus won't match a no-arg method where it expects
      // an implicit that is a function of no args.
    
      ...
    
      implicit val stringWritableConverterFn: () => WritableConverter[String] =
        () => simpleWritableConverter[String, Text](_.toString)
      
      ...
    
      // These implicits remain included for backwards-compatibility. They fulfill the
      // same role as those above.
    
      ...
    
      implicit def stringWritableConverter(): WritableConverter[String] =
        simpleWritableConverter[String, Text](_.toString)
    
      ...
    }
    
    

    What are these Writable objects?

    I'll quote a part of this great answer here:

    As we already know, data needs to be transmitted between different nodes in a distributed computing environment. This requires serialization and deserialization of data to convert the data that is in structured format to byte stream and vice-versa. Hadoop therefore uses simple and efficient serialization protocol to serialize data between map and reduce phase and these are called Writable(s). Some of the examples of writables as already mentioned before are IntWritable, LongWritable, BooleanWritable and FloatWritable.

    So we know we will be running into a bunch of these Writable objects when using Hadoop. In order to do our trusty calculations using primitive types, we need a way to go from a Writable back to our primitives.
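    For instance, assuming Hadoop's org.apache.hadoop.io classes are on the classpath, the wrap/unwrap round trip looks like this (a minimal sketch, not Spark code):

      import org.apache.hadoop.io.{IntWritable, Text}

      // Wrap primitives into Writables (the form Hadoop serializes between nodes)...
      val iw = new IntWritable(42)
      val tw = new Text("hello")

      // ...and unwrap them back into primitives on the other side.
      val n: Int = iw.get()        // 42
      val s: String = tw.toString  // "hello"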

    Why do we seem to have two of these stringWritableConverters?

    If you look closely at the code comments in the bit I pasted above, you'll see that stringWritableConverter and stringWritableConverterFn do exactly the same thing: stringWritableConverter is only there for backwards compatibility.

    So we can simplify our problem: only stringWritableConverterFn matters here.
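    It is worth spelling out how a zero-argument implicit can ever be "used" at all. This is my reading of the Scala 2.12 comment quoted above: these implicits are never applied as views on a value; they are implicit values that fill in an implicit parameter of type () => WritableConverter[T]. A minimal sketch, reusing the mimic classes from your update (summonConverter is a hypothetical consumer, not Spark code):

      // A hypothetical consumer that asks for a converter factory implicitly.
      def summonConverter[T](implicit fn: () => WritableConverter[T]): WritableConverter[T] =
        fn()

      // With stringWritableConverterFn in scope, the compiler fills `fn` with that val:
      val conv: WritableConverter[String] = summonConverter[String]

      // Note: the zero-arg method stringWritableConverter() alone would NOT satisfy the
      // () => WritableConverter[String] parameter on Scala 2.12+, because zero-arg methods
      // are no longer eta-expanded into function values during implicit search.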

    What happens with this stringWritableConverterFn?

    In Scala, implicits are resolved at compile time. So the compiler wires in the following call:

    simpleWritableConverter[String, Text](_.toString)
    

    Pay close attention to the types here [String, Text]. String is the primitive type, Text is the Writable class org.apache.hadoop.io.Text.

    Let's have a look at the simpleWritableConverter function in isolation:

      private[spark] def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
      : WritableConverter[T] = {
        val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
        new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
      }
    

    So, a new WritableConverter object is created that stores two things:

    • the Writable class
    • the convert function, whose signature is W => T; here, that is the toString method of Text in Hadoop (see the sketch below)
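    To make that concrete, here is roughly what the converter built for String/Text does, using the mimic classes from the question's update (note: for this to yield "abc", the mimic Text would need to override toString to return its value, as Hadoop's real Text does):

      // Build the converter exactly the way simpleWritableConverter[String, Text] does...
      val stringConverter: WritableConverter[String] =
        simpleWritableConverter[String, Text](_.toString)

      // ...then apply its stored convert function to a Writable.
      val s: String = stringConverter.convert(new Text("abc"))  // "abc"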

    Conclusion

    In Hadoop, Writable classes are used ubiquitously. If we want to be able to retrieve the primitive types they represent, we need to be able to convert those classes to their primitive types. That is what these WritableConverter objects are used for.

    In the end, we created this WritableConverter[String] object that contains a way to convert a Text Writable to a String primitive through that toString method!
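    This is also where the "no input argument" puzzle from your question resolves itself: Spark consumes these implicits as implicit parameters, not as conversions applied to a value. For example, in the Spark 3.4.x source, SparkContext.sequenceFile is declared roughly like this (body trimmed):

      def sequenceFile[K, V]
          (path: String, minPartitions: Int = defaultMinPartitions)
          (implicit km: ClassTag[K], vm: ClassTag[V],
           kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]): RDD[(K, V)] = {
        ...
      }

    So when you call sc.sequenceFile[String, String](path), the compiler fills in kcf and vcf with stringWritableConverterFn, and Spark later invokes those functions to build the converters it needs.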

    Answer to your update

    If you want to see this converter in action, you can look at a test example from Spark's test suite:

      test("BytesWritable implicit conversion is correct") {
        // Regression test for SPARK-3121
        val bytesWritable = new BytesWritable()
        val inputArray = (1 to 10).map(_.toByte).toArray
        bytesWritable.set(inputArray, 0, 10)
        bytesWritable.set(inputArray, 0, 5)
    
        val converter = WritableConverter.bytesWritableConverter()
        val byteArray = converter.convert(bytesWritable)
        assert(byteArray.length === 5)
    
        bytesWritable.set(inputArray, 0, 0)
        val byteArray2 = converter.convert(bytesWritable)
        assert(byteArray2.length === 0)
      }
    

    Here, you see that we create this converter object, which does the conversion for you through its convert method (from the WritableConverter[T] class at the start of this post).

    The difference in your example is that you never call the convert method of a WritableConverter. A WritableConverter[String] is not an implicit view from Text to String, so at do_convert(new Text("abc")) Scala can't find any conversion that turns a Text into a String.
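    If you want your mimic example to compile, summon the converter explicitly and call convert yourself. A minimal sketch of a working main (again assuming the mimic Text overrides toString to return its value):

      def main(args: Array[String]): Unit = {
        // Summon the implicit () => WritableConverter[String], call it to build the
        // converter, then perform the conversion by hand.
        val converter: WritableConverter[String] =
          implicitly[() => WritableConverter[String]].apply()
        do_convert(converter.convert(new Text("abc")))  // prints: a is abc
      }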