
I've been working on a PySpark tool that filters a data frame based on a search and then sorts the results. The data frame is a compilation of over 1,400 CSVs. When I attempt to run the code, I get a very lengthy error message. It appears to boil down to a Java error for an unexpected EOF:

Py4JJavaError: An error occurred while calling o1331.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 202 in stage 56.0 failed 4 times, most recent failure: Lost task 202.3 in stage 56.0 (TID 7632, emr-master-f35-eels.sss.local, executor 31): com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
         Auto configuration enabled=true
         Autodetect column delimiter=false
         Autodetect quotes=false
         Column reordering enabled=true
         Delimiters for detection=null
         Empty value=
         Escape unquoted values=false
         Header extraction enabled=null
         Headers=null
         Ignore leading whitespaces=false
         Ignore leading whitespaces in quotes=false
         Ignore trailing whitespaces=false
         Ignore trailing whitespaces in quotes=false
         Input buffer size=128
         Input reading on separate thread=false
         Keep escape sequences=false
         Keep quotes=false
         Length of content displayed on error=-1
         Line separator detection enabled=false
         Maximum number of characters per column=-1
         Maximum number of columns=20480
         Normalize escaped line separators=true
         Null value=
         Number of records to read=all
         Processor=none
         Restricting data in exceptions=false
         RowProcessor error handler=null
         Selected fields=field selection: [2]
         Skip bits as whitespace=true
         Skip empty lines=true
         Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
         CsvFormat:
                 Comment character=\0
                 Field delimiter=,
                 Line separator (normalized)=\n
                 Line separator sequence=\n
                 Quote character="
                 Quote escape character="
                 Quote escape escape character=null
Internal state when error was thrown: line=737459, column=3, record=235, charIndex=457399297, headers=[attachment_md5_checksum, attachment_filename, attachment_text, attachment_urlsafe_base64_bytes, notice_id, title, solicitation_number, department_ind_agency, cgac, sub_tier, fpds_code, office, aac_code, posted_date, type, base_type, archive_type, archive_date, set_aside_code, set_aside, response_deadline, naice_code, classification_code, pop_street_address, pop_city, pop_state, pop_zip, pop_country, active, award_number, award_date, award_dollars, awardee, primary_contact_title, primary_contact_full_name, primary_contact_email, primary_contact_phone, primary_contact_fax, secondary_contact_title, secondary_contact_full_name, secondary_contact_email, secondary_contact_phone, secondary_contact_fax, organization_type, state, city, zip_code, country_code, additional_info_link, link, description]
         at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:369)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:595)
         at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:330)
         at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
         at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
         at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.IllegalStateException: Error reading from input
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:80)
         at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:192)
         at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:269)
         at com.univocity.parsers.common.input.NoopCharAppender.appendUntil(NoopCharAppender.java:170)
         at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:186)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:560)
         ... 18 more
Caused by: java.io.EOFException: Unexpected end of input stream
         at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
         at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
         at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
         at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
         at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
         at java.io.InputStreamReader.read(InputStreamReader.java:184)
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:78)
         ... 23 more

Driver stacktrace:
         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2043)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2031)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2030)
         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2030)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:967)
         at scala.Option.foreach(Option.scala:257)
         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:967)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2264)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2213)
         at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2202)
         at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
         at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:778)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
         at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
         at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1080)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
         at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
         at org.apache.spark.rdd.RDD.reduce(RDD.scala:1062)
         at org.apache.spark.rdd.RDD$$anonfun$takeOrdered$1.apply(RDD.scala:1484)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
         at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
         at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
         at org.apache.spark.rdd.RDD.takeOrdered(RDD.scala:1471)
         at org.apache.spark.sql.execution.TakeOrderedAndProjectExec.executeCollect(limit.scala:136)
         at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3267)
         at org.apache.spark.sql.Dataset$$anonfun$collectToPython$1.apply(Dataset.scala:3264)
         at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
         at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$executeQuery$1(SQLExecution.scala:83)
         at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1$$anonfun$apply$1.apply(SQLExecution.scala:94)
         at org.apache.spark.sql.execution.QueryExecutionMetrics$.withMetrics(QueryExecutionMetrics.scala:141)
         at org.apache.spark.sql.execution.SQLExecution$.org$apache$spark$sql$execution$SQLExecution$$withMetrics(SQLExecution.scala:178)
         at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:93)
         at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:200)
         at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:92)
         at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
         at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3264)
         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
         at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
         at py4j.Gateway.invoke(Gateway.java:282)
         at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
         at py4j.commands.CallCommand.execute(CallCommand.java:79)
         at py4j.GatewayConnection.run(GatewayConnection.java:238)
         at java.lang.Thread.run(Thread.java:748)
Caused by: com.univocity.parsers.common.TextParsingException: java.lang.IllegalStateException - Error reading from input
Parser Configuration: CsvParserSettings:
         Auto configuration enabled=true
         Autodetect column delimiter=false
         Autodetect quotes=false
         Column reordering enabled=true
         Delimiters for detection=null
         Empty value=
         Escape unquoted values=false
         Header extraction enabled=null
         Headers=null
         Ignore leading whitespaces=false
         Ignore leading whitespaces in quotes=false
         Ignore trailing whitespaces=false
         Ignore trailing whitespaces in quotes=false
         Input buffer size=128
         Input reading on separate thread=false
         Keep escape sequences=false
         Keep quotes=false
         Length of content displayed on error=-1
         Line separator detection enabled=false
         Maximum number of characters per column=-1
         Maximum number of columns=20480
         Normalize escaped line separators=true
         Null value=
         Number of records to read=all
         Processor=none
         Restricting data in exceptions=false
         RowProcessor error handler=null
         Selected fields=field selection: [2]
         Skip bits as whitespace=true
         Skip empty lines=true
         Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
         CsvFormat:
                  Comment character=\0
                 Field delimiter=,
                 Line separator (normalized)=\n
                 Line separator sequence=\n
                 Quote character="
                 Quote escape character="
                 Quote escape escape character=null
Internal state when error was thrown: line=737459, column=3, record=235, charIndex=457399297, headers=[attachment_md5_checksum, attachment_filename, attachment_text, attachment_urlsafe_base64_bytes, notice_id, title, solicitation_number, department_ind_agency, cgac, sub_tier, fpds_code, office, aac_code, posted_date, type, base_type, archive_type, archive_date, set_aside_code, set_aside, response_deadline, naice_code, classification_code, pop_street_address, pop_city, pop_state, pop_zip, pop_country, active, award_number, award_date, award_dollars, awardee, primary_contact_title, primary_contact_full_name, primary_contact_email, primary_contact_phone, primary_contact_fax, secondary_contact_title, secondary_contact_full_name, secondary_contact_email, secondary_contact_phone, secondary_contact_fax, organization_type, state, city, zip_code, country_code, additional_info_link, link, description]
         at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:369)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:595)
         at org.apache.spark.sql.execution.datasources.csv.UnivocityParser$$anon$1.next(UnivocityParser.scala:330)
         at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
         at scala.collection.TraversableOnce$FlattenOps$$anon$1.hasNext(TraversableOnce.scala:464)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:130)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
         at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
         at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
         at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:585)
         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:153)
         at scala.collection.Iterator$class.foreach(Iterator.scala:891)
         at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:148)
         at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:224)
         at org.apache.spark.api.python.PythonRunner$$anon$2.writeIteratorToStream(PythonRunner.scala:561)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:346)
         at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1990)
         at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:195)
Caused by: java.lang.IllegalStateException: Error reading from input
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:80)
         at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:192)
         at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:269)
         at com.univocity.parsers.common.input.NoopCharAppender.appendUntil(NoopCharAppender.java:170)
         at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:186)
         at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:560)
         ... 18 more
Caused by: java.io.EOFException: Unexpected end of input stream
         at org.apache.hadoop.io.compress.DecompressorStream.decompress(DecompressorStream.java:165)
         at org.apache.hadoop.io.compress.DecompressorStream.read(DecompressorStream.java:105)
         at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
         at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
         at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
         at java.io.InputStreamReader.read(InputStreamReader.java:184)
         at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:78)
         ... 23 more

(<class 'py4j.protocol.Py4JJavaError'>, Py4JJavaError('An error occurred while calling o1331.collectToPython.\n', JavaObject id=o1338), <traceback object at 0x7ff3f2ff9230>)

I went through the process of running this code on each csv individually and narrowed it down to 6 of them that are causing this error. Obviously I can remove those 6 files from the list and then the code can run just fine, but if there is a way to use code to diagnose and potentially repair these files I'd like to try that route first. Any suggestions/ideas for how I can go about this?
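
For reference, the per-file narrowing step was essentially a loop like the sketch below (csv_paths stands in for my full list of input paths; the filter/sort code itself is omitted and a simple count() is used just to force each file to be read end to end):

bad_files = []
for path in csv_paths:
    try:
        # Force a full read of this one file; a truncated bz2 archive still
        # fails here because the whole compressed stream must be consumed.
        spark.read.csv(path, header=True).count()
    except Exception as exc:
        bad_files.append(path)
        print(path, str(exc)[:200])

print(bad_files)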

EDIT

Per the suggestion below, I attempted to open the file with the following code and then print the contents:

with open('file.csv.bz2', 'r', encoding='ISO-8859-1') as f:
    lines = f.readlines()
    print(lines)

This can run without issue. However, I then tried to open it in pandas and got an EOFError.
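
Note that the plain open() call above never decompresses the archive; it just decodes the raw compressed bytes as ISO-8859-1, which is why it succeeds. A sketch that actually decompresses the stream and reports whether the bz2 archive is truncated (which is what the Hadoop decompressor's "Unexpected end of input stream" suggests):

import bz2

line_count = 0
try:
    # Decompress the whole stream explicitly; a truncated archive raises
    # EOFError here, matching the error pandas hit.
    with bz2.open('file.csv.bz2', 'rt', encoding='ISO-8859-1') as f:
        for line_count, _ in enumerate(f, start=1):
            pass
    print(f'OK: decompressed {line_count} lines')
except EOFError as exc:
    print(f'Truncated bz2 stream after {line_count} lines: {exc}')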

  • Things you can check on the six offending CSV files: read them in plain Python 3 as text files and print them (can this be done without error or truncation?); try to "repair" the lines of these files with line = line.encode("ascii", "ignore"). If this helps, there are more subtle methods that retain diacritics, etc. Commented Oct 2, 2021 at 12:00
  • @HadoopMarc Thank you for the suggestion. I've updated my OP with the results of that. Commented Oct 8, 2021 at 14:13
  • Please also check stackoverflow.com/questions/54958530/… and stackoverflow.com/questions/54523324/… Commented Oct 10, 2021 at 11:45

1 Answer


You can load the files while preserving all records:

  1. Load all records except these 6 files into a single data frame (this gives you the working schema).

  2. Load the 6 files with the schema from #1 in PERMISSIVE mode, preserving malformed rows in a corrupt-record column (see the sketch after this list).

  3. Optionally, rename the corrupt-record column. See Not able to retain the corrupted rows in pyspark using PERMISSIVE mode

  4. Now you can see what is malformed and decide whether to drop those rows, write them to a dead-letter queue, log them, or whatever else fits your pipeline.
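
A rough sketch of step 2 (good_df for the data frame from step 1 and bad_paths for the list of the six problem files are placeholder names):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Reuse the schema from step 1 and add a column to capture raw malformed rows.
schema = StructType(good_df.schema.fields + [StructField("_corrupt_record", StringType(), True)])

suspect_df = (
    spark.read
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .schema(schema)
    .csv(bad_paths, header=True)
)

# Step 4: inspect the rows that could not be parsed and decide what to do with them.
suspect_df.filter(F.col("_corrupt_record").isNotNull()).show(truncate=80)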
