I have a UDF written in Scala that I'd like to call from a PySpark session. The UDF takes two parameters: a string column value and a second string parameter. I've been able to call the UDF successfully when it takes only a single parameter (the column value), but I'm struggling to call it when multiple parameters are required. Here's what I've been able to do so far in Scala and then through PySpark:

Scala UDF:

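class SparkUDFTest() extends Serializable {
  // Logs the column name, then returns the length of the column value.
  // LOG is a logger defined elsewhere in the project.
  def stringLength(columnValue: String, columnName: String): Int = {
    LOG.info("Column name is: " + columnName)
    columnValue.length
  }
}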

When using this in Scala, I've been able to register and use this UDF:

Scala main class:

val udfInstance = new SparkUDFTest()
val stringLength = spark.sqlContext.udf.register("stringlength", udfInstance.stringLength _)
val newDF = df.withColumn("name", stringLength(col("email"), lit("email")))

The above works successfully. Here's the attempt through PySpark:

def testStringLength(colValue, colName):
  testpackage = "com.test.example.udf.SparkUDFTest"
  udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().stringLength().apply
  return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column), colName))

Calling the UDF in PySpark:

df.withColumn("email", testStringLength("email", lit("email")))

Doing the above, with various adjustments on the PySpark side, gives me one of the following errors:

py4j.Py4JException: Method getStringLength([]) does not exist
or
java.lang.ClassCastException: com.test.example.udf.SparkUDFTest$$anonfun$stringLength$1 cannot be cast to scala.Function1
or
TypeError: 'Column' object is not callable

I was able to modify the UDF to take just a single parameter (the column value), call it successfully, and get back a new DataFrame.

Scala UDF Class

class SparkUDFTest() extends Serializable {
  // Wraps stringLength as a Spark UserDefinedFunction so it can be applied to a Column.
  def testStringLength(): UserDefinedFunction = udf(stringLength _)
  // Single-parameter version: there is no column name to log here.
  def stringLength(columnValue: String): Int = columnValue.length
}

Updated Python code:

def testStringLength(colValue, colName):
  testpackage = "com.test.example.udf.SparkUDFTest"
  udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength().apply
  return Column(udfInstance(_to_seq(sc, [colValue], _to_java_column)))

The above works successfully. I'm still struggling to call the UDF when it takes an extra parameter. How can the second parameter be passed to the UDF through PySpark?

  • Why don't you implement the same UDF in PySpark? Then you won't have to go through such complexity. Commented Feb 12, 2018 at 10:12
  • @RameshMaharjan A couple of reasons. Performance: Python UDFs have considerable overhead due to the calls made to the Python interpreter. Also, I want to limit code duplication and not have the exact same function written in Python if it can be called through a PySpark session. Commented Feb 13, 2018 at 8:19
  • By not wanting code duplication, you are inviting performance issues: code serialization and deserialization, and code compilation and decompilation. Commented Feb 13, 2018 at 8:31
  • @RameshMaharjan Yes that's the plan but just wanted to see if calling the Scala UDF through Pyspark was possible without any complexities before heading in that direction. Commented Feb 14, 2018 at 20:29

1 Answer

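I was able to resolve this by using currying. First, I defined the UDF through a method that takes the second parameter and returns a one-argument UDF:

def testStringLength(columnName: String): UserDefinedFunction = udf((colValue: String) => stringLength(colValue, columnName))

Then I called the UDF from PySpark: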

udfInstance = sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(testpackage).newInstance().testStringLength("email").apply
df.withColumn("email", Column(udfInstance(_to_seq(sc, [col("email")], _to_java_column))))

This can be cleaned up a bit more but it's how I got it to work.

Edit: I went with currying because, even when I wrapped the second argument in 'lit' to pass it to the UDF as a String, I kept experiencing the "TypeError: 'Column' object is not callable" error. In Scala I did not experience this issue. I am not sure why this happens in PySpark; it may be due to some complication between the Python interpreter and the Scala code. It is still unclear, but currying works for me.
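
To illustrate the idea outside of Spark, here is a minimal plain-Python sketch of the same pattern: a factory fixes the second argument and hands back a function of the column value only (strictly speaking this is a closure rather than full currying). The names `string_length` and `make_string_length` are hypothetical stand-ins for the Scala `stringLength` and `testStringLength`; nothing here touches Spark or the JVM.

```python
import logging
from typing import Callable

LOG = logging.getLogger(__name__)


def string_length(column_value: str, column_name: str) -> int:
    """Two-argument function, mirroring the Scala stringLength."""
    LOG.info("Column name is: %s", column_name)
    return len(column_value)


def make_string_length(column_name: str) -> Callable[[str], int]:
    """Fix column_name up front and return a one-argument function."""
    def one_arg(column_value: str) -> int:
        # column_name is captured from the enclosing call
        return string_length(column_value, column_name)
    return one_arg


# The caller now only has to supply the column value.
email_length = make_string_length("email")
print(email_length("user@example.com"))
```

The Scala version does the same thing: by the time PySpark calls `.apply`, the returned UDF already holds the column name, so only the column itself has to cross the Py4J boundary.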
