2

I'm trying to use Pydequu on Jupyter Notebook when i try to use ConstraintSuggestionRunner and show this error:

Py4JJavaError: An error occurred while calling o70.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'

I'm using this setup for the test:

  • SDKMAN
  • sdk install java 8.0.292.hs-adpt
  • SPARK 3.0.0

I got this configs from awslabs/python-dequu on README.md file.

import os
from pyspark.sql import SparkSession, Row
import pydeequ

os.environ["SPARK_VERSION"] = "3.0.0"

The error it's from below code:

spark = (SparkSession
    .builder
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .config("org.apache.spark.sql.catalyst", "spark-catalyst_2.12-3.1.2-amzn-0.jar")
    .getOrCreate())

df = spark.sparkContext.parallelize([
            Row(a="foo", b=1, c=5),
            Row(a="bar", b=2, c=6),
            Row(a="baz", b=3, c=None)]).toDF()

Complete error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Input In [3], in <cell line: 3>()
      1 from pydeequ.suggestions import *
      3 suggestionResult = ConstraintSuggestionRunner(spark) \
      4              .onData(df) \
      5              .addConstraintRule(DEFAULT()) \
----> 6              .run()
      8 # Constraint Suggestions in JSON format
      9 print(suggestionResult)

File /opt/conda/lib/python3.10/site-packages/pydeequ/suggestions.py:81, in ConstraintSuggestionRunBuilder.run(self)
     74 def run(self):
     75     """
     76     A method that runs the desired ConstraintSuggestionRunBuilder functions on the data to obtain a constraint
     77             suggestion result. The result is then translated to python.
     78 
     79     :return: A constraint suggestion result
     80     """
---> 81     result = self._ConstraintSuggestionRunBuilder.run()
     83     jvmSuggestionResult = self._jvm.com.amazon.deequ.suggestions.ConstraintSuggestionResult
     84     result_json = json.loads(jvmSuggestionResult.getConstraintSuggestionsAsJson(result))

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /usr/local/spark/python/pyspark/sql/utils.py:190, in capture_sql_exception.<locals>.deco(*a, **kw)
    188 def deco(*a: Any, **kw: Any) -> Any:
    189     try:
--> 190         return f(*a, **kw)
    191     except Py4JJavaError as e:
    192         converted = convert_exception(e.java_exception)

File /usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o70.run.
: java.lang.NoSuchMethodError: 'org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction.toAggregateExpression(boolean)'
    at org.apache.spark.sql.DeequFunctions$.withAggregateFunction(DeequFunctions.scala:31)
    at org.apache.spark.sql.DeequFunctions$.stateful_approx_count_distinct(DeequFunctions.scala:60)
    at com.amazon.deequ.analyzers.ApproxCountDistinct.aggregationFunctions(ApproxCountDistinct.scala:52)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.$anonfun$runScanningAnalyzers$3(AnalysisRunner.scala:319)
    at scala.collection.immutable.List.flatMap(List.scala:366)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.liftedTree1$1(AnalysisRunner.scala:319)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.runScanningAnalyzers(AnalysisRunner.scala:318)
    at com.amazon.deequ.analyzers.runners.AnalysisRunner$.doAnalysisRun(AnalysisRunner.scala:167)
    at com.amazon.deequ.analyzers.runners.AnalysisRunBuilder.run(AnalysisRunBuilder.scala:110)
    at com.amazon.deequ.profiles.ColumnProfiler$.profile(ColumnProfiler.scala:141)
    at com.amazon.deequ.profiles.ColumnProfilerRunner.run(ColumnProfilerRunner.scala:72)
    at com.amazon.deequ.profiles.ColumnProfilerRunBuilder.run(ColumnProfilerRunBuilder.scala:185)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.profileAndSuggest(ConstraintSuggestionRunner.scala:203)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunner.run(ConstraintSuggestionRunner.scala:102)
    at com.amazon.deequ.suggestions.ConstraintSuggestionRunBuilder.run(ConstraintSuggestionRunBuilder.scala:226)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:833)

1 Answer 1

1

I had the same issue by using almost the same environment as you (meaning Jupyter notebook and Spark 3.1.1). I solved the problem by following the steps:

  1. Download the deequ-2.0.0-spark-3.1.jar from maven repository https://repo1.maven.org/maven2/com/amazon/deequ/deequ/2.0.0-spark-3.1/deequ-2.0.0-spark-3.1.jar

  2. Upload deequ-2.0.0-spark-3.1.jar into the a Jupyter folder /home/jovyan/work/java-libs/

  3. In the notebook add the following line:

    os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars file:///home/jovyan/work/java-libs/deequ-2.0.0-spark-3.1.jar pyspark-shell'

  4. Use the initialization code

    from pyspark.sql import SparkSession, Row
    

    import pydeequ

    spark=(SparkSession .builder .getOrCreate())

By following the above steps I could get rid off the error that you've mentioned. However, I bumped into a new one:

  py4j.Py4JException: Constructor com.amazon.deequ.suggestions.rules.CategoricalRangeRule([]) does not exist

that I've solved with a code like this:

suggestionResult = (
    ConstraintSuggestionRunner(spark)
    .onData(df)
    .addConstraintRule(CompleteIfCompleteRule())
    .addConstraintRule(NonNegativeNumbersRule())
    .addConstraintRule(RetainCompletenessRule())
    .addConstraintRule(RetainTypeRule())
    .addConstraintRule(UniqueIfApproximatelyUniqueRule())
    .run()
)

print(json.dumps(suggestionResult))
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.