I have a PySpark program with several UDFs. To improve performance, I decided to rewrite the UDFs in Scala and use the resulting JAR file from PySpark. For one of them, a UDF that converts Persian dates to Gregorian, I took the following steps.
First, I wrote the Scala class in IntelliJ:
import org.apache.spark.sql.api.java.UDF1
import com.bahmanm.persianutils.DateConverter._

// Converts a Persian date given as a zero-padded "yyyyMMdd" string
// to its Gregorian equivalent.
class dateconverter extends UDF1[String, String] {
  override def call(ds: String): String = {
    val year = ds.substring(0, 4)
    val month = ds.substring(4, 6)
    val day = ds.substring(6, 8)
    val outputDate = s"$year/$month/$day"
    println(outputDate)
    val pDate4 = SimpleDate(outputDate) // initialising from a "yyyy/MM/dd" string
    println("Date : " + pDate4)
    persianToGregorian(pDate4).toString
  }
}
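To make the expected input explicit: the substring calls assume a zero-padded yyyyMMdd string. In Python terms, the reshaping step is simply this (the sample value below is hypothetical):

ds = "14000101"  # hypothetical sample input
year, month, day = ds[0:4], ds[4:6], ds[6:8]
print(f"{year}/{month}/{day}")  # -> 1400/01/01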
The project also has a build.sbt and a project/plugins.sbt. build.sbt looks like this:
name := "UDFLib"
version := "0.1"
scalaVersion := "2.12.8"
val root = project.in(file("."))
.settings(Seq(libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "3.1.2" % Provided,
"org.apache.spark" %% "spark-sql" % "3.1.2" % Provided,
"com.bahmanm" %% "persianutils" % "4.0"
))
)
project/plugins.sbt contains:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
After that, I ran this command in the terminal:
sbt clean assembly
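If I understand sbt-assembly's defaults correctly, the fat JAR is written to target/scala-2.12/ using the name-assembly-version pattern, which can be checked quickly:

import os

# Assumption: sbt-assembly's default output path for Scala 2.12
jar_path = "target/scala-2.12/UDFLib-assembly-0.1.jar"
print(os.path.exists(jar_path))  # should print True after `sbt clean assembly`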
Then I copied the JAR file into the PySpark project folder and configured the program like this:
conf = SparkConf() \
    .setAppName(appname) \
    .setMaster(master) \
    .set("spark.jars", driver + ",./UDFLib-assembly-0.1.jar")  # comma-separated list; calling .set("spark.jars", ...) twice would overwrite the first value
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.functions import col

def datecovert(data):
    logFeaturesUDF = spark._jvm.dateconverter()
    return Column(logFeaturesUDF.apply(_to_seq(spark.sparkContext, [data], _to_java_column)))

df.select(datecovert(col("persian_date"))).show()
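For reference, the only alternative route I have found is registering the class as a Java UDF by name, roughly like this (a sketch based on my reading of the PySpark docs; I am not sure it is the intended approach for a plain UDF1):

from pyspark.sql.functions import expr
from pyspark.sql.types import StringType

# Register the UDF1 implementation by its class name, then call it by its SQL name.
# Assumes the class is on the classpath under the top-level name "dateconverter".
spark.udf.registerJavaFunction("dateconverter", "dateconverter", StringType())
df.select(expr("dateconverter(persian_date)")).show()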
When I run the PySpark program, I receive this error:
An error occurred while calling o809.apply. Trace:
py4j.Py4JException: Method apply([class scala.collection.convert.Wrappers$JListWrapper]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:274)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
Would you please guide me on the correct way to create a Scala UDF JAR file and use it in a PySpark program?
Any help is really appreciated.