I am trying to merge the header into a single CSV output file (ref. @Kang):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object ListOfSavingFiltered {

  def merge(srcPath: String, dstPath: String): Unit = {
    val hadoopConfig = new Configuration()
    val hdfs = FileSystem.get(hadoopConfig)
    // Passing "true" instead of "false" would delete the source files once they are merged into the new output.
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), false, hadoopConfig, null)
  }
  def main(args: Array[String]): Unit = {

    val url = "jdbc:sqlserver://localhost;databaseName=InsightWarehouse;integratedSecurity=true"
    val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    val v_Account = "dbo.v_Account"
    val v_Customer = "dbo.v_Customer"

    val spark = SparkSession
      .builder.master("local[*]")
      //.config("spark.debug.maxToStringFields", "100")
      .appName("Insight Application Big Data")
      .getOrCreate()
    val dfAccount = spark.read
      .format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", v_Account)
      .load()

    val dfCustomer = spark.read
      .format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("dbtable", v_Customer)
      .load()
    val Classification = Seq("Contractual Account", "Non-Term Deposit", "Term Deposit")

    //dfAccount.printSchema()
    val joined = dfAccount.as("a")
      .join(dfCustomer.as("c"), Seq("BusinessDate", "CustomerID"), "LEFT")
      .filter(
        dfAccount.col("BusinessDate") === "2018-11-28"
          && dfAccount.col("Category") === "Deposit"
          // && dfAccount.col("IsActive").equalTo("Yes")
          && dfAccount.col("Classification").isin(Classification: _*)
      )
    //joined.show()
    val columnNames = Seq[String](
      "a.AcctBranchName",
      "c.CustomerNum",
      "c.SourceCustomerId",
      "a.SourceAccountId",
      "a.AccountNum",
      "c.FullName",
      "c.LastName",
      "c.BirthDate",
      "a.Balance",
      "a.InterestAccrued",
      "a.InterestRate",
      "a.SpreadRate",
      "a.Classification",
      "a.ProductType",
      "a.ProductDesc",
      "a.StartDate",
      "a.MaturityDate",
      "a.ClosedDate",
      "a.FixOrVar",
      "a.Term",
      "a.TermUnit",
      "a.MonthlyNetIncome",
      "a.Status_",
      "a.HoldsTotal",
      "a.AvailableFunds",
      "a.InterestRateIndex",
      "a.InterestRateVariance",
      "a.FeePlan",
      "c.CustEmplFullName",
      "a.IsActive",
      "c.Residence",
      "c.Village",
      "c.Province",
      "c.Commune",
      "c.District",
      "a.Currency",
      "c.TaxType",
      "c.TaxRate",
      "RollOverStatus"
    )
    val outputfile = "src/main/resources/out"
    val filename = "lifOfSaving.csv.gz"
    val outputFileName = outputfile + "/temp_" + filename
    val mergedFileName = outputfile + "/merged_" + filename
    val mergeFindGlob = outputFileName

    val responseWithSelectedColumns = joined
      .select(columnNames.map(c => col(c)): _*)
      .withColumn("RollOverStatus", when(col("RollOverStatus").equalTo("Y"), "Yes").otherwise("No"))
    // Create a new data frame containing only the header names.
    import scala.collection.JavaConverters._
    val headerDF = spark.createDataFrame(
      List(Row.fromSeq(responseWithSelectedColumns.columns.toSeq)).asJava,
      responseWithSelectedColumns.schema)

    // Merge the header names with the data.
    headerDF.union(responseWithSelectedColumns)
      // .coalesce(1) // so that just a single part- file is created
      .repartition(4)
      .write.mode("overwrite")
      .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
      .format("com.databricks.spark.csv")
      .option("charset", "UTF8")
      .option("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false") // avoid creating the _SUCCESS marker file
      .option("header", "false") // the header row is already unioned in above
      .save(outputFileName)

    merge(mergeFindGlob, mergedFileName)

    responseWithSelectedColumns.unpersist()
    spark.stop()
  }
}
The code seems correct, but I still get the following error message:
Exception in thread "main" java.lang.ClassCastException: java.lang.String cannot be cast to java.sql.Date
at org.apache.spark.sql.catalyst.CatalystTypeConverters$DateConverter$.toCatalystImpl(CatalystTypeConverters.scala:300)
Can anyone please help?
A comment asked: you have a DataFrame with some data (one of whose fields is of type Date) and a Seq of column names, and you want to append those names as the header of your data - am I right? And you are doing a union with both sides having different schemas?
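If that comment is on the right track, the likely cause is that headerDF is built from a Row of plain Strings while responseWithSelectedColumns.schema still declares some columns (presumably BirthDate, StartDate, MaturityDate, ClosedDate coming back from SQL Server) as DateType, so Catalyst tries to convert the header String into java.sql.Date and throws. Here is a minimal sketch of one possible fix, assuming that diagnosis: cast every column to StringType before the union so both sides share one schema (asStrings is a name introduced only for this sketch):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.StringType
import scala.collection.JavaConverters._

// Cast every column to String so the data rows carry the same types as the header row.
val asStrings = responseWithSelectedColumns.columns.foldLeft(responseWithSelectedColumns) {
  (df, c) => df.withColumn(c, col(c).cast(StringType))
}

// Build the header row against the all-String schema instead of the original one.
val headerDF = spark.createDataFrame(
  List(Row.fromSeq(asStrings.columns.toSeq)).asJava,
  asStrings.schema)

// Both sides of the union now share one schema, so no String -> java.sql.Date conversion is attempted.
val withHeader = headerDF.union(asStrings)

Note also that after repartition(4) the header row can land in any of the part files, so the merged output is not guaranteed to start with it; if a single part file is acceptable, coalesce(1) together with .option("header", "true") would avoid the manual union (and this whole problem) entirely.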