
I'm using mongo-connector version 10.0.1, and below is my config:

.config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/")
.config("spark.mongodb.write.database", "test")
.config("spark.mongodb.write.collection", "spark")
.config("spark.mongodb.write.operationType", "insert")

This is what I'm trying to do, and I'm not sure why this is happening:

spark_df.write.format("mongodb").mode("append").save()
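For context, a minimal version of my setup looks roughly like this (the app name, package coordinate, and sample data are placeholders, not my actual job):

from pyspark.sql import SparkSession

# Minimal sketch of the session; the _2.12 suffix should match the
# Scala version your Spark build uses.
spark = (
    SparkSession.builder
    .appName("mongo-write-test")
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.0.1")
    .config("spark.mongodb.write.connection.uri", "mongodb://127.0.0.1:27017/")
    .config("spark.mongodb.write.database", "test")
    .config("spark.mongodb.write.collection", "spark")
    .config("spark.mongodb.write.operationType", "insert")
    .getOrCreate()
)

# A trivial DataFrame writes fine with these settings against a local mongod.
spark_df = spark.createDataFrame([("a", 1), ("b", 2)], ["key", "value"])
spark_df.write.format("mongodb").mode("append").save()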

Below is the error I'm getting:

24/01/09 11:17:26 ERROR Utils: Aborting task
java.lang.NullPointerException
        at org.apache.spark.api.python.SerDeUtil$.$anonfun$toJavaArray$1(SerDeUtil.scala:72)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:419)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
24/01/09 11:17:26 ERROR DataWritingSparkTask: Aborting commit for partition 0 (task 13, attempt 0, stage 2.0)
24/01/09 11:17:26 WARN Utils: Suppressing exception in catch: Write aborted for: PartitionId: 0, TaskId: 13. Manual data clean up may be required.
com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 0, TaskId: 13. Manual data clean up may be required.
        at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:453)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1536)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
24/01/09 11:17:26 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 13)
java.lang.NullPointerException
        at org.apache.spark.api.python.SerDeUtil$.$anonfun$toJavaArray$1(SerDeUtil.scala:72)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:419)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
        Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 0, TaskId: 13. Manual data clean up may be required.
                at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121)
                at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:453)
                at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1536)
                ... 10 more
24/01/09 11:17:26 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 13) (host.docker.internal executor driver): java.lang.NullPointerException
        at org.apache.spark.api.python.SerDeUtil$.$anonfun$toJavaArray$1(SerDeUtil.scala:72)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:419)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
        Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 0, TaskId: 13. Manual data clean up may be required.
                at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121)
                at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:453)
                at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1536)
                ... 10 more

24/01/09 11:17:26 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job
24/01/09 11:17:26 ERROR AppendDataExec: Data source write support com.mongodb.spark.sql.connector.write.MongoBatchWrite@65dd8323 is aborting.
24/01/09 11:17:26 ERROR AppendDataExec: Data source write support com.mongodb.spark.sql.connector.write.MongoBatchWrite@65dd8323 failed to abort.
Traceback (most recent call last):
  File "C:\Users\delve\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 197, in _run_module_as_main
    return _run_code(code, main_globals, None,
  File "C:\Users\delve\AppData\Local\Programs\Python\Python39\lib\runpy.py", line 87, in _run_code
    exec(code, run_globals)
  File "C:\Users\delve\dev\spark_ao\pool\app\service\c_list\process\parser.py", line 1508, in <module>
    response = spark_df.write.format("mongodb").mode("append").save()
  File "C:\Users\delve\dev\spark_ao\.venv\lib\site-packages\pyspark\sql\readwriter.py", line 738, in save
    self._jwrite.save()
  File "C:\Users\delve\dev\spark_ao\.venv\lib\site-packages\py4j\java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\delve\dev\spark_ao\.venv\lib\site-packages\pyspark\sql\utils.py", line 111, in deco
    return f(*a, **kw)
  File "C:\Users\delve\dev\spark_ao\.venv\lib\site-packages\py4j\protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o85.save.
: org.apache.spark.SparkException: Writing job failed.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobFailedError(QueryExecutionErrors.scala:606)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:381)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:330)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.writeWithV2(WriteToDataSourceV2Exec.scala:236)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run(WriteToDataSourceV2Exec.scala:309)
        at org.apache.spark.sql.execution.datasources.v2.V2ExistingTableWriteExec.run$(WriteToDataSourceV2Exec.scala:308)
        at org.apache.spark.sql.execution.datasources.v2.AppendDataExec.run(WriteToDataSourceV2Exec.scala:236)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
        at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:97)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:97)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:93)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)  
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:93)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:80)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:115)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:311)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
        at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
        at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 13) (host.docker.internal executor driver): java.lang.NullPointerException
        at org.apache.spark.api.python.SerDeUtil$.$anonfun$toJavaArray$1(SerDeUtil.scala:72)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:419)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
        Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 0, TaskId: 13. Manual data clean up may be required.
                at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121)
                at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:453)
                at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1536)
                ... 10 more

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
        at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
        at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
        at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
        at scala.Option.foreach(Option.scala:407)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:938)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2214)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:354)
        ... 44 more
        Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: 678e70a2-e5a0-4a4d-ac1a-9f46173e8cd2. 0/1 tasks completed.
                at com.mongodb.spark.sql.connector.write.MongoBatchWrite.abort(MongoBatchWrite.java:101)
                at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:376)
                ... 44 more
Caused by: java.lang.NullPointerException
        at org.apache.spark.api.python.SerDeUtil$.$anonfun$toJavaArray$1(SerDeUtil.scala:72)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:419)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1525)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:457)
        at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:358)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        ... 1 more
        Suppressed: com.mongodb.spark.sql.connector.exceptions.DataException: Write aborted for: PartitionId: 0, TaskId: 13. Manual data clean up may be required.
                at com.mongodb.spark.sql.connector.write.MongoDataWriter.abort(MongoDataWriter.java:121)
                at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$6(WriteToDataSourceV2Exec.scala:453)
                at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1536)
                ... 10 more

SUCCESS: The process with PID 18700 (child process of PID 26460) has been terminated.
SUCCESS: The process with PID 26460 (child process of PID 1880) has been terminated.
SUCCESS: The process with PID 1880 (child process of PID 22132) has been terminated.
2024-01-09 11:17:26,865 INFO:Closing down clientserver connection

I have tried different connector versions and also made sure my Spark version is within the range the mongo-connector docs specify. I have looked into other posts and tried all the suggested solutions, but none have worked.

1 Answer


Solved the issue. I was doing a map on the RDD:

result = df.rdd.map(lambda x: parse(x))

and then creating a Spark DataFrame out of that result with a custom schema. The schema had a datatype issue for one of the fields: instead of using TimestampType for a date field, I was using StringType.
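A minimal sketch of the fix (the field names and the parse() body are placeholders for illustration, not my actual code):

from datetime import datetime

from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Placeholder parser: returns a tuple matching the schema below.
# In my real job, parse() produced a Python datetime for the date field.
def parse(row):
    return (row["name"], datetime.strptime(row["created"], "%Y-%m-%d %H:%M:%S"))

# Broken schema: the date field was declared StringType even though parse()
# emitted datetime objects, which is what failed during the write.
# schema = StructType([
#     StructField("name", StringType(), True),
#     StructField("created", StringType(), True),
# ])

# Fixed schema: declare the date field with a type that matches the parsed values.
schema = StructType([
    StructField("name", StringType(), True),
    StructField("created", TimestampType(), True),
])

# df is the source DataFrame from earlier in the job.
result = df.rdd.map(lambda x: parse(x))
spark_df = spark.createDataFrame(result, schema)
spark_df.write.format("mongodb").mode("append").save()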
