
I am having two small problems that are part of one bigger problem: I want to read in JSON data once a day and save it as Parquet for later data-related work, since working with Parquet is so much faster. Where I am stuck is that when reading that Parquet back, Spark either gets the schema from the schema file or just takes the schema from the first Parquet file and assumes it is the same for all files. But there are days when some columns have no data at all.

So let's say I have a JSON file whose data has the following schema:

root
 |-- Id: long (nullable = true)    
 |-- People: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- Name: string (nullable = true)
 |    |    |-- Amount: double (nullable = true)

And then I have another JSON file with no data in the "People" column, so its inferred schema is the following:

root
 |-- Id: long (nullable = true)    
 |-- People: array (nullable = true)
 |    |-- element: string (containsNull = true)

When I read them in together with read.json, Spark goes through all the files, infers the merged schema from them (more specifically from the first one), and just leaves the rows from the second file empty, but the schema is correct.
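For illustration, roughly what that looks like (the paths are made up, and spark is a Spark 2.x SparkSession):

// Reading both days' JSON together lets Spark merge the schemas, so the file
// with no People data just contributes empty/null rows. Paths are hypothetical.
val both = spark.read.json("data/2017/01/26/data.json", "data/2017/01/27/data.json")
both.printSchema()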

But when I read them separately and write to Parquet separately, I cannot read them back together, because the Parquet schemas don't match and I get an error.

My first idea was to read in the file with missing data and change its schema manually, casting the column types to match the first schema, but that manual conversion is error-prone and can get out of sync, and I don't even know how to cast this string type to an array or struct type.
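For example, I imagine building the target schema by hand and passing it to the reader would look something like this sketch (paths made up, spark being a SparkSession), which is exactly the kind of manual definition I'd like to avoid with 200+ nested fields:

import org.apache.spark.sql.types._

// Hand-built target schema matching the "good" files.
val peopleType = ArrayType(
  StructType(Seq(
    StructField("Name", StringType, nullable = true),
    StructField("Amount", DoubleType, nullable = true)
  )),
  containsNull = true
)

val targetSchema = StructType(Seq(
  StructField("Id", LongType, nullable = true),
  StructField("People", peopleType, nullable = true)
))

// Applying the schema at read time keeps the types stable even on days
// when the People column is completely empty.
val df = spark.read.schema(targetSchema).json("data/2017/01/27/data.json")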

Another problem is that when the "Amount" field happens to contain only whole numbers, Spark reads them in as longs rather than the doubles I need. But if I use:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{ArrayType, DoubleType}

val df2 = df.withColumn("People.Amount", col("People.Amount").cast(ArrayType(DoubleType, true)))

then it doesn't change the type of the original column; it just adds a new column literally named "People.Amount".
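Something like the following sketch might be closer to what I actually need (casting the whole column under its top-level name so it gets replaced rather than duplicated), but I haven't verified it against the full schema:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types._

// Target type for the whole People column.
val peopleType = ArrayType(
  StructType(Seq(
    StructField("Name", StringType, nullable = true),
    StructField("Amount", DoubleType, nullable = true)
  )),
  containsNull = true
)

// withColumn with the top-level name replaces the existing column,
// and long -> double should be a legal cast inside the nested struct.
val df2 = df.withColumn("People", col("People").cast(peopleType))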

1 Answer


I think you may be able to jigger something up with schema merging (see docs here). If the first parquet you have has the correct schema, could you then do something like this to apply that schema to the new parquets?

// Read the partitioned table
val mergedDF = spark.read.option("mergeSchema", "true").parquet("data/test_table")
mergedDF.printSchema()

Edit:

You say there are 200+ columns; do you know them all already? I see two ways forward, and there's probably a number of ways to accomplish this. One is that you define all the fields you can see in advance. What I've done in the past is create a JSON file with a single dummy record that has all the fields I want, typed exactly as I want them. Then you can always load that record at the same time as your "Monday" or "Tuesday" dataset and strip it out post-load. It probably isn't best practice, but this is how I've stumbled my way forward.
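Roughly, that trick looks like this sketch (the paths, the schema_record.json file and the Id = -1 marker are all made up here; spark is a Spark 2.x SparkSession):

import org.apache.spark.sql.functions.col

// schema_record.json is a hypothetical one-record file where every field is
// present and typed the way you want it (e.g. Amount written as 0.0, not 0),
// flagged with Id = -1 so it can be dropped after the load.
val df = spark.read.json(
  "s3://bucket/schema/schema_record.json",
  "s3://bucket/Type1/2017/01/27/00"
)
val withoutDummy = df.filter(col("Id") =!= -1L)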

The alternative is to stop trying to load/save the individual datasets in the correct schema and instead set the schema once you've loaded all the data. It doesn't sound like the path you want to go down, but at least then you won't have this specific problem.
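A minimal sketch of that approach, with made-up paths, would be to let read.json see everything at once and only then persist:

// Reading all the raw JSON in one pass lets Spark infer a single merged schema,
// which then gets baked into one consistent Parquet copy. Paths are hypothetical.
val all = spark.read.json("s3://bucket/Type1/*/*/*/*")
all.write.mode("overwrite").parquet("s3://bucket/parquet/Type1")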


8 Comments

I'm pretty sure I already tried it. Is there any difference between Spark 1.6 and 2.0? But anyway, say Monday's data has the first schema, Tuesday's data has the second schema (with no data in some fields) and Wednesday's data has the first schema again; then I can never be sure that the "first" Parquet has the "correct" schema. If I want to read Monday and Tuesday together, it does, but if I want Tuesday and Wednesday, then the first one does not have the correct schema and it wouldn't work. I will try this again quickly, but I think it didn't work and I'm probably missing something important.
Well, yeah, I tried again and got the results I expected. This does not work, because Parquet schema merging only works when you ADD columns; when some columns have a different type, it fails. I know that makes sense, but I don't know how to get around it, and I don't know how to tell Spark the exact schema I have, with 200+ attribute fields in a nested JSON structure. I got the following error: org.apache.spark.SparkException: Failed to merge incompatible data types ArrayType(StructType(StructField(Name,StringType,true), StructField(Amount,DoubleType,true)),true) and StringType
Well, our current flow works like this: our Go server records an event (one of three types), marshals the Go struct to a JSON string and sends it to the corresponding Firehose stream (one of three). That saves the files with the raw JSON data to S3 with a specific prefix of type and date, i.e. s3://bucket/Type1/2017/01/27/00. As our Go struct schemas change while we develop our system, the data will differ; in that case we would mostly add data columns. The problem is when the data itself is missing and Spark reads it in with the wrong type.
Right - and you could have part of your process be to move a 1-line file with all of the available columns you may have to that same location, then merge with the incoming data - this means that all of the data would load in the correct format, then you just drop that one dummy record.
Actually that seems reasonable. I will try this. Thank you