
I am trying to merge the header row into a single CSV output file (following the approach referenced by @Kang):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

object ListOfSavingFiltered {
  def merge(srcPath: String, dstPath: String): Unit = {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    // Note: FileUtil.copyMerge exists in Hadoop 2.x but was removed in Hadoop 3.x.
    // Passing "true" as the fifth argument would delete the source files once they
    // are merged into the new output; "false" keeps them.
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
  }

  def main(args: Array[String]): Unit = {

    val url = "jdbc:sqlserver://localhost;databaseName=InsightWarehouse;integratedSecurity=true";
    val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

    val v_Account = "dbo.v_Account"
    val v_Customer = "dbo.v_Customer"

    val spark = SparkSession
      .builder
      .master("local[*]")
      //.config("spark.debug.maxToStringFields", "100")
      .appName("Insight Application Big Data")
      .getOrCreate()


    val dfAccount = spark
      .read
      .format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", v_Account)
      .load()

    val dfCustomer = spark
      .read
      .format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", v_Customer)
      .load()

    val Classification = Seq("Contractual Account", "Non-Term Deposit", "Term Deposit")

    //dfAccount.printSchema()
    val joined = dfAccount.as("a")
      .join(dfCustomer.as("c"),
        Seq("BusinessDate", "CustomerID"), "LEFT")
      .filter(
        dfAccount.col("BusinessDate") === "2018-11-28"
          && dfAccount.col("Category") === "Deposit"
          // && dfAccount.col("IsActive").equalTo("Yes")
          && dfAccount.col("Classification").isin(Classification: _*)
         )

    //joined.show()
    val columnNames = Seq[String](
      "a.AcctBranchName",
      "c.CustomerNum",
      "c.SourceCustomerId",
      "a.SourceAccountId",
      "a.AccountNum",
      "c.FullName",
      "c.LastName",
      "c.BirthDate",
      "a.Balance",
      "a.InterestAccrued",
      "a.InterestRate",
      "a.SpreadRate",
      "a.Classification",
      "a.ProductType",
      "a.ProductDesc",
      "a.StartDate",
      "a.MaturityDate",
      "a.ClosedDate",
      "a.FixOrVar",
      "a.Term",
      "a.TermUnit",
      "a.MonthlyNetIncome",
      "a.Status_",
      "a.HoldsTotal",
      "a.AvailableFunds",
      "a.InterestRateIndex",
      "a.InterestRateVariance",
      "a.FeePlan",
      "c.CustEmplFullName",
      "a.IsActive",
      "c.Residence",
      "c.Village",
      "c.Province",
      "c.Commune",
      "c.District",
      "a.Currency",
      "c.TaxType",
      "c.TaxRate",
      "RollOverStatus"
    )

    val outputFile = "src/main/resources/out"
    val filename = "lifOfSaving.csv.gz"
    val outputFileName = outputFile + "/temp_" + filename
    val mergedFileName = outputFile + "/merged_" + filename
    val mergeFindGlob = outputFileName

    val responseWithSelectedColumns = joined.select(columnNames.map(c => col(c)): _*)
      .withColumn("RollOverStatus", when(col("RollOverStatus").equalTo("Y"), "Yes").otherwise("No"))


    //create a new DataFrame containing only the header names
    import scala.collection.JavaConverters._
    val headerDF = spark.createDataFrame(
      List(Row.fromSeq(responseWithSelectedColumns.columns.toSeq)).asJava,
      responseWithSelectedColumns.schema
    )


    //merge the header names with the data
    headerDF.union(responseWithSelectedColumns)
      // .coalesce(1) // so just a single part-file would be created
      .repartition(4)
      .write.mode("overwrite")
      .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
      .format("com.databricks.spark.csv")
      .option("charset", "UTF8")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") // avoid creating .crc files
      .option("header", "false") // the header row comes from headerDF above
      .save(outputFileName)
    merge(mergeFindGlob, mergedFileName)
    responseWithSelectedColumns.unpersist()

    spark.stop()
  }
}

The code seems correct, but I still get the following error:

Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Date
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:300)

Can anyone help?

Comments:

  • If I understood your problem well, you have a DataFrame with some data (one of whose fields is of type Date) and a Seq of column names, and you want to append those names as the header of your data - am I right? Commented Dec 5, 2018 at 14:21
  • I am trying like this, ref. stackoverflow.com/questions/38056152/… Commented Dec 5, 2018 at 14:33
  • The problem is that your header will contain only Strings, but your data has other data types to which a String cannot be cast. What happens when you union two DataFrames with different schemas? Commented Dec 5, 2018 at 14:53
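
That last comment pinpoints the root cause. Here is a minimal sketch of the failure (the schema is hypothetical, assuming one of the selected columns, such as BirthDate, is a DateType): building a header Row of plain Strings against the data's typed schema makes Spark's Date converter throw exactly this ClassCastException.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DateType, StringType, StructField, StructType}
import scala.collection.JavaConverters._

// assumes an active SparkSession named spark
// a schema with a date column, as in the real data (e.g. BirthDate)
val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("BirthDate", DateType)
))

// a header row made of Strings only - "BirthDate" here is a String, not a java.sql.Date
val headerRow = Row.fromSeq(schema.fieldNames.toSeq)

// fails with: java.lang.ClassCastException:
//   java.lang.String cannot be cast to java.sql.Date
spark.createDataFrame(List(headerRow).asJava, schema).show()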

1 Answer


You don't need to make your headers DataFrame match your data schema.

For example:

import org.apache.spark.sql.{SparkSession, functions => sqlfunctions}

val spark =
  SparkSession
  .builder
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val dataDF =
  List(
    (1, "Luis"),
    (2, "kn3l")
  ).toDF("id", "name").withColumn("date", sqlfunctions.current_date())

val headersDF = 
  List(
    ("id", "name", "date")
  ).toDF("id", "name", "date")

val union = headersDF.unionByName(dataDF)
// union: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [id: string, name: string, date: string]

union.printSchema()
// root
// |-- id: string (nullable = true)
// |-- name: string (nullable = true)
// |-- date: string (nullable = true)

union.show()
// +---+----+----------+
// | id|name|      date|
// +---+----+----------+
// | id|name|      date|
// |  1|Luis|2018-12-05|
// |  2|kn3l|2018-12-05|
// +---+----+----------+
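
To apply the same idea to the original code, let the union coerce every column to string, as above. Alternatively, if the goal is just one gzipped CSV with a header row, a simpler route avoids both the manual header union and the ClassCastException. This is a minimal sketch, assuming Spark 2.x's built-in CSV writer:

// let Spark emit the header row itself instead of unioning one in
responseWithSelectedColumns
  .coalesce(1) // one partition, so the output contains exactly one header row
  .write
  .mode("overwrite")
  .option("header", "true")
  .option("compression", "gzip")
  .csv(outputFileName)

With a single part file there is nothing left to copyMerge; if a fixed file name is required, rename the generated part-*.csv.gz afterwards.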

