
I have a PySpark program with several UDF functions. To improve its performance, I decided to write the UDFs in Scala and use the resulting JAR file in PySpark. For one of the Scala UDFs, which converts a Persian date to a Gregorian date, I did the following steps.

First, I wrote the Scala function in IntelliJ, which is the following:

    import org.apache.spark.sql.api.java.UDF1
    import com.bahmanm.persianutils.DateConverter._

    class dateconverter extends UDF1[String, String] {

      override def call(ds: String): String = {
        val year = ds.substring(0, 4)
        val month = ds.substring(4, 6)
        val day = ds.substring(6, 8)
        val outputDate = s"$year/$month/$day"
        println(outputDate)
        val pDate4 = SimpleDate(outputDate) // initialising from a String
        println("Date : " + pDate4)
        persianToGregorian(pDate4).toString()
      }
    }

Moreover, I have build.sbt and plugins.sbt in the Scala project structure. build.sbt is the following:

    name := "UDFLib"

    version := "0.1"

    scalaVersion := "2.12.8"

    val root = project.in(file("."))
      .settings(Seq(libraryDependencies ++= Seq(
        "org.apache.spark" %% "spark-core" % "3.1.2" % Provided,
        "org.apache.spark" %% "spark-sql" % "3.1.2" % Provided,
        "com.bahmanm" %% "persianutils" % "4.0"
      )))

plugins.sbt has these contents:

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "2.1.1")
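
For reference, plugins.sbt goes under the project/ directory of the sbt build, so the layout looks roughly like this (the Scala source file name here is just an example):

    UDFLib/
    ├── build.sbt
    ├── project/
    │   └── plugins.sbt
    └── src/
        └── main/
            └── scala/
                └── dateconverter.scala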

After that, I run this command in the terminal:

 sbt clean assembly
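
With these settings, sbt-assembly should write the fat JAR to target/scala-2.12/UDFLib-assembly-0.1.jar (the exact path depends on the Scala version and on the name and version values in build.sbt).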

Then I copy the JAR file into the PySpark project folder and configure the Spark configuration like this:

    conf = SparkConf() \
        .setAppName(appname) \
        .setMaster(master) \
        .set("spark.jars", driver + ",./UDFLib-assembly-0.1.jar")  # both JARs, comma-separated

    from pyspark.sql.column import Column, _to_java_column, _to_seq
    from pyspark.sql.functions import col

    def datecovert(data):
        logFeaturesUDF = spark._jvm.dateconverter()
        return Column(logFeaturesUDF.apply(_to_seq(spark.sparkContext, [data], _to_java_column)))

    df.select(datecovert(col("persian_date"))).show()

When I run the PySpark program, I receive this error:

    An error occurred while calling o809.apply. Trace:
    py4j.Py4JException: Method apply([class scala.collection.convert.Wrappers$JListWrapper]) does not exist
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
        at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
        at py4j.Gateway.invoke(Gateway.java:274)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)

Would you please guide me on the correct way to create a Scala UDF JAR file for use in a PySpark program?

Any help is really appreciated.

1 Answer


Problem Solved.

I changed the Scala program like this:

    import org.apache.spark.sql.functions.udf
    import com.bahmanm.persianutils.DateConverter._

    object dateconverter extends Serializable {

      def convert(ds: String): SimpleDate = {
        val year = ds.substring(0, 4)
        val month = ds.substring(4, 6)
        val day = ds.substring(6, 8)
        val outputDate = s"$year/$month/$day"
        val pDate4 = SimpleDate(outputDate) // initialising from a String
        persianToGregorian(pDate4)
      }

      def call = udf { x: String => convert(x) }
    }

Then, in PySpark, I use this code:

    from pyspark.sql.column import Column, _to_java_column, _to_seq

    def datecovert(data):
        t1 = spark._jvm.dateconverter.call()
        return Column(t1.apply(_to_seq(spark, [data], _to_java_column)))

    df = df.withColumn("persian_date_milady", datecovert(df['persian_date']))
    df.select('persian_date', 'persian_date_milady').show()
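
This works because udf { ... } returns a Spark UserDefinedFunction, whose apply method accepts a sequence of Columns, which is exactly what _to_seq builds on the Python side; a plain UDF1 instance has no such apply method, which is why the original call failed.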

However, I want persian_date_milady to be a date field, not a struct field, so that I can insert it into Oracle. Currently the schema looks like this (a possible workaround is sketched after it):

 root
 |-- persian_date_milady: struct (nullable = true)
 |    |-- year: integer (nullable = false)
 |    |-- month: integer (nullable = false)
 |    |-- day: integer (nullable = false)
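
One way to get a real date column out of that struct on the PySpark side is sketched below; it assumes Spark 3.x, where the make_date SQL function is available, and reuses the datecovert wrapper from above:

    from pyspark.sql import functions as F

    df = df.withColumn("persian_date_milady", datecovert(df['persian_date']))

    # Build a DateType column from the struct's year/month/day fields
    df = df.withColumn(
        "persian_date_milady",
        F.expr("make_date(persian_date_milady.year, persian_date_milady.month, persian_date_milady.day)")
    )

    df.printSchema()  # persian_date_milady should now have type date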

If your function takes no arguments, you can do it this way:

    import org.apache.spark.sql.functions.udf
    import org.bson.types.ObjectId

    object CreateObjectId extends Serializable {
      def call = udf { () => ObjectId.get().toString }
    }

Then, after running sbt clean assembly and copying the JAR file into the PySpark project, I add this code:

    from pyspark.sql.column import Column, _to_java_column, _to_seq

    def createObjectId():
        t1 = spark._jvm.CreateObjectId.call()
        return Column(t1.apply(_to_seq(spark, [], _to_java_column)))

    df = df.withColumn('guid', createObjectId())
    df.select('guid').show(truncate=False)

By the way, build.sbt looks like this:

    name := "CreateObjectId"

    version := "0.1"

    scalaVersion := "2.12.8"

    val root = project.in(file("."))
      .settings(Seq(
        libraryDependencies ++= Seq(
          "org.apache.spark" %% "spark-core" % "3.1.2" % Provided,
          "org.apache.spark" %% "spark-sql" % "3.1.2" % Provided,
          "org.mongodb" % "bson" % "2.3"
        ),
        assembly / assemblyJarName := "createobjectid.jar"
      ))
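
As a side note, PySpark can also pick up a class that implements UDF1 (like the dateconverter class from the question) via spark.udf.registerJavaFunction, without wrapping the JVM object by hand. A minimal sketch, assuming the assembly JAR is on the classpath, the class lives in the default package, and to_gregorian is used as an arbitrary registration name:

    from pyspark.sql.functions import expr
    from pyspark.sql.types import StringType

    # Register the Scala UDF1 implementation under a name usable from SQL/expr
    spark.udf.registerJavaFunction("to_gregorian", "dateconverter", StringType())

    df.select(expr("to_gregorian(persian_date)")).show()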