I have this Spark DataFrame:

+--------------------+--------------------+------+
|                 qid|       question_text|target|
+--------------------+--------------------+------+
|403d7d49a9713e6f7caa|Do coconut trees ...|     0|
|edc9a2709785501cce09|What are 5 must-r...|     0|
|c912510d490de9e8c55c|How can I make my...|     0|
|bcd2395ccebea4d57604|Is reading biogra...|     0|
|d26dcd0879465c08ace5|Is the reason why...|     1|
|1c6398530de0b31e985a|Does John McCain ...|     1|
|69c68408690e66889e6a|Isn't the time th...|     1|
|d1a7d8a1da31041048a6|Why do people eat...|     0|
|54e96f880709e3cf9dd7|How do I get the ...|     0|
|f89c04a1c61487623ba9|What does the kno...|     0|
+--------------------+--------------------+------+

I am trying to apply this UDF:

def check_all_spelling_correct(sentence):
    # doc._.performed_spellCheck is set by the contextualSpellCheck
    # component: True when a spelling correction was performed
    doc = nlp(sentence)
    spell_check_performed = 0
    if doc._.performed_spellCheck:
        spell_check_performed = 1
    return spell_check_performed
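
Here nlp is a spaCy pipeline with the contextualSpellCheck extension added, which is what provides the doc._.performed_spellCheck attribute. A minimal sketch of that setup (the model name is an assumption; any installed spaCy model works):

import spacy
import contextualSpellCheck

# Assumes the en_core_web_sm model is installed
nlp = spacy.load("en_core_web_sm")
# Registers doc._.performed_spellCheck and the other extension attributes
contextualSpellCheck.add_to_pipe(nlp)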

Using:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

check_all_spelling_correct_UDF = udf(check_all_spelling_correct, IntegerType())
df_sub_2 = df_sub.withColumn("Is spelling correct", check_all_spelling_correct_UDF(col("question_text")))
df_sub_2.collect()

But I am getting this error:

Py4JJavaError: An error occurred while calling o235.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 100.0 failed 1 times, most recent failure: Lost task 0.0 in stage 100.0 (TID 1441) (DESKTOP-2TQTOQ2 executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
    ... 24 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2235)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2254)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2279)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:394)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:338)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:366)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:338)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3538)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3535)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Python worker failed to connect back.
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:188)
    at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:108)
    at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:121)
    at org.apache.spark.api.python.BasePythonRunner.compute(PythonRunner.scala:162)
    at org.apache.spark.sql.execution.python.BatchEvalPythonExec.evaluate(BatchEvalPythonExec.scala:81)
    at org.apache.spark.sql.execution.python.EvalPythonExec.$anonfun$doExecute$2(EvalPythonExec.scala:130)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: java.net.SocketTimeoutException: Accept timed out
    at java.net.DualStackPlainSocketImpl.waitForNewConnection(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:535)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:189)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:175)
    ... 24 more

1 Answer

df_sub_2.collect() brings the entire DataFrame back to the driver node. With large datasets this is a very costly operation and can cause stage failures.

Instead of df.collect(), use df.take(10) to see just the first ten rows of the output DataFrame.
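
For example, using the variable and column names from the question:

# Pulls only ten rows to the driver instead of the whole result
rows = df_sub_2.take(10)
for row in rows:
    print(row["qid"], row["Is spelling correct"])

# Or, to just eyeball the output without collecting it yourself:
df_sub_2.show(10)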

Or, if you really do want to use df.collect(), you can increase the spark.driver.maxResultSize property (default 1g) when building the session and then call df.collect():

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('StackOverflow') \
    .config('spark.driver.maxResultSize', '4g') \
    .getOrCreate()
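
You can confirm the setting took effect on the running session:

# Should print the value configured above (e.g. 4g)
print(spark.conf.get("spark.driver.maxResultSize"))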

1 Comment

Tried both. Same error. I am using a Dataframe with 10 rows.
