I have an example where I want to create a DataFrame inside a UDF, something like the one below.

    import org.apache.spark.ml.classification.LogisticRegressionModel
    import org.apache.spark.ml.linalg.Vector
    import org.apache.spark.ml.feature.VectorAssembler

Data to DataFrame:

    val df = Seq(
      (1,1,34,23,34,56), (2,1,56,34,56,23), (3,0,34,23,23,78), (4,0,23,34,78,23), (5,1,56,23,23,12),
      (6,1,67,34,56,34), (7,0,23,23,23,56), (8,0,12,34,45,89), (9,1,12,34,12,34), (10,0,12,34,23,34)
    ).toDF("id","label","tag1","tag2","tag3","tag4")
    val assemblerDF = new VectorAssembler().setInputCols(Array("tag1", "tag2", "tag3","tag4")).setOutputCol("features")
    val data = assemblerDF.transform(df)
    val Array(train,test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val testData=test.toDF    

    val loadmodel=LogisticRegressionModel.load("/user/xu/savemodel")
    sc.broadcast(loadmodel)
    val assemblerFe = new VectorAssembler().setInputCols(Array("a", "b", "c","d")).setOutputCol("features")
    sc.broadcast(assemblerFe)

UDF

    def predict(predictSet: Vector): Double = {
      val set = Seq((1,2,3,4)).toDF("a","b","c","d")
      val predata = assemblerFe.transform(set)
      val result = loadmodel.transform(predata)
      result.rdd.take(1)(0)(3).toString.toDouble
    }

    spark.udf.register("predict", predict _)
    testData.registerTempTable("datatable")
    spark.sql("SELECT predict(features) FROM datatable").take(1)

I get an error like:

ERROR Executor: Exception in task 3.0 in stage 4.0 (TID 7) [Executor task launch worker for task 7]
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => double)

and

WARN TaskSetManager: Lost task 3.0 in stage 4.0 (TID 7, localhost, executor driver): org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (vector) => double)

Are DataFrames not supported inside a UDF? I am using Spark 2.3.0 and Scala 2.11. Thanks.

  • Why do you want to apply your model to the test data in a UDF? You can just apply the model to the test data in the main program. Commented Nov 30, 2018 at 4:48
  • It is based on the previous framework. Maybe a DataFrame can't be created in a UDF? Commented Nov 30, 2018 at 6:08

1 Answer

As mentioned in the comments, you don't need a UDF here to apply the trained model to the test data. You can apply the model to the test DataFrame in the main program itself, as below:

    // Build the input DataFrame and assemble the tag columns into a feature vector.
    val df = Seq(
      (1,1,34,23,34,56), (2,1,56,34,56,23), (3,0,34,23,23,78), (4,0,23,34,78,23), (5,1,56,23,23,12),
      (6,1,67,34,56,34), (7,0,23,23,23,56), (8,0,12,34,45,89), (9,1,12,34,12,34), (10,0,12,34,23,34)
    ).toDF("id","label","tag1","tag2","tag3","tag4")
    val assemblerDF = new VectorAssembler().setInputCols(Array("tag1", "tag2", "tag3","tag4")).setOutputCol("features")
    val data = assemblerDF.transform(df)
    val Array(train,test) = data.randomSplit(Array(0.6, 0.4), seed = 11L)
    val testData = test.toDF

    // Load the saved model and set up the assembler for the columns to score.
    val loadmodel = LogisticRegressionModel.load("/user/xu/savemodel")
    sc.broadcast(loadmodel)
    val assemblerFe = new VectorAssembler().setInputCols(Array("a", "b", "c","d")).setOutputCol("features")
    sc.broadcast(assemblerFe)

    // Apply the model on the driver side, outside any UDF.
    val set = Seq((1,2,3,4)).toDF("a","b","c","d")
    val predata = assemblerFe.transform(set)
    val result = loadmodel.transform(predata) // Applies the model to the predata DataFrame. You can apply the model to any DataFrame.

`result` is now a DataFrame. You can register it as a table and query the predicted label (the `prediction` column by default) and the features using SQL, or you can select the predicted label and other fields directly from the DataFrame.
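Both ways of reading the predictions can be sketched roughly as follows. This is a minimal, self-contained illustration rather than the original setup: the object name `PredictWithoutUdf`, the method `score`, and the view name `predictions` are made up for the example, the labels are written as doubles, and a small model is fitted in place because the saved model under `/user/xu/savemodel` is not available here.

```scala
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.{DataFrame, SparkSession}

object PredictWithoutUdf {
  // Hypothetical helper: assembles features, fits a stand-in model, and scores the data.
  def score(spark: SparkSession): DataFrame = {
    import spark.implicits._

    val df = Seq(
      (1, 1.0, 34, 23, 34, 56), (2, 1.0, 56, 34, 56, 23), (3, 0.0, 34, 23, 23, 78),
      (4, 0.0, 23, 34, 78, 23), (5, 1.0, 56, 23, 23, 12), (6, 1.0, 67, 34, 56, 34),
      (7, 0.0, 23, 23, 23, 56), (8, 0.0, 12, 34, 45, 89), (9, 1.0, 12, 34, 12, 34),
      (10, 0.0, 12, 34, 23, 34)
    ).toDF("id", "label", "tag1", "tag2", "tag3", "tag4")

    val assembler = new VectorAssembler()
      .setInputCols(Array("tag1", "tag2", "tag3", "tag4"))
      .setOutputCol("features")
    val data = assembler.transform(df)

    // Assumption: stands in for LogisticRegressionModel.load(...) from the question.
    val model = new LogisticRegression().setMaxIter(10).setRegParam(0.1).fit(data)

    // transform is called on the driver and scores every row of the DataFrame.
    model.transform(data)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("predict-without-udf")
      .getOrCreate()

    val result = score(spark)

    // Option 1: select the columns of interest with the DataFrame API.
    result.select("id", "features", "prediction").show(false)

    // Option 2: register a temporary view and query it with SQL.
    result.createOrReplaceTempView("predictions")
    spark.sql("SELECT id, prediction FROM predictions WHERE prediction = 1.0").show()

    spark.stop()
  }
}
```

In the setup from the question, the same two options would apply to `loadmodel.transform(testData)` instead of the stand-in model.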

Please note that a UDF is a Spark SQL feature for defining new Column-based functions that extend the vocabulary of Spark SQL's DSL for transforming Datasets. Its body runs on the executors against individual column values, so it cannot create or transform a DataFrame, and it cannot return a DataFrame as its return type. In general, UDFs are not advised unless they are necessary; refer to: https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql-udfs-blackbox.html
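If a row-wise function is genuinely required, its body has to work on plain column values only. The sketch below is one possible way to do that and is not part of the original answer: it re-implements the binary logistic formula from the model's `coefficients` and `intercept` instead of calling `transform`, and it assumes a binomial model with a decision threshold of 0.5. The names `RowWiseScoring`, `score`, and `predictUdf` are hypothetical.

```scala
import org.apache.spark.ml.classification.LogisticRegressionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

object RowWiseScoring {
  // Pure function on plain arrays: no DataFrame or SparkSession is touched here.
  // Assumption: binary logistic regression with a fixed 0.5 threshold.
  def score(coef: Array[Double], intercept: Double, features: Array[Double]): Double = {
    require(coef.length == features.length, "feature count must match coefficient count")
    val margin = coef.zip(features).map { case (w, x) => w * x }.sum + intercept
    val probability = 1.0 / (1.0 + math.exp(-margin))
    if (probability > 0.5) 1.0 else 0.0
  }

  // Extracts the parameters on the driver so the UDF closure captures only
  // an array and a double rather than the model object.
  def predictUdf(model: LogisticRegressionModel): UserDefinedFunction = {
    val coef = model.coefficients.toArray // throws for multinomial models
    val intercept = model.intercept
    udf((features: Vector) => score(coef, intercept, features.toArray))
  }
}
```

With the names from the question, this could be applied as `testData.withColumn("udfPrediction", RowWiseScoring.predictUdf(loadmodel)(testData("features")))`. Calling `loadmodel.transform(testData)` remains the simpler route and avoids the UDF entirely.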
