
I'm preprocessing a CSV file of 1 million rows, hoping to shrink it down to about 600,000 rows. However, whenever I apply a function to a column of the DataFrame, it raises an EOF error.

import os
import numpy as np
import ast
import re

import pyspark.sql.functions as f

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, ArrayType, StructField, StructType, FloatType

DATA_DIR = "./data/chronic-disease-data"

path = os.path.join(DATA_DIR, "U.S._Chronic_Disease_Indicators__CDI___2023_Release.csv")

spark = SparkSession.builder.appName('test')\
    .config("spark.executor.memory", "6g")\
    .getOrCreate()

test_spark_df_00_10 = spark.read.format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(path)

cols_to_drop = ["Response",
    "ResponseID",
    "DataValueFootnoteSymbol",
    "DatavalueFootnote",

    "StratificationCategory2",
    "Stratification2",
    "StratificationCategory3",
    "Stratification3",

    "StratificationCategoryID1",
    "StratificationID1",
    "StratificationCategoryID2",
    "StratificationID2",
    "StratificationCategoryID3",
    "StratificationID3"]
test_spark_df_00_10 = test_spark_df_00_10.drop(*cols_to_drop)
test_spark_df_00_10.show()

It shows the rows fine even after dropping the columns, which is expected:

+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|YearStart|YearEnd|LocationAbbr|        LocationDesc|DataSource| Topic|            Question|DataValueUnit|DataValueType|DataValue|DataValueAlt|LowConfidenceLimit|HighConfidenceLimit|StratificationCategory1|    Stratification1|         GeoLocation|LocationID|TopicID|QuestionID|DataValueTypeID|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
|     2014|   2014|          AR|            Arkansas| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      916|       916.0|              NULL|               NULL|                 Gender|               Male|POINT (-92.274490...|         5|    AST|    AST3_1|           NMBR|
|     2018|   2018|          CO|            Colorado| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     2227|      2227.0|              NULL|               NULL|                Overall|            Overall|POINT (-106.13361...|         8|    AST|    AST3_1|           NMBR|
|     2018|   2018|          DC|District of Columbia| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      708|       708.0|              NULL|               NULL|                Overall|            Overall|POINT (-77.036871...|        11|    AST|    AST3_1|           NMBR|
|     2017|   2017|          GA|             Georgia| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     3520|      3520.0|              NULL|               NULL|                 Gender|             Female|POINT (-83.627580...|        13|    AST|    AST3_1|           NMBR|
|     2010|   2010|          MI|            Michigan| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      123|       123.0|              NULL|               NULL|         Race/Ethnicity|           Hispanic|POINT (-84.714390...|        26|    AST|    AST3_1|           NMBR|
|     2015|   2015|          MT|             Montana| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     NULL|        NULL|              NULL|               NULL|         Race/Ethnicity|           Hispanic|POINT (-109.42442...|        30|    AST|    AST3_1|           NMBR|
|     2013|   2013|          OR|              Oregon| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      760|       760.0|              NULL|               NULL|                 Gender|               Male|POINT (-120.15503...|        41|    AST|    AST3_1|           NMBR|
|     2013|   2013|          PR|         Puerto Rico| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     NULL|        NULL|              NULL|               NULL|                Overall|            Overall|POINT (-66.590149...|        72|    AST|    AST3_1|           NMBR|
|     2017|   2017|          PR|         Puerto Rico| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     NULL|        NULL|              NULL|               NULL|                Overall|            Overall|POINT (-66.590149...|        72|    AST|    AST3_1|           NMBR|
|     2010|   2010|          WI|           Wisconsin| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|     1967|      1967.0|              NULL|               NULL|                 Gender|               Male|POINT (-89.816370...|        55|    AST|    AST3_1|           NMBR|
|     2016|   2016|          WI|           Wisconsin| SEDD; SID|Asthma|Hospitalizations ...|         NULL|       Number|      110|       110.0|              NULL|               NULL|         Race/Ethnicity|           Hispanic|POINT (-89.816370...|        55|    AST|    AST3_1|           NMBR|
|     2014|   2014|          AL|             Alabama|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       22|        22.0|              NULL|               NULL|                 Gender|               Male|POINT (-86.631860...|         1|    AST|    AST4_1|           NMBR|
|     2015|   2015|          ID|               Idaho|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       21|        21.0|              NULL|               NULL|                Overall|            Overall|POINT (-114.36373...|        16|    AST|    AST4_1|           NMBR|
|     2016|   2016|          ID|               Idaho|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       21|        21.0|              NULL|               NULL|                Overall|            Overall|POINT (-114.36373...|        16|    AST|    AST4_1|           NMBR|
|     2020|   2020|          IL|            Illinois|      NVSS|Asthma|Asthma mortality ...|       Number|       Number|       89|        89.0|              NULL|               NULL|                 Gender|               Male|POINT (-88.997710...|        17|    AST|    AST4_1|           NMBR|
|     2012|   2012|          KS|              Kansas|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       24|        24.0|              NULL|               NULL|         Race/Ethnicity|White, non-Hispanic|POINT (-98.200781...|        20|    AST|    AST4_1|           NMBR|
|     2015|   2015|          KS|              Kansas|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       29|        29.0|              NULL|               NULL|                Overall|            Overall|POINT (-98.200781...|        20|    AST|    AST4_1|           NMBR|
|     2018|   2018|          KS|              Kansas|      NVSS|Asthma|Asthma mortality ...|       Number|       Number|       29|        29.0|              NULL|               NULL|                Overall|            Overall|POINT (-98.200781...|        20|    AST|    AST4_1|           NMBR|
|     2017|   2017|          LA|           Louisiana|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       21|        21.0|              NULL|               NULL|                 Gender|               Male|POINT (-92.445680...|        22|    AST|    AST4_1|           NMBR|
|     2017|   2017|          MA|       Massachusetts|      NVSS|Asthma|Asthma mortality ...|         NULL|       Number|       28|        28.0|              NULL|               NULL|                 Gender|               Male|POINT (-72.082690...|        25|    AST|    AST4_1|           NMBR|
+---------+-------+------------+--------------------+----------+------+--------------------+-------------+-------------+---------+------------+------------------+-------------------+-----------------------+-------------------+--------------------+----------+-------+----------+---------------+
only showing top 20 rows

But when I get to this step:

test_udf = f.udf(lambda x: 1.0, FloatType())
test_spark_df_00_10.withColumn("Test", test_udf(f.col("YearStart"))).show()

Here is the traceback of the error:

----> 1 test_spark_df_00_10.withColumn("Test", test_udf(f.col("GeoLocation"))).select("Test").collect()

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\pyspark\sql\dataframe.py:1263, in DataFrame.collect(self)
   1243 """Returns all the records as a list of :class:`Row`.
   1244 
   1245 .. versionadded:: 1.3.0
   (...)   1260 [Row(age=14, name='Tom'), Row(age=23, name='Alice'), Row(age=16, name='Bob')]
   1261 """
   1262 with SCCallSiteSync(self._sc):
-> 1263     sock_info = self._jdf.collectToPython()
   1264 return list(_load_from_socket(sock_info, BatchedSerializer(CPickleSerializer())))

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\py4j\java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\pyspark\errors\exceptions\captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File c:\Users\LARRY\anaconda3\envs\tech-interview\Lib\site-packages\py4j\protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o552.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 68.0 failed 1 times, most recent failure: Lost task 3.0 in stage 68.0 (TID 309) (LAPTOP-3GL266K9.bbrouter executor driver): java.net.SocketException: Connection reset by peer
    at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
    at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
    at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
    at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
    at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
    at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
    at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
    at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
    at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2856)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2792)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2791)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2791)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1247)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1247)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3060)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2994)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2983)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:989)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2393)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2414)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2433)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2458)
    at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1049)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:410)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:1048)
    at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:448)
    at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:4149)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4323)
    at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
    at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4321)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4321)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:4146)
    at jdk.internal.reflect.GeneratedMethodAccessor169.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
    at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
    at java.base/java.lang.Thread.run(Thread.java:842)
Caused by: java.net.SocketException: Connection reset by peer
    at java.base/sun.nio.ch.NioSocketImpl.implWrite(NioSocketImpl.java:425)
    at java.base/sun.nio.ch.NioSocketImpl.write(NioSocketImpl.java:445)
    at java.base/sun.nio.ch.NioSocketImpl$2.write(NioSocketImpl.java:831)
    at java.base/java.net.Socket$SocketOutputStream.write(Socket.java:1035)
    at java.base/java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:81)
    at java.base/java.io.BufferedOutputStream.write(BufferedOutputStream.java:127)
    at java.base/java.io.DataOutputStream.write(DataOutputStream.java:112)
    at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
    at org.apache.spark.api.python.PythonRDD$.write$1(PythonRDD.scala:310)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1(PythonRDD.scala:322)
    at org.apache.spark.api.python.PythonRDD$.$anonfun$writeIteratorToStream$1$adapted(PythonRDD.scala:322)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
    at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:322)
    at org.apache.spark.sql.execution.python.BasePythonUDFRunner$PythonUDFWriterThread.writeIteratorToStream(PythonUDFRunner.scala:58)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.$anonfun$run$1(PythonRunner.scala:451)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1928)
    at org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:282)

Even a simple user-defined function like this always raises an EOF error. I don't know what could be causing it. Is it the null values in the column? What am I missing here? All I want is to be able to do the necessary transformations using these user-defined functions.

  • What are your Spark and Java versions? Can you provide the full error traceback? Commented Apr 22 at 10:17
  • edited it just now Commented Apr 22 at 10:22
  • Does a simple PySpark job work? It seems the problem is related to a misconfiguration. What are your Spark and Python versions? Commented Apr 22 at 19:16
  • Spark is 3.5.5 and the Python installed globally is 3.12.6 Commented Apr 23 at 3:14
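Following up on the misconfiguration suggestion: a mismatch between the Python interpreter the driver uses and the one the workers spawn is a known cause of these worker EOF / connection-reset failures. One thing worth trying is pinning both to the same interpreter before the session is built (a sketch; `sys.executable` stands in for whichever environment's Python actually runs the script):

```python
import os
import sys

# Pin workers and driver to the interpreter running this script, so Spark
# doesn't pick up a different (e.g. globally installed) Python for workers.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable

# ...then build the SparkSession as before, in this same process.
```

These environment variables must be set before `SparkSession.builder...getOrCreate()` is called, otherwise the already-started JVM keeps its original worker command.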
