
I am using Spark version 2.3 and working on a POC where I have to load a bunch of CSV files into a Spark DataFrame.

Consider the CSV below as a sample that I need to parse and load into a DataFrame. It has multiple bad records that need to be identified (the # annotations are for illustration only and are not part of the file).

id,name,age,loaded_date,sex
1,ABC,32,2019-09-11,M
2,,33,2019-09-11,M
3,XYZ,35,2019-08-11,M
4,PQR,32,2019-30-10,M   #invalid date
5,EFG,32,               #missing other column details
6,DEF,32,2019/09/11,M   #invalid date format
7,XYZ,32,2017-01-01,9   #last column has to be character only
8,KLM,XX,2017-01-01,F
9,ABC,3.2,2019-10-10,M  #decimal value for integer data type
10,ABC,32,2019-02-29,M  #invalid date

It would have been an easy task if I had to parse it using plain Python or pandas functions.

This is how I defined the schema for it:

from pyspark.sql.types import *
schema = StructType([
            StructField("id",            IntegerType(), True),
            StructField("name",          StringType(), True),
            StructField("age",           IntegerType(), True),
            StructField("loaded_date",   DateType(), True),
            StructField("sex",           StringType(), True),
            StructField("corrupt_record",StringType(), True)])



df=spark.read.format("com.databricks.spark.csv") \
.option("header", "true") \
.option("dateFormat", "yyyy-MM-dd") \
.option("nanValue","0") \
.option("nullValue"," ") \
.option("treatEmptyValuesAsNulls","false") \
.option("columnNameOfCorruptRecord", "corrupt_record") \
.schema(schema).load(file)

>>> df.show(truncate=False)
+----+----+----+-----------+----+----------------------+
|id  |name|age |loaded_date|sex |corrupt_record        |
+----+----+----+-----------+----+----------------------+
|1   |ABC |32  |2019-09-11 |M   |null                  |
|2   |null|33  |2019-09-11 |M   |null                  |
|3   |XYZ |35  |2019-08-11 |M   |null                  |
|4   |PQR |32  |2021-06-10 |M   |null                  |
|5   |EFG |32  |null       |null|5,EFG,32,             |
|null|null|null|null       |null|6,DEF,32,2019/09/11,M |
|7   |XYZ |32  |2017-01-01 |9   |null                  |
|null|null|null|null       |null|8,KLM,XX,2017-01-01,F |
|null|null|null|null       |null|9,ABC,3.2,2019-10-10,M|
|10  |ABC |32  |2019-03-01 |M   |null                  |
+----+----+----+-----------+----+----------------------+

The code above has parsed most records as expected but failed to catch the invalid dates; see records '4' and '10', which were silently converted to junk dates.
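From what I can tell, Spark 2.3's date parsing is backed by Java's SimpleDateFormat, which is lenient by default: out-of-range fields are rolled forward instead of rejected, so month 30 of 2019 becomes June 2021. A minimal sketch that reproduces the rollover (assuming to_date goes through the same lenient parser on Spark 2.x; demo data is hypothetical):

>>> from pyspark.sql import functions as F
>>> demo = spark.createDataFrame([("2019-30-10",)], ["raw"])
>>> demo.select(F.to_date(F.col("raw"), "yyyy-MM-dd").alias("parsed")).show()
+----------+
|    parsed|
+----------+
|2021-06-10|
+----------+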

I could load the dates as string type and create a UDF, or use a cast later in the code to check whether each date is valid. Is there any way to catch invalid dates up front, without a custom UDF or extra handling later on?

Also, I was looking for some way to handle record '7', which has a numeric value in the last column.

2 Answers


As requested by the OP, I am jotting down the answer here in PySpark.

First of all, just load the data without any prespecified schema, as @AndrzejS also did:

df = spark.read.option("header", "true").csv("data/yourdata.csv")
df.show()
+---+----+---+-----------+----+
| id|name|age|loaded_date| sex|
+---+----+---+-----------+----+
|  1| ABC| 32| 2019-09-11|   M|
|  2|null| 33| 2019-09-11|   M|
|  3| XYZ| 35| 2019-08-11|   M|
|  4| PQR| 32| 2019-30-10|   M|
|  5| EFG| 32|       null|null|
|  6| DEF| 32| 2019/09/11|   M|
|  7| XYZ| 32| 2017-01-01|   9|
|  8| KLM| XX| 2017-01-01|   F|
|  9| ABC|3.2| 2019-10-10|   M|
| 10| ABC| 32| 2019-02-29|   M|
+---+----+---+-----------+----+

Then, we need to determine which of the values do not fit the columns' types. For example, XX or 3.2 cannot be an age, so these values need to be marked as null. We test whether each value is an integer; similarly, we test whether loaded_date is indeed a date, and finally we check that sex is either F or M. Please refer to my previous post on these tests.

from pyspark.sql.functions import col, when

df = df.select('id', 'name',
               'age', (col('age').cast('int').isNotNull() & (col('age').cast('int') - col('age') == 0)).alias('ageInt'),
               'loaded_date', (col('loaded_date').cast('date').isNotNull()).alias('loaded_dateDate'),
               'sex'
              )
df.show()
+---+----+---+------+-----------+---------------+----+
| id|name|age|ageInt|loaded_date|loaded_dateDate| sex|
+---+----+---+------+-----------+---------------+----+
|  1| ABC| 32|  true| 2019-09-11|           true|   M|
|  2|null| 33|  true| 2019-09-11|           true|   M|
|  3| XYZ| 35|  true| 2019-08-11|           true|   M|
|  4| PQR| 32|  true| 2019-30-10|          false|   M|
|  5| EFG| 32|  true|       null|          false|null|
|  6| DEF| 32|  true| 2019/09/11|          false|   M|
|  7| XYZ| 32|  true| 2017-01-01|           true|   9|
|  8| KLM| XX| false| 2017-01-01|           true|   F|
|  9| ABC|3.2| false| 2019-10-10|           true|   M|
| 10| ABC| 32|  true| 2019-02-29|          false|   M|
+---+----+---+------+-----------+---------------+----+

Finally, using if/else, which in PySpark is when/otherwise, we mark the irrelevant values as null.

df = df.withColumn('age',when(col('ageInt')==True,col('age')).otherwise(None))\
       .withColumn('loaded_date',when(col('loaded_dateDate')==True,col('loaded_date')).otherwise(None))\
       .withColumn('sex',when(col('sex').isin('M','F'),col('sex')).otherwise(None))\
       .drop('ageInt','loaded_dateDate')
df.show()
+---+----+----+-----------+----+
| id|name| age|loaded_date| sex|
+---+----+----+-----------+----+
|  1| ABC|  32| 2019-09-11|   M|
|  2|null|  33| 2019-09-11|   M|
|  3| XYZ|  35| 2019-08-11|   M|
|  4| PQR|  32|       null|   M|
|  5| EFG|  32|       null|null|
|  6| DEF|  32|       null|   M|
|  7| XYZ|  32| 2017-01-01|null|
|  8| KLM|null| 2017-01-01|   F|
|  9| ABC|null| 2019-10-10|   M|
| 10| ABC|  32|       null|   M|
+---+----+----+-----------+----+
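
If you would rather flag the corrupt rows than just null out the bad fields (similar to the second answer below), here is a minimal PySpark sketch, assuming it is run before the ageInt/loaded_dateDate helper columns are dropped:

from pyspark.sql.functions import coalesce, col, lit

# A row is corrupt if any check failed; coalesce treats a null check
# result (e.g. sex is null, so isin returns null) as a failed check.
flagged = df.withColumn(
    'isCorrupted',
    ~coalesce(col('ageInt') & col('loaded_dateDate') & col('sex').isin('M', 'F'),
              lit(False)))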

You can try something like this, without UDFs:

import org.apache.spark.sql.functions._  // concat, etc.
import spark.implicits._                 // enables the 'colName symbol syntax

val data = spark.read.option("header", "true").csv("data/yourdata.csv")
val data2 = data.select('id,
  ('age.cast("double")
    .cast("int")
    .cast("string")
    .equalTo('age) && 'age.cast("int").isNotNull)
    .equalTo("true")
    .as("isINT"),
  'loaded_date.cast("date").isNotNull.as("isDATE"),
  ('sex.cast("int").isNotNull || 'sex.isNull).notEqual("true").as("isCHAR"))

data2.show()
+---+-----+------+------+
| id|isINT|isDATE|isCHAR|
+---+-----+------+------+
|  1| true|  true|  true|
|  2| true|  true|  true|
|  3| true|  true|  true|
|  4| true| false|  true|
|  5| true| false| false|
|  6| true| false|  true|
|  7| true|  true| false|
|  8|false|  true|  true|
|  9|false|  true|  true|
| 10| true| false|  true|
+---+-----+------+------+

val corrupted = data2.select('id,
  concat(data2.columns.map(data2(_)).drop(1):_*).contains("false").as("isCorrupted"))

corrupted.show()

+---+-----------+
| id|isCorrupted|
+---+-----------+
|  1|      false|
|  2|      false|
|  3|      false|
|  4|       true|
|  5|       true|
|  6|       true|
|  7|       true|
|  8|       true|
|  9|       true|
| 10|       true|
+---+-----------+

data.join(corrupted,"id").show()

+---+----+---+-----------+----+-----------+
| id|name|age|loaded_date| sex|isCorrupted|
+---+----+---+-----------+----+-----------+
|  1| ABC| 32| 2019-09-11|   M|      false|
|  2|null| 33| 2019-09-11|   M|      false|
|  3| XYZ| 35| 2019-08-11|   M|      false|
|  4| PQR| 32| 2019-30-10|   M|       true|
|  5| EFG| 32|       null|null|       true|
|  6| DEF| 32| 2019/09/11|   M|       true|
|  7| XYZ| 32| 2017-01-01|   9|       true|
|  8| KLM| XX| 2017-01-01|   F|       true|
|  9| ABC|3.2| 2019-10-10|   M|       true|
| 10| ABC| 32| 2019-02-29|   M|       true|
+---+----+---+-----------+----+-----------+
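
For what it's worth, the concat trick ports to PySpark almost verbatim. A sketch, assuming a data2 DataFrame with the same boolean check columns; the booleans are cast to string explicitly here, which the Scala version does implicitly:

from pyspark.sql.functions import col, concat

corrupted = data2.select(
    'id',
    # Concatenate every check column; any 'false' flags the row as corrupt.
    concat(*[col(c).cast('string') for c in data2.columns[1:]])
        .contains('false').alias('isCorrupted'))
data.join(corrupted, 'id').show()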

2 Comments

It's Scala, but the Python code will be very similar in this case.
Would you be able to convert the above to PySpark? Thanks
