I am trying to create a function in which I pass as main parameters:
- a DataFrame
- another function (an aggregate: count, countDistinct, max, etc.)
My goal is to return a DataFrame with a new column based on the function provided.
I am having trouble with typing, though. I have been searching around here and most of what I found points to UDFs, and the need to create it in order to apply it in "withColumn".
When I run something like this:
val DF1 = Seq(
("asd", "1", "search", "otpx"),
("asd", "1", "xpto", "otpx"),
("asd", "2", "xpto", "otpx"),
("asd", "3", "xpto", "otpx"),
("asd", "3", "search", "otpx"),
("asd", "4", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "1", "search", "otpx"),
("zxc", "2", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("zxc", "3", "xpto", "otpx"),
("qwe", "1", "xpto", "otpx"),
("qwe", "2", "xpto", "otpx"),
("qwe", "3", "xpto", "otpx"),
("qwe", "4", "xpto", "otpx"),
("qwe", "5", "xpto", "otpx")
).toDF("cid", "cts", "type", "subtype")
DF1.show(100)
val canList = List("cid", "cts")
def test[T](df: DataFrame, fn: Column => T, newColName: String, colToFn: String, partitionByColumns: List[String]): DataFrame = {
val window = Window.partitionBy(partitionByColumns.head, partitionByColumns.tail:_*)
val fun: (Column => T) = (arg: Column) => fn(arg) // or right away udfFun = udf(fn)
val udfFun = udf(fun)
val ret = df.withColumn(newColName, udfFun(df(colToFn)).over(window))
ret
}
val DF2 = test(DF1, countDistinct, "count_type", "type", canList)
DF2.orderBy(canList.head, canList.tail:_*).show(100)
I get errors like:
No TypeTag available for T
val udfFun = udf(fun)
What am I missing here?
Thanks in advance, cheers!