
I am using Spark 2.3 with Scala and trying to load multiple CSV files from a directory. The files load, but some of their columns are missing from the resulting DataFrame.

I have the following sample files:

test1.csv

Col1,Col2,Col3,Col4,Col5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5
aaa,2,3,4,5

test2.csv

Col1,Col2,Col3,Col4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4
aaa,2,3,4

test3.csv

Col1,Col2,Col3,Col4,Col6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6
aaa,2,3,4,6

test4.csv

Col1,Col2,Col5,Col4,Col3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3
aaa,2,5,4,3

What I want to do is load all these files into one DataFrame containing every column that appears across the 4 files. However, when I load them with the following code:

val dft = spark.read.format("csv").option("header", "true").load("path/to/directory/*.csv")

it loads the CSVs but drops some of the columns.

Here is the output of dft.show():

+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col6|
+----+----+----+----+----+
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   3|   4|   6|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   5|   4|   3|
| aaa|   2|   3|   4|   5|
| aaa|   2|   3|   4|   5|
+----+----+----+----+----+

I want it to look like this:

+----+----+----+----+----+----+
|Col1|Col2|Col3|Col4|Col5|Col6|
+----+----+----+----+----+----+
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
| aaa|   2|   3|   4|   5|   6|
+----+----+----+----+----+----+

Please guide me: what is wrong with my code? Or is there a more efficient way to do this?

Thanks

  • Something related to the same issue: stackoverflow.com/questions/48999381/…
  • Spark's CSV reader does not support missing columns. You have to find another way. Can you tell me how many files you have and how big they are? Also, what do you expect when the column is not there? (See the sketch right after these comments.)
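A minimal sketch of what that last comment refers to, assuming the sample files above sit in path/to/directory: reading the files one at a time shows that each file carries only its own header, so a single multi-file read has to settle on one schema.

// Each file reports only its own header columns (all strings when the schema is not inferred).
spark.read.option("header", "true").csv("path/to/directory/test1.csv").printSchema()  // Col1, Col2, Col3, Col4, Col5
spark.read.option("header", "true").csv("path/to/directory/test3.csv").printSchema()  // Col1, Col2, Col3, Col4, Col6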

2 Answers


I found a solution to the problem I was trying to solve, so I thought I should share it for anyone trying to achieve the same output.

I used Parquet's schema merging to combine the different files, which share only some common columns.

Here is the code:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .setAppName("Exercise")
  .setMaster("local")
val sc = new SparkContext(conf)

val spark = SparkSession
  .builder()
  .appName("Spark Sql Session")
  .config("spark.some.config.option", "test")
  .getOrCreate()

// Collect the paths of all input files in the directory.
val filepath = sc.wholeTextFiles("path/to/MergeFiles/*.txt").keys
val list = filepath.collect().toList

// Write each file into its own Parquet partition (key=1, key=2, ...).
var i = 1
list.foreach { path =>
  val df = spark.read
    .format("csv")
    .option("delimiter", ",")
    .option("header", "true")
    .load(path)
  df.write.parquet("data/test_tbl/key=" + i)
  i += 1
}

// Read every partition back, letting Parquet merge the individual schemas into one.
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_tbl")

mergedDF.write.format("csv").save("target/directory/for/mergedFiles")

And the following is the output of mergedDF.show():

+----+----+----+----+----+----+---+
|Col1|Col2|Col3|Col4|Col6|Col5|key|
+----+----+----+----+----+----+---+
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |6   |null|2  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |4  |
|aaa |2   |3   |4   |null|5   |3  |
|aaa |2   |3   |4   |null|5   |3  |
+----+----+----+----+----+----+---+
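
If the goal is the exact layout from the question (Col1 through Col6, in order, without the partition key), a small follow-up step on mergedDF gets there. This is only a sketch, and the output path below is a placeholder:

// Drop the synthetic partition column and fix the column order.
// Col5 and Col6 stay null for rows whose source file did not contain them.
val orderedDF = mergedDF
  .drop("key")
  .select("Col1", "Col2", "Col3", "Col4", "Col5", "Col6")

orderedDF.show()
orderedDF.write.format("csv").option("header", "true").save("target/directory/for/orderedFiles")  // placeholder path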


If each individual file is not too big, you could use wholeTextFiles and parse the files yourself as follows:

import org.apache.spark.sql.Row

val columns = (1 to 6).map("Col" + _)
val rdd = sc.wholeTextFiles("path_to_files/*")
    .map(_._2.split("\\n"))
    .flatMap(x => {
        // We consider the first line as the header
        val cols = x.head.split(",")
        // Then we flatten the remaining lines and shape each of them
        // as a list of tuples (ColumnName, content).
        x.tail
            .map(_.split(","))
            .map(row => row.indices.map(i => cols(i) -> row(i)))
    })
    .map(_.toMap)
    // Here we take the list of all the columns and map each of them to
    // its value if it exists, null otherwise.
    .map(map => columns.map(name => map.getOrElse(name, null)))
    .map(Row.fromSeq _)

This code puts each file into a single record using wholeTextFiles (this is why the files cannot be too big), uses the first line to determine which columns are present and in which order, creates a Map from column names to values, and converts it to a Row with nulls where values are missing. Then the data is ready to go into a DataFrame:

import org.apache.spark.sql.types.{StructType, StructField, StringType}

val schema = StructType(
    columns.map(name => StructField(name, StringType, true))
)
spark.createDataFrame(rdd, schema).show()
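
With the four sample files from the question, this shows 36 rows over Col1 through Col6, with null wherever a file did not provide a column. If the merged result should also be saved, the usual CSV writer works on the same DataFrame; this is just a sketch with a placeholder output path:

val resultDF = spark.createDataFrame(rdd, schema)
resultDF.write
  .format("csv")
  .option("header", "true")
  .save("path/to/merged/output")  // placeholder path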

Comments

  • Thanks for replying, but the first portion gives an error on "columns" in the line map(map => columns.map(name => map.getOrElse(name, null))), i.e. "cmd22.sc:1: not found: value columns". Can you please tell me which columns it is referring to?
  • Right, I should have defined columns earlier ;-) I edited the answer.
  • Thanks for the edit, it solves the issue for the first portion. Now it gives an error on StructField, "not found: value StructField". I then added an import statement for it and am getting this error: "not enough arguments for method apply: (name: String, dataType: org.apache.spark.sql.types.DataType, nullable: Boolean, metadata: org.apache.spark.sql.types.Metadata)org.apache.spark.sql.types.StructField in object StructField. Unspecified value parameter dataType. val schema = StructField("
  • Here is the import statement I added to the 2nd portion of the code: "import org.apache.spark.sql.types.StructField"
  • You could even put import org.apache.spark.sql.types._ or import org.apache.spark.sql.types.{ StructType, StructField, StringType} because you need all three. I realized that I had made another typo. I had used a StructField instead of a StructType in the schema declaration. I corrected the answer.
