22

I am trying to create a DataFrame from an RDD.

First, I create the RDD with the code below:

val account = sc.parallelize(Seq(
                                 (1, null, 2,"F"), 
                                 (2, 2, 4, "F"),
                                 (3, 3, 6, "N"),
                                 (4,null,8,"F")))

This works fine:

account: org.apache.spark.rdd.RDD[(Int, Any, Int, String)] = ParallelCollectionRDD[0] at parallelize at :27

But when I try to create a DataFrame from the RDD with the code below:

account.toDF("ACCT_ID", "M_CD", "C_CD","IND")

I get the following error:

java.lang.UnsupportedOperationException: Schema for type Any is not supported

The error occurs only when the Seq contains a null value.

Is there a way to include null values?

1 Comment

  • use (1, null: Integer, 2, "F"). Commented Sep 13, 2016 at 8:01
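
The type ascription suggested in this comment can be sketched in plain Scala, without Spark. The names `untyped` and `typed` are illustrative only; the explicit type annotations are there so the compiler confirms the element type of each sequence.

```scala
// Mixing a bare null with Int values in the same tuple position makes the
// compiler widen that position to Any, which Spark cannot map to a schema.
val untyped: Seq[(Int, Any, Int, String)] =
  Seq((1, null, 2, "F"), (2, 2, 4, "F"))

// Ascribing the value as java.lang.Integer (visible as Integer in Scala)
// keeps the position at a concrete, nullable type.
val typed: Seq[(Int, Integer, Int, String)] =
  Seq((1, null: Integer, 2, "F"), (2, 2: Integer, 4, "F"))
```

Only the second sequence has an element type that `toDF` should be able to turn into a schema.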

2 Answers

33

An alternative that avoids RDDs altogether: represent the nullable column with Option (None for null, Some(x) for a value).

import spark.implicits._

val df = spark.createDataFrame(Seq(
  (1, None,    2, "F"),
  (2, Some(2), 4, "F"),
  (3, Some(3), 6, "N"),
  (4, None,    8, "F")
)).toDF("ACCT_ID", "M_CD", "C_CD","IND")

df.show
+-------+----+----+---+
|ACCT_ID|M_CD|C_CD|IND|
+-------+----+----+---+
|      1|null|   2|  F|
|      2|   2|   4|  F|
|      3|   3|   6|  N|
|      4|null|   8|  F|
+-------+----+----+---+

df.printSchema
root
 |-- ACCT_ID: integer (nullable = false)
 |-- M_CD: integer (nullable = true)
 |-- C_CD: integer (nullable = false)
 |-- IND: string (nullable = true)

23

The problem is that Any is too general a type: Spark cannot derive a schema for it, so it has no way to serialize the column. You need to give the column a specific type, in this case Integer. Since null cannot be assigned to Scala's primitive types such as Int, use java.lang.Integer instead. Try this:

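// Typing the second column as java.lang.Integer lets it hold null.
// Integer.valueOf replaces new Integer(...), which is deprecated since Java 9.
val rdd = sc.parallelize(Seq(
  (1, null.asInstanceOf[Integer], 2, "F"),
  (2, Integer.valueOf(2), 4, "F"),
  (3, Integer.valueOf(3), 6, "N"),
  (4, null.asInstanceOf[Integer], 8, "F")))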

Here is the output:

rdd: org.apache.spark.rdd.RDD[(Int, Integer, Int, String)] = ParallelCollectionRDD[0] at parallelize at <console>:24

And the corresponding DataFrame:

scala> val df = rdd.toDF("ACCT_ID", "M_CD", "C_CD","IND")

df: org.apache.spark.sql.DataFrame = [ACCT_ID: int, M_CD: int ... 2 more fields]

scala> df.show
+-------+----+----+---+
|ACCT_ID|M_CD|C_CD|IND|
+-------+----+----+---+
|      1|null|   2|  F|
|      2|   2|   4|  F|
|      3|   3|   6|  N|
|      4|null|   8|  F|
+-------+----+----+---+

You can also declare the null Integer value once, as a named constant, which reads more cleanly:

object Constants {
  val NullInteger: java.lang.Integer = null
}
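
A minimal sketch of how such a constant might be used, assuming a local SparkSession and a spark-shell or Scala script context. The master setting and app name are assumptions made for the sake of a self-contained example; in spark-shell, `spark` already exists.

```scala
import org.apache.spark.sql.SparkSession

object Constants {
  // A null that is statically typed as java.lang.Integer.
  val NullInteger: java.lang.Integer = null
}

// Assumed local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("null-integer-sketch")
  .getOrCreate()
import spark.implicits._

// The second tuple position is java.lang.Integer in every row,
// so the element type is (Int, Integer, Int, String) rather than Any.
val rdd = spark.sparkContext.parallelize(Seq(
  (1, Constants.NullInteger, 2, "F"),
  (2, Integer.valueOf(2), 4, "F")))

val df = rdd.toDF("ACCT_ID", "M_CD", "C_CD", "IND")
```

The constant keeps the cast out of the data literals, which may be easier to read when many rows contain nulls.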

1 Comment

How should I go about this if I'm using a case class to create the DataFrame, i.e., spark.sparkContext.parallelize(Seq(A(_, _), A(_, _))).toDF() with case class A(_, _)? I tried the technique above, but null.asInstanceOf[T] gives me a NullPointerException, and null: T (as suggested in the comment on the question) gives me "an expression of type Null is ineligible for implicit conversion".
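
One possible approach for the case-class scenario, sketched under assumptions: declare the nullable field as Option[Int], mirroring the Option-based answer above. The case class `Account`, its field names, and the local session settings are hypothetical and not taken from the commenter's code.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class; an Option[Int] field becomes a nullable integer column.
case class Account(acctId: Int, mCd: Option[Int], cCd: Int, ind: String)

// Assumed local session for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("case-class-null-sketch")
  .getOrCreate()
import spark.implicits._

val df = spark.sparkContext.parallelize(Seq(
  Account(1, None, 2, "F"),      // None is stored as null
  Account(2, Some(2), 4, "F")    // Some(2) is stored as 2
)).toDF()
```

In a compiled application, as opposed to spark-shell, the case class generally needs to be defined at the top level or in an object rather than inside the method that calls `toDF()`.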
