
I want a concat function for Spark SQL. I have written a UDF as:

sqlContext.udf.register("CONCAT", (args: String*) => {
  var out = ""
  for (arg <- args) {
    out += arg
  }
  out
})

sqlContext.sql("select col1,col2,CONCAT(col1,col2) from testtable")

but this UDF is not working and I am getting an exception. If I try it with a fixed number of parameters, it works. I am using Spark 1.3.1 and Scala 2.10.5.

Has anyone faced this issue or knows a solution for this?

  • Can you provide the exception in the question?

2 Answers


You can do this using the struct function, as follows:

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

val myUDF = udf {
  (r: Row) => r.toSeq.map(...) // the "r" row contains your arguments
}
val df = ....
df.select(col("col1"), myUDF(struct(col("col2"), col("col3"), col("col4"), ...)))
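Applied to the question's table, this could look like the following minimal sketch (the sample data, column names, and the mkString-based concatenation are only illustrative assumptions; struct is available from Spark 1.4 on):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.{col, struct, udf}

// The struct column arrives in the UDF as a single Row holding all wrapped values.
val concatUDF = udf { (r: Row) => r.toSeq.map(_.toString).mkString }

val testDF = sqlContext.createDataFrame(Seq(("a", "b"))).toDF("col1", "col2")
testDF.select(col("col1"), col("col2"),
  concatUDF(struct(col("col1"), col("col2"))).alias("concatenated")).show()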


If all you want is to concatenate columns using raw SQL, there is no need for a custom UDF at all; the CONCAT function is already there:

import sqlContext.implicits._

val df = sc.parallelize(List(("a", "b", "c"))).toDF("x", "y", "z")
df.registerTempTable("df")
sqlContext.sql("SELECT CONCAT(x, y, z) AS xyz FROM df").show

// +---+
// |xyz|
// +---+
// |abc|
// +---+

Since 1.5.0 you can use the concat / concat_ws functions directly:

import org.apache.spark.sql.functions.{concat, concat_ws}

df.select(concat_ws("-", $"x", $"y", $"z").alias("x-y-z")).show
// +-----+
// |x-y-z|
// +-----+
// |a-b-c|
// +-----+

df.select(concat($"x", $"y", $"z").alias("xyz")).show

// +---+
// |xyz|
// +---+
// |abc|
// +---+

See also Spark UDF with varargs
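For completeness, the varargs pattern described there wraps the columns in array so that a UDF taking a single Seq argument can accept any number of columns. A rough sketch, reusing the df defined above (the UDF name is made up for illustration; array is available from Spark 1.4 on, and the $ syntax assumes import sqlContext.implicits._ as above):

import org.apache.spark.sql.functions.{array, udf}

// A single Seq[String] parameter stands in for varargs; array() packs the columns into one array column.
val concatAll = udf { (xs: Seq[String]) => xs.mkString }

df.select(concatAll(array($"x", $"y", $"z")).alias("xyz")).show
// +---+
// |xyz|
// +---+
// |abc|
// +---+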
