How can I access GeoMesa's UDFs in the Spark Scala DataFrame (not textual) API?

How can I make SQL UDFs that are available in the textual Spark SQL API also available in the Scala DataFrame DSL? That is, instead of this expression

spark.sql("select st_asText(st_bufferPoint(geom,10)) from chicago where case_number = 1")

something similar to

df.select(st_asText(st_bufferPoint('geom, 10))).filter('case_number === 1)

How can I register GeoMesa's UDFs so that they are available outside SQL text mode as well? SQLTypes.init(spark.sqlContext) from https://github.com/locationtech/geomesa/blob/f13d251f4d8ad68f4339b871a3283e43c39ad428/geomesa-spark/geomesa-spark-sql/src/main/scala/org/apache/spark/sql/SQLTypes.scala#L59-L66 only seems to register textual expressions.

I am already importing

import org.apache.spark.sql.functions._

so these functions

https://github.com/locationtech/geomesa/blob/828822dabccb6062118e36c58df8c3a7fa79b75b/geomesa-spark/geomesa-spark-sql/src/main/scala/org/apache/spark/sql/SQLSpatialFunctions.scala#L31-L41

should be available.

2 Answers

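Take a look at the callUDF function from org.apache.spark.sql.functions. It invokes a function by the name it was registered under, so it can be used from the DataFrame DSL: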

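import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{callUDF, lit}

val spark = SparkSession.builder()
  .appName("callUDF")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val df = spark.createDataset(List("abcde", "bcdef", "cdefg")).toDF("str")
df.createTempView("view")

// Textual SQL API
spark.sql("select length(substring(str, 2, 3)) from view").show()
// Same expression in the DataFrame DSL: callUDF looks each function up by name
df.select(callUDF("length", callUDF("substring", $"str", lit(2), lit(3)))).show()

spark.stop()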

Tested with Spark 2.1

You can use the udf function from the org.apache.spark.sql.functions package you're already importing, e.g.

val myUdf = udf((x: String) => doSomethingWithX(x))

You can then use myUdf in the DSL, as in df.select(myUdf($"field")).
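To make one function usable from both the SQL text API and the DSL, a minimal sketch could rely on spark.udf.register, which returns a UserDefinedFunction handle in addition to registering the name. Everything named here (BufferLabelExample, bufferLabel, buffer_label, the places view) is a hypothetical stand-in for illustration, not part of GeoMesa:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{callUDF, lit}

object BufferLabelExample {
  // Hypothetical plain Scala function standing in for a real spatial function.
  val bufferLabel: (String, Int) => String =
    (name, meters) => s"$name+${meters}m"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("udf-both-apis")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // register makes the name visible to SQL text and returns a handle for the DSL.
    val bufferLabelUdf = spark.udf.register("buffer_label", bufferLabel)

    val df = List("pier", "park").toDF("name")
    df.createOrReplaceTempView("places")

    // 1. Textual SQL API, using the registered name
    spark.sql("select buffer_label(name, 10) from places").show()
    // 2. DataFrame DSL, using the returned handle
    df.select(bufferLabelUdf($"name", lit(10))).show()
    // 3. DataFrame DSL, looking the function up by its registered name
    df.select(callUDF("buffer_label", $"name", lit(10))).show()

    spark.stop()
  }
}
```

If the registration happens inside a library, as with SQLTypes.init, the returned handle is not available to you, so the callUDF form (3) or wrapping the underlying Scala function with udf yourself are the options left.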

2 Comments

But SQLTypes.init(spark.sqlContext) is already registering the functions. Could that cause a problem when using your solution?
Yes, it does register them, but you lose the reference. I.e., if the line is val ST_DistanceSpheroid: (Geometry, Geometry) => jl.Double = nullableUDF((s, e) => fastDistance(s.getCoordinate, e.getCoordinate)), you need to use ST_DistanceSpheroid in the DSL.
