I have a UDF written in Scala that I'd like to be able to call through a Pyspark session. The UDF takes two parameters: a string column value and a second string parameter. I've been able to call the UDF successfully when it takes only a single parameter (the column value), but I'm struggling to call it when multiple parameters are required. Here's what I've been able to do so far in Scala and then through Pyspark:
Scala UDF:
class SparkUDFTest() extends Serializable {
  def stringLength(columnValue: String, columnName: String): Int = {
    LOG.info("Column name is: " + columnName)
    columnValue.length
  }
}
When using this in Scala, I've been able to register and use this UDF:
Scala main class:
val udfInstance = new SparkUDFTest()
val stringLength = spark.sqlContext.udf.register("stringlength", udfInstance.stringLength _)
val newDF = df.withColumn("name", stringLength(col("email"), lit("email")))
The above works successfully. Here's the attempt through Pyspark:
from pyspark.sql.column import Column, _to_java_column, _to_seq

def testStringLength(colValue, colName):
    testpackage = "com.test.example.udf.SparkUDFTest"
    udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().stringLength().apply
    return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column), colName))
Call the UDF in Pyspark:
df.withColumn("email", testStringLength("email", lit("email")))
Doing the above and making some adjustments in Pyspark gives me the following errors:
py4j.Py4JException: Method getStringLength([]) does not exist
or
java.lang.ClassCastException: com.test.example.udf.SparkUDFTest$$anonfun$stringLength$1 cannot be cast to scala.Function1
or
TypeError: 'Column' object is not callable
I was able to modify the UDF to take just a single parameter (the column value), call it successfully, and get back a new DataFrame.
Scala UDF Class
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

class SparkUDFTest() extends Serializable {
  def testStringLength(): UserDefinedFunction = udf(stringLength _)

  def stringLength(columnValue: String): Int = {
    columnValue.length
  }
}
Updating Python code:
def testStringLength(colValue, colName):
    testpackage = "com.test.example.udf.SparkUDFTest"
    udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength().apply
    return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column)))
The above works successfully. I'm still struggling to call the UDF when it takes an extra parameter. How can the second parameter be passed to the UDF through Pyspark?
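For reference, this is the shape of call I've been aiming for. It's an untested sketch, and it assumes the Scala class also exposes a UserDefinedFunction wrapper around the two-argument stringLength (the same pattern as testStringLength above); I haven't been able to confirm this is the right approach:

# Untested sketch. Assumes the Scala class has something like
#   def testStringLength(): UserDefinedFunction = udf(stringLength _)
# where stringLength takes (columnValue: String, columnName: String).
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import lit

def testStringLength(colValue, colName):
    testpackage = "com.test.example.udf.SparkUDFTest"
    udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength().apply
    # Both arguments go into the same Seq of columns; the column name is
    # wrapped in lit() so it reaches the UDF as a literal column.
    return Column(udfInstance(_to_seq(sc, [colValue, lit(colName)], _to_java_column)))

newDF = df.withColumn("name", testStringLength("email", "email"))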