I am trying to create a function to which I pass, as its main parameters:

  • a DataFrame
  • another function (an aggregate: count, countDistinct, max, etc.)

My goal is to return a DataFrame with a new column based on the function provided.

I am having trouble with the typing, though. I have been searching around here and most of what I found points to UDFs, and to the need to create one in order to apply it in "withColumn".

When I run something like this:

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._ // for .toDF; assumes a SparkSession named `spark`, as in spark-shell

val DF1 = Seq(
  ("asd", "1", "search", "otpx"),
  ("asd", "1", "xpto", "otpx"),
  ("asd", "2", "xpto", "otpx"),
  ("asd", "3", "xpto", "otpx"),
  ("asd", "3", "search", "otpx"),
  ("asd", "4", "search", "otpx"),

  ("zxc", "1", "search", "otpx"),
  ("zxc", "1", "search", "otpx"),
  ("zxc", "1", "search", "otpx"),
  ("zxc", "1", "search", "otpx"),
  ("zxc", "2", "xpto", "otpx"),
  ("zxc", "3", "xpto", "otpx"),
  ("zxc", "3", "xpto", "otpx"),
  ("zxc", "3", "xpto", "otpx"),

  ("qwe", "1", "xpto", "otpx"),
  ("qwe", "2", "xpto", "otpx"),
  ("qwe", "3", "xpto", "otpx"),
  ("qwe", "4", "xpto", "otpx"),
  ("qwe", "5", "xpto", "otpx")

).toDF("cid", "cts", "type", "subtype")

DF1.show(100)

val canList = List("cid", "cts")

def test[T](df: DataFrame, fn: Column => T, newColName: String, colToFn: String, partitionByColumns: List[String]): DataFrame = {

  val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)

  val fun: (Column => T) = (arg: Column) => fn(arg) // or right away udfFun = udf(fn)

  val udfFun = udf(fun)

  val ret = df.withColumn(newColName, udfFun(df(colToFn)).over(window))

  ret
}

val DF2 = test(DF1, countDistinct, "count_type", "type", canList)

DF2.orderBy(canList.head, canList.tail:_*).show(100)

I get errors like:

No TypeTag available for T

val udfFun = udf(fun)

What am I missing here?

Thanks in advance, cheers!

1 Answer

First, note that countDistinct is not supported over a Window. If you want a function that applies other aggregate functions (say count) over a window, you can define fn as a function that takes a Column and returns a Column. A UDF is not appropriate here: udf is meant for custom Scala functions that operate on row values, and it needs a TypeTag for the concrete argument and return types in order to derive the schema, which a generic T cannot provide (hence "No TypeTag available for T"). count, max, etc. are Spark SQL functions that already map a Column to a Column, so you can call them directly.

def test(df: DataFrame,
         fn: Column => Column,
         newColName: String,
         colToFn: String,
         partitionByColumns: List[String]
): DataFrame = {
  // window spec built from the partition columns
  val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail: _*)
  // apply the passed-in aggregate to the target column over that window
  val ret = df.withColumn(newColName, fn(col(colToFn)).over(window))
  ret
}

// calling the function
test(DF1, count, "count_type", "type", canList)
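
If what you actually need is the distinct count per window (what countDistinct would have given you), a common workaround is to collect the distinct values with collect_set over the window and take the size of the result. Below is a minimal sketch, assuming the DF1, canList, and imports from the question; the DF3 and distinct_type_count names are just illustrative. Note that it does not go through the test helper, because .over has to be applied to collect_set itself, with size wrapping the windowed column.

// Sketch of a distinct-count-per-window workaround (not from the original answer):
// collect_set gathers the distinct values of "type" within each (cid, cts) partition,
// and size then counts them.
val window = Window.partitionBy(canList.head, canList.tail: _*)
val DF3 = DF1.withColumn(
  "distinct_type_count",                       // illustrative column name
  size(collect_set(col("type")).over(window))  // .over applies to collect_set, size wraps the result
)
DF3.orderBy(canList.head, canList.tail: _*).show(100)

This gives, for each row, the number of distinct type values within its (cid, cts) partition, which is what countDistinct would compute if it could be used as a window function.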