8

I have the following Dataset:

+-------------------+--------------------+
|               date|            products|
+-------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|
|2017-09-21 22:00:00|[361, 361, 361, 3...|
|2017-09-28 22:00:00|[360, 361, 361, 3...|

where the products column is an array of strings with possibly duplicated items.

I would like to remove these duplicates (within each row).

What I did was basically write a UDF like this:

 import org.apache.spark.sql.functions.udf
 import scala.collection.mutable.WrappedArray
 val removeDuplicates: WrappedArray[String] => WrappedArray[String] = _.distinct
 val udfremoveDuplicates = udf(removeDuplicates)
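
For completeness, a minimal sketch of how the UDF might be applied (here "df" stands for the DataFrame shown above, and the $"..." syntax assumes import spark.implicits._ where spark is the SparkSession):

    import spark.implicits._   // enables the $"..." column syntax

    // Hypothetical application of the UDF above to the question's DataFrame
    val withDedup = df.withColumn("rm_duplicates", udfremoveDuplicates($"products"))
    withDedup.show(false)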

This solution gives me proper results:

+-------------------+--------------------+--------------------+
|               date|            products|       rm_duplicates|
+-------------------+--------------------+--------------------+
|2017-08-31 22:00:00|[361, 361, 361, 3...|[361, 362, 363, 3...|
|2017-09-22 22:00:00|[361, 362, 362, 3...|[361, 362, 363, 3...|

My questions are:

  1. Does Spark provide a better/more efficient way of getting this result?

  2. I was thinking about using a map, but how do I get the desired column as a List so that I can use the distinct method like in my removeDuplicates lambda?

Edit: I marked this topic with the java tag because it does not matter to me whether I get an answer in Scala or Java :) Edit2: typos

2 Comments
  • Spark doesn't provide a built-in function for this type of operation, so a UDF is the way to go, as @user6910411 stated. If you want a List, you can just add .toList after distinct and update your udf type annotation to return a List. Commented Nov 12, 2017 at 22:24
  • Maps are a much more expensive data structure than arrays and should be avoided unless really needed, e.g., to check for the presence of an element frequently when the average size of the element collection is quite large (or when you need to merge maps together, etc.). Even then, if you need to check for the presence of an element, often the fastest way is to represent the elements as a delimited string, e.g., ":123:345:126:", and do a substring search for <delimiter><element><delimiter>. Complex data structures, even arrays, require lots more processing than strings. Commented Nov 13, 2017 at 9:23
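
For illustration only, a tiny sketch of the delimited-string membership trick described in the last comment (the values and names here are made up for the example):

    // Elements packed into a single delimited string
    val packed = ":361:362:363:364:"

    // Membership test = substring search for <delimiter><element><delimiter>
    def has(element: String): Boolean = packed.contains(":" + element + ":")

    has("362")   // true
    has("36")    // false; the surrounding delimiters rule out partial matches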

4 Answers

5

The answers are out of date now, hence this newer answer.

With the Spark 2.4 array functions you can do something like this; some other aspects are shown as well, but one can get the gist of it:

val res4 = res3.withColumn("_f", array_distinct(sort_array(flatten($"_e"))))
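
Applied to the DataFrame from the question, a minimal sketch (assuming Spark 2.4+ and that the DataFrame is named df) would be just:

    import org.apache.spark.sql.functions.{array_distinct, col}

    // array_distinct removes duplicate elements within each row's array;
    // flatten and sort_array are only needed for the nested case shown above.
    val deduped = df.withColumn("rm_duplicates", array_distinct(col("products")))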

BTW a good read here: https://www.waitingforcode.com/apache-spark-sql/apache-spark-2.4.0-features-array-higher-order-functions/read


1 Comment

I don't think the sorting is necessary; at least there's no mention in the docs that array_distinct assumes the input is sorted.
3

The approach presented in the question (using a UDF) is the best approach, as Spark SQL has no built-in primitive to uniquify arrays.

If you are dealing with massive amounts of data and/or the array values have unique properties, then it's worth thinking about the implementation of the UDF.

WrappedArray.distinct builds a mutable.HashSet behind the scenes and then traverses it to build the array of distinct elements. There are two possible problems with this from a performance standpoint:

  1. Scala's mutable collections are not wonderfully efficient, which is why in the guts of Spark you'll find a lot of Java collections and while loops. If you are in need of extreme performance, you can implement your own generic distinct using faster data structures.

  2. A generic implementation of distinct does not take advantage of any properties of your data. For example, if the arrays will be small on average, then a simple implementation that builds directly into an array and does a linear search for duplicates may perform much better than code that builds a complex data structure, despite its theoretical O(n^2) complexity (a sketch of this idea follows below). For another example, if the values can only be numbers in a small range, or strings from a small set, you can implement uniquification via a bit set.

Again, these strategies should only be considered if you have ridiculous amounts of data. Your simple implementation is perfectly suitable for almost every situation.
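
To make point 2 concrete, here is a minimal sketch of such a linear-search deduplication UDF for small string arrays (the function name and the use of Seq[String] are assumptions for this example, not code from the question):

    import org.apache.spark.sql.functions.udf

    // Builds the result with a linear duplicate check instead of a HashSet:
    // O(n^2) in the worst case, but with small constants for short arrays.
    val smallArrayDistinct = udf { (xs: Seq[String]) =>
      val out = scala.collection.mutable.ArrayBuffer.empty[String]
      var i = 0
      while (i < xs.length) {
        if (!out.contains(xs(i))) out += xs(i)
        i += 1
      }
      out.toSeq
    }

It can be applied the same way as the UDF in the question, e.g. df.withColumn("rm_duplicates", smallArrayDistinct($"products")).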


1

You can use a simple UDF.

import org.apache.spark.sql.functions.udf

// distinct removes duplicates within each row's array
val dedup = udf((colName: scala.collection.mutable.WrappedArray[String]) => colName.distinct)

df.withColumn("DeDupColumn", dedup($"colName"))   // $ syntax needs import spark.implicits._


0

Given your current dataframe schema:

root
 |-- date: string (nullable = true)
 |-- products: array (nullable = true)
 |    |-- element: integer (containsNull = false)

You can use the following method to remove the duplicates (this assumes import spark.implicits._ and import scala.collection.mutable are in scope).

df.map(row => DuplicateRemoved(row(0).toString, row(1).asInstanceOf[mutable.WrappedArray[Int]], row(1).asInstanceOf[mutable.WrappedArray[Int]].distinct)).toDF()

Of course, you need a case class for this:

case class DuplicateRemoved(date: String, products: mutable.WrappedArray[Int], rm_duplicates: mutable.WrappedArray[Int])

You should get the following output:

+-------------------+------------------------------+-------------------------+
|date               |products                      |rm_duplicates            |
+-------------------+------------------------------+-------------------------+
|2017-08-31 22:00:00|[361, 361, 361, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-22 22:00:00|[361, 362, 362, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-21 22:00:00|[361, 361, 361, 362, 363, 364]|[361, 362, 363, 364]     |
|2017-09-28 22:00:00|[360, 361, 361, 362, 363, 364]|[360, 361, 362, 363, 364]|
+-------------------+------------------------------+-------------------------+

I hope the answer is helpful.

4 Comments

The UDF approach is simpler, faster & better. This answer involves unsafe Row access and unnecessary case class creation, which you then turn back into Rows. It processes the entire row of data, which prevents Spark from performing column optimizations or plan re-writes if this were part of a larger transformation DAG. (Also, as an aside, the question said the arrays that needed uniquification had strings; may want to fix that.)
Yes @Sim, you are absolutely correct. The OP wanted to see other possibilities for doing the same task that is done in my other answer using a udf function. That's why I posted this answer, to show that it's possible. :)
The exact question was "Does Spark provide a better/more efficient way of getting this result?" There are infinitely many ways to solve a problem in software that are worse & less efficient than a given solution. :)
I agree with @Sim, Ramesh; this is not a good solution and the OP asked for a more efficient one. I'm sorry, I'll downvote your answer.
