java, apache-spark, udf

How to register a zero-parameter UDF with Spark from Java


I have a set of APIs through which users can define different UDFs. For example:

import scala.Function0;
// Holds a target field name and a zero-argument function producing its value.
class UDF0 {
	private String targetField;
	private Function0 function0;	// note: declared as a raw type

	public Function0 getFunction0() { return function0; }
	public void setFunction0(Function0 function0) { this.function0 = function0; }
}

import scala.runtime.AbstractFunction0;
// A concrete Function0 that always returns the constant "IA".
udf0.setFunction0(new AbstractFunction0<String>() {
	@Override
	public String apply() {
		return "IA";
	}
});

class UDF0Parser implements Parser<UDF0> {
	public void parse(UDF0 udf0) {
		String udfName = "udf0";
		// ??? is the third parameter: Spark's Scala API wants a TypeTag for the return type
		getSparkSession().udf().register(udfName, () -> udf0.getFunction0().apply(), ???);
		Column col = functions.callUDF(udfName);
		getDateSet().withColumn("newCol", col);	// withColumn returns a new Dataset
	}
}

  • Question 1

How can I get the Scala TypeTag for String (the third parameter, at the ??? position) in Java?
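Constructing a TypeTag by hand from Java is awkward. If the goal is simply to register a zero-argument UDF from Java, note that Spark 2.3+ also ships Java-friendly register overloads that take a DataType instead of a TypeTag. A minimal sketch, assuming Spark 2.3+ and a SparkSession named spark:

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.api.java.UDF0;
import org.apache.spark.sql.types.DataTypes;

// the (name, UDF0, returnType) overload avoids scala.Function0 and TypeTag entirely;
// the cast to UDF0<String> makes the lambda pick this overload
spark.udf().register("udf0", (UDF0<String>) () -> "IA", DataTypes.StringType);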

So I turned to writing UDF0Parser in Scala instead:

class UDF0Parser extends Parser[UDF0] {
	def parse(udf0: UDF0): Unit = {
		val udfName = "udf0"
		getSparkSession.udf.register(udfName, udf0.getFunction0)
		val col = functions.callUDF(udfName)
		getDateSet.withColumn("new", col)
	}
}

But I got a compile error:

Error:(14, 65) type mismatch;
 found   : Function0
 required: () => ?
    stepContext.getSparkSession.udf.register(udfName, transform.getFunction0);
                                                                ^

  • Question 2

Isn't () -> xxx just an instance of Function0? What should I do?
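The mismatch is most likely the raw type: the Java field is declared as plain Function0 with no type argument, so the Scala compiler cannot unify it with () => RT for any concrete RT. A sketch of one possible fix on the Java side, assuming UDF0 can be changed, is to parameterize the field so the Scala register call can infer RT = String:

import scala.Function0;

class UDF0 {
	private String targetField;
	private Function0<String> function0;	// parameterized instead of raw

	public Function0<String> getFunction0() { return function0; }
	public void setFunction0(Function0<String> function0) { this.function0 = function0; }
}

Alternatively, wrap the call on the Scala side so the lambda has an explicit return type, e.g. () => udf0.getFunction0.apply().asInstanceOf[String].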

Appreciate any help.


Solution

  • I found a solution myself: pass the whole Row to the UDF, so there is no need to write a separate UDF for each column or set of columns. See: How to pass whole Row to UDF - Spark DataFrame filter
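A minimal sketch of that whole-row approach in Java, assuming Spark 2.3+, a SparkSession named spark, and a Dataset<Row> named df whose columns colA and colB are hypothetical stand-ins:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import static org.apache.spark.sql.functions.*;

// the UDF receives the packed struct as a Row and can read any field by name
spark.udf().register("rowUdf",
		(UDF1<Row, String>) row -> "IA".equals(row.<String>getAs("colA")) ? "match" : "other",
		DataTypes.StringType);

// pack the relevant columns into a struct and pass it as the single UDF argument
Dataset<Row> result = df.withColumn("newCol", callUDF("rowUdf", struct(col("colA"), col("colB"))));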