0

I have three files coming in,

## +---+----+----+---+
## |pk1|pk2|val1|val2|
## +---+----+----+---+
## |  1| aa|  ab|  ac|
## |  2| bb|  bc|  bd|
## +---+----+----+---+

## +---+----+----+---+
## |pk1|pk2|val1|val2|
## +---+----+----+---+
## |  1| aa|  ab|  ad|
## |  2| bb|  bb|  bd|
## +---+----+----+---+

## +---+----+----+---+
## |pk1|pk2|val1|val2|
## +---+----+----+---+
## |  1| aa|  ac|  ad|
## |  2| bb|  bc|  bd|
## +---+----+----+---+

I need to compare the first two files (which I'm reading as dataframe) and identify only the changes and then merge with the third file, so my output should be,

## +---+----+----+---+
## |pk1|pk2|val1|val2|
## +---+----+----+---+
## |  1| aa|  ac|  ad|
## |  2| bb|  bb|  bd|
## +---+----+----+---+

How to pick only the changed columns? and update another dataframe?

4
  • I think you need to be a bit more specific (there's ambiguity), but have you tried just join? You can join on any arbitrary condition (even !=). Commented Jun 20, 2017 at 22:52
  • May I know what ambiguity? I can join with the pks, but that will just return everything rite? I mean df1 join df2 on df1.pk1=df2.pk1 and df1.pk2=df2.pk2? This is the way I should join, which is fine but to get the only modified columns? For example, when I join the first 2, I should get only pk1->1, pk2->aa, val2 > ad and pk1->2, pk2->bb,val1->bb Commented Jun 20, 2017 at 23:21
  • 1
    your first dataframe in column val1 second row has bc and then the second dataframe on the same column and same row has bb and again the third dataframe has bc. Then how come your final dataframe has bb? isn't that supposed to be bc? Commented Jun 21, 2017 at 2:44
  • Please read all the 3 dataframe as 3 different files. I wanted to compare the first 2 dataframe(files), identify if there is a change and update the changes only in the 3 dataframe. So when I compare the first 2, i get val1 as bb (which is a change) and this change has to be updated in the last dataframe and hence my final result should be bb. Commented Jun 21, 2017 at 3:06

2 Answers 2

1

I cant comment yet, so I will try to solve this issue. It may need to be still amended. From what I can tell, you are looking for the last unique change. So Val1 has {ab -> ab -> ac, bc -> bb -> bc} so the end result is {ac, bb} because the last file has bc which was in the first file and thus not unique. If this is the case then the best way to deal with this is create a set and take the last value from the set. I will use a udf to get this done

So from your example:

val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ac"),(2,"bb","bc","bd"))).toDF("pk1","pk2","val1","val2")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ad"),(2,"bb","bb","bd"))).toDF("pk1","pk2","val1","val2")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","ac","ad"),(2,"bb","bc","bd"))).toDF("pk1","pk2","val1","val2") 

import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.UserDefinedFunction
import sqlContext.implicits._

def getChange: UserDefinedFunction = 
    udf((a: String, b: String, c: String) => Set(a,b,c).last)

df1
.join(df2,df1("pk1")===df2("pk1") && df1("pk2")===df2("pk2"), "inner")
.join(df3,df1("pk1")===df3("pk1") && df1("pk2")===df3("pk2"), "inner")
.select(df1("pk1"),df1("pk2"),
  df1("val1").as("df1Val1"),df2("val1").as("df2Val1"),df3("val1").as("df3Val1"),
  df1("val2").as("df1Val2"),df2("val2").as("df2Val2"),df3("val2").as("df3Val2"))
  .withColumn("val1",getChange($"df1Val1",$"df2Val1",$"df3Val1"))
  .withColumn("val2",getChange($"df1Val2",$"df2Val2",$"df3Val2"))
  .select($"pk1",$"pk2",$"val1",$"val2")
  .orderBy($"pk1")
.show(false)

This yields:

+---+---+----+----+
|pk1|pk2|val1|val2|
+---+---+----+----+
|1  |aa |ac  |ad  |
|2  |bb |bb  |bd  |
+---+---+----+----+

Obviously if you use more columns or more dataframes then this will become a bit more cumbersome to write out, but this should do the trick for your example

Edit:
This is used to add more columns to the mix. As I said obove it is a bit more cumbersome. This will iterate through each column until none are left.

require(df1.columns.sameElements(df2.columns) && df1.columns.sameElements(df3.columns),"DF Columns do not match") //this is a check so may not be needed

val cols: Array[String] = df1.columns

def getChange: UserDefinedFunction = udf((a: String, b: String, c: String) => Set(a,b,c).last)

def createFrame(cols: Array[String], df1: DataFrame, df2: DataFrame, df3:DataFrame): scala.collection.mutable.ListBuffer[DataFrame] = {

val list: scala.collection.mutable.ListBuffer[DataFrame] = new scala.collection.mutable.ListBuffer[DataFrame]()
val keys = cols.slice(0,2) //get the keys
val columns = cols.slice(2, cols.length).toSeq //get the columns to use

  def helper(columns: Seq[String]): scala.collection.mutable.ListBuffer[DataFrame] = {
    if(columns.isEmpty) list
    else {
      list += df1
        .join(df2, df1.col(keys(0)) === df2.col(keys(0)) && df1.col(keys(1)) === df2.col(keys(1)), "inner")
        .join(df3, df1.col(keys(0)) === df3.col(keys(0)) && df1.col(keys(1)) === df3.col(keys(1)), "inner")
        .select(df1.col(keys(0)), df1.col(keys(1)),
        getChange(df1.col(columns.head), df2.col(columns.head), df3.col(columns.head)).as(columns.head))

      helper(columns.tail) //use tail recursion
  }
}
  helper(columns)
}

val list: scala.collection.mutable.ListBuffer[DataFrame] = createFrame(cols, df1, df2, df3)

list.reduce((a,b) =>
  a
    .join(b,a(cols.head)===b(cols.head) && a(cols(1))===b(cols(1)),"inner")
    .drop(b(cols.head))
    .drop(b(cols(1))))
.select(cols.head, cols.tail: _*)
.orderBy(cols.head)
.show

An example with 3 value columns then passing these into the code above:

val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ac","ad"),(2,"bb","bc","bd","bc"))).toDF("pk1","pk2","val1","val2","val3")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","ab","ad","ae"),(2,"bb","bb","bd","bf"))).toDF("pk1","pk2","val1","val2","val3")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","ac","ad","ae"),(2,"bb","bc","bd","bg"))).toDF("pk1","pk2","val1","val2","val3")

yields the following dataframes:

Running the code above yields:

//output
+---+---+----+----+----+
|pk1|pk2|val1|val2|val3|
+---+---+----+----+----+
|  1| aa|  ac|  ad|  ae|
|  2| bb|  bb|  bd|  bg|
+---+---+----+----+----+

There may be a more efficient way to do this as well, but this was off the top of my head.

Edit2

To do this with any amount of keys you can do the following. You will need to define the number of keys when you start. This can also probably be cleaned up as well. Ive got this to work with 4/5 keys, but you should run some tests as well, but it should work:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.UserDefinedFunction

val df1: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ab","ac","ad"),(2,"bb","d","e","bc","bd","bc"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")
val df2: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ab","ad","ae"),(2,"bb","d","e","bb","bd","bf"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")
val df3: DataFrame = sparkContext.parallelize(Seq((1,"aa","c","d","ac","ad","ae"),(2,"bb","d","e","bc","bd","bg"))).toDF("pk1","pk2","pk3","pk4","val1","val2","val3")

require(df1.columns.sameElements(df2.columns) && df1.columns.sameElements(df3.columns),"DF Columns do not match")

val cols: Array[String] = df1.columns

def getChange: UserDefinedFunction = udf((a: String, b: String, c: String) => Set(a,b,c).last)

def createFrame(cols: Array[String], df1: DataFrame, df2: DataFrame, df3:DataFrame): scala.collection.mutable.ListBuffer[DataFrame] = {

val list: scala.collection.mutable.ListBuffer[DataFrame] = new scala.collection.mutable.ListBuffer[DataFrame]()
val keys = cols.slice(0,4)//get the keys
val columns = cols.slice(4, cols.length).toSeq //get the columns to use

def helper(columns: Seq[String]): scala.collection.mutable.ListBuffer[DataFrame] = {

  if(columns.isEmpty) list
  else {
    list += df1
      .join(df2, Seq(keys :_*), "inner")
      .join(df3, Seq(keys :_*), "inner")
      .withColumn(columns.head + "Out", getChange(df1.col(columns.head), df2.col(columns.head), df3.col(columns.head)))
      .select(col(columns.head + "Out").as(columns.head) +: keys.map(x => df1.col(x)) : _*)

    helper(columns.tail)
  }
}

helper(columns)
}

val list: scala.collection.mutable.ListBuffer[DataFrame] = createFrame(cols, df1, df2, df3)
list.foreach(a => a.show(false))
val keys=cols.slice(0,4)

list.reduce((a,b) =>
  a.alias("a").join(b.alias("b"),Seq(keys :_*),"inner")
  .select("a.*","b." + b.columns.head))
  .orderBy(cols.head)
  .show(false)

This yields:

+---+---+---+---+----+----+----+
|pk1|pk2|pk3|pk4|val1|val2|val3|
+---+---+---+---+----+----+----+
|1  |aa |c  |d  |ac  |ad  |ae  |
|2  |bb |d  |e  |bb  |bd  |bg  |
+---+---+---+---+----+----+----+
Sign up to request clarification or add additional context in comments.

13 Comments

Thanks, it works with the example. I will not get more dataframe but the columns will change dynamically. It will be great if I can get it as a query.
Just to clarify will there potentially be more columns in the dataframe (or is the 4 set) or just that their names change or both?
It is just 3 files but there will be more columns.
I've added code to do this above now. It assumes that you are using 2 primary keys. So if that changes you will need to amend the code
I still have one more question though, the primary key what we have used is only 2, there may be more primary keys involved. Like some files will have 4 primary keys and some has 7. How can we df1.col(keys(0)) === df2.col(keys(0)) && df1.col(keys(1)) === df2.col(keys(1)) dynamically?
|
1

I can also do this by creating the dataframe as a temp view and then do select case statement. Like this,

df1.createTempView("df1")
df2.createTempView("df2")
df3.createTempView("df3")

select case when df1.val1=df2.val1 and df1.val1<>df3.val1 then df3.val1 end

This is much faster.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.