
I am trying to apply a pandas_udf with two parameters, but I get an error. First I tried with one parameter and it works:

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import SparkSession
from pyspark.sql.types import *

spark = SparkSession \
        .builder \
        .config('spark.cores.max', 100) \
        .getOrCreate()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

This is what the data looks like:

+---+----+
| id|   v|
+---+----+
|  1| 1.0|
|  1| 2.0|
|  2| 3.0|
|  2| 5.0|
|  2|10.0|
+---+----+

My pandas_udf function is:

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def count_udf(v):
    cond = v <= 3
    res = v[cond].count()
    return res

df.groupby("id").agg(count_udf(df['v'])).show()

and the result is

+---+------------+
| id|count_udf(v)|
+---+------------+
|  1|         2.0|
|  2|         1.0|
+---+------------+
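To see what the grouped aggregation is computing, the same per-group count can be sketched in plain pandas (pandas is already a dependency of pandas_udf; the DataFrame below mirrors the sample data above):

```python
import pandas as pd

# Same sample data as the Spark DataFrame above
pdf = pd.DataFrame({"id": [1, 1, 2, 2, 2],
                    "v": [1.0, 2.0, 3.0, 5.0, 10.0]})

# Per group, count the values satisfying v <= 3 (what count_udf computes)
result = pdf.groupby("id")["v"].apply(lambda v: float(v[v <= 3].count()))
print(result.to_dict())  # {1: 2.0, 2: 1.0}
```

This matches the Spark output: two values at or below 3 for id 1, one for id 2.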

But when I try to pass two parameters to the pandas_udf function as follows, I get an error.

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def count_udf2(v, value):
    cond = v <= value
    res = v[cond].count()
    return res

df.groupby("id").agg(count_udf2(df['v'], 4)).show()

Error:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 3267, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-18-468499490a1f>", line 1, in <module>
    res = df.groupby("id").agg(count_udf(df['v'],4))
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/udf.py", line 189, in wrapper
    return self(*args)
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/udf.py", line 169, in __call__
    return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 65, in _to_seq
    cols = [converter(c) for c in cols]
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 65, in <listcomp>
    cols = [converter(c) for c in cols]
  File "/home/idswb/.local/lib/python3.6/site-packages/pyspark/sql/column.py", line 53, in _to_java_column
    "function.".format(col, type(col)))
TypeError: Invalid argument, not a string or column: 4 of type <class 'int'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.
  • Solved using the lit function: df.groupby("id").agg(count_udf(df['v'], lit(4))).show() Commented Dec 19, 2018 at 8:31
  • Doesn't explain how to define the OP's function. Commented Aug 25, 2021 at 8:01
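As the traceback says, every argument to a UDF must be a Column, so a bare Python int is rejected; lit(4) turns the constant into a column, and inside the UDF it then arrives as a pandas Series of 4s alongside the group's values. A plain-pandas sketch of what count_udf2 would then receive per group (the Series contents here are an illustration, not Spark output):

```python
import pandas as pd

# Sketch of the body of count_udf2 when called as
# count_udf2(df['v'], lit(4)): both arguments are pandas Series,
# so the comparison is elementwise.
def count_udf2_logic(v, value):
    cond = v <= value            # elementwise Series comparison
    return float(v[cond].count())

v = pd.Series([3.0, 5.0, 10.0])       # values of group id == 2
value = pd.Series([4.0, 4.0, 4.0])    # roughly what lit(4) looks like to the UDF
print(count_udf2_logic(v, value))  # 1.0
```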

1 Answer


You can define the pandas_udf function in the same scope as the calling function, so all local variables will be visible inside it.

Ex.:

def wrapper_count_udf():
    value = 4

    @pandas_udf("double", PandasUDFType.GROUPED_AGG)
    def count_udf(v):
        cond = v <= value
        res = v[cond].count()
        return res

    df.groupby("id").agg(count_udf(df['v'])).show()
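This works through ordinary Python closure capture: the inner function sees value from the enclosing scope, so the UDF itself still takes a single column argument. A minimal sketch of the capture alone, without Spark:

```python
import pandas as pd

def make_count_logic(value):
    # `value` is captured by the inner function's closure, mirroring how
    # count_udf sees `value` from wrapper_count_udf's scope.
    def count_logic(v):
        return float(v[v <= value].count())
    return count_logic

count_le_4 = make_count_logic(4)
print(count_le_4(pd.Series([3.0, 5.0, 10.0])))  # 1.0
```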

1 Comment

I don't understand why this was downvoted; I believe this is the solution to the question. I have had problems with pandas_udf on similar things when not doing it this way. You can also import any values in wrapper_count_udf so they can be seen within the pandas_udf; there is very little you can do otherwise when you need to use user-defined parameters in your pandas_udf.
