
I have a dataframe with columns col1,col2,col3. col1,col2 are strings. col3 is a Map[String,String] defined below

 |-- col3: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

I have grouped by col1,col2 and aggregated using collect_list to get an Array of Maps and stored in col4.

 df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))

 |-- col4: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

However I would like to get col4 as a single map with all the maps combined. Currently I have:

[[a->a1,b->b1],[c->c1]]

Expected output

[a->a1,b->b1,c->c1]

Would using a UDF be the ideal approach here?

Any help is appreciated. Thanks.


3 Answers


You can use aggregate and map_concat:

import org.apache.spark.sql.functions.{expr, collect_list}

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)

// +---+------------------------------+
// |id |merged_data                   |
// +---+------------------------------+
// |1  |[k1 -> v1, k2 -> v3, k3 -> v3]|
// |2  |[k4 -> v4, k6 -> v6, k5 -> v5]|
// +---+------------------------------+

With map_concat we concatenate all the Map items of the data column via the built-in aggregate function, which lets us apply map_concat successively to the items of the list.
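The same expression also works in plain Spark SQL. A minimal sketch, assuming the grouped result is registered under a made-up view name grouped (on Spark 3.3+ you need the cast shown in the update below):

df.groupBy("id").agg(collect_list("data").as("data")).createOrReplaceTempView("grouped")

spark.sql("""
  SELECT id,
         aggregate(data, map(), (acc, i) -> map_concat(acc, i)) AS merged_data
  FROM grouped
""").show(false)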

Attention: the current implementation of map_concat on Spark 2.4.5 allows identical keys to co-exist. This is most likely a bug, since it is not the expected behaviour according to the official documentation. Please be aware of that.

If you want to avoid such a case you can also go for a UDF:

import org.apache.spark.sql.functions.{collect_list, udf}

val mergeMapUDF = udf((data: Seq[Map[String, String]]) => data.reduce(_ ++ _))

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeMapUDF($"data").as("merged_data"))
  .show(false)

UPDATE (2022-08-27)

  1. In Spark 3.3.0 the above code doesn't work and the following exception is thrown:
AnalysisException: cannot resolve 'aggregate(`data`, map(), lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))' due to data type mismatch: argument 3 requires map<null,null> type, however, 'lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable())' is of map<string,string> type.;
Project [id#110, aggregate(data#119, map(), lambdafunction(map_concat(cast(lambda acc#122 as map<string,string>), lambda i#123), lambda acc#122, lambda i#123, false), lambdafunction(lambda id#124, lambda id#124, false)) AS aggregate(data, map(), lambdafunction(map_concat(namedlambdavariable(), namedlambdavariable()), namedlambdavariable(), namedlambdavariable()), lambdafunction(namedlambdavariable(), namedlambdavariable()))#125]
+- Aggregate [id#110], [id#110, collect_list(data#111, 0, 0) AS data#119]
   +- Project [_1#105 AS id#110, _2#106 AS data#111]
      +- LocalRelation [_1#105, _2#106]

It seems that map() is initialised as map<null,null> when map<string,string> is expected.

To fix this just cast map() into map<string, string> explicitly with cast(map() as map<string, string>).

Here is the updated code:

val mergeExpr = expr("aggregate(data, cast(map() as map<string,
string>), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr)
  .show(false)
  2. Regarding the identical keys bug, this seems to be fixed in the latest versions. If you try to add identical keys, an exception is thrown:
Caused by: RuntimeException: Duplicate map key k5 was found, please check the input data. If you want to remove the duplicated keys, you can set spark.sql.mapKeyDedupPolicy to LAST_WIN so that the key inserted at last takes precedence.
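If you prefer merging duplicates over getting the exception, a minimal sketch using the config named in that error message (exact behaviour may vary by Spark version):

spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")

// re-running the merge now keeps the last value seen for a duplicated key
df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)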

6 Comments

How do we make the "aggregate" method work for a map of a different type? Basically, how do we initialize an empty map of (int, int) or (float, float), because by default an empty map is (string, string)?
you can do map(0, 0) to initialise a map of [int, int]
as a matter of fact my previous suggestion is wrong. I am not able to initialize the map as map[string, string]. Have you found any solution so far? I tried with map('', '') but then of course you need an elegant way to remove the redundant '' item. Unfortunately I was not able to find such an elegant alternative without involving plain Scala code
The only ugly alternative I found is through map_filter, excluding the empty '' key with: val mergeExpr = expr("map_filter(aggregate(data, map('', ''), (acc, i) -> map_concat(acc, i)), (k, v) -> k != '')")
@SoumyadipGhosh actually the answer was quite easy :) you just use cast to convert map into map<string, string>. Please check my updated answer
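Following up on that thread, the same cast trick should extend to other key/value types as well; a sketch for the (int, int) case asked about above (not verified on every Spark version):

// hypothetical variant of mergeExpr for an array of map<int,int> in a column named data
val mergeIntExpr = expr("aggregate(data, cast(map() as map<int, int>), (acc, i) -> map_concat(acc, i))")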

You can achieve it without a UDF. Let's create your dataframe:

val df = Seq(Seq(Map("a" -> "a1", "b" -> "b1"), Map("c" -> "c1", "d" -> "d1"))).toDF()
df.show(false)
df.printSchema()

output:

+----------------------------------------+
|value                                   |
+----------------------------------------+
|[[a -> a1, b -> b1], [c -> c1, d -> d1]]|
+----------------------------------------+

root
 |-- value: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

If your array contains 2 elements just use map_concat:

df.select(map_concat('value.getItem(0), 'value.getItem(1))).show(false)

or this (I have no idea how to loop from 0 to the 'value array column's size dynamically, but it might be the shortest solution):

df.select(map_concat((for {i <- 0 to 1} yield 'value.getItem(i)): _*)).show(false)
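If the upper bound has to be dynamic, one possible sketch is to read the array size from the first row (this assumes every row's array has the same length, which is a strong assumption):

import org.apache.spark.sql.functions.{map_concat, size}

// take the array length from the first row and build one getItem column per index
val arraySize = df.select(size('value)).as[Int].first()
df.select(map_concat((0 until arraySize).map(i => 'value.getItem(i)): _*)).show(false)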

Otherwise, if your array contains multiple maps and the size is not known, you could try the method below:

  val df2 = df.map(s => {
    val list = s.getList[Map[String, String]](0)
    var map = Map[String, String]()
    for (i <- 0 to list.size() - 1) {
      map = map ++ list.get(i)
    }
    map
  })

  df2.show(false)
  df2.printSchema()

output:

+------------------------------------+
|value                               |
+------------------------------------+
|[a -> a1, b -> b1, c -> c1, d -> d1]|
+------------------------------------+

root
 |-- value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)



If the number of records is small, then you can explode the maps, collect the entries as struct(), and then use map_from_entries:

val df = Seq(Seq(Map("a" -> "a1", "b" -> "b1"), Map("c" -> "c1", "d" -> "d1"))).toDF()
df.show(false)
df.printSchema()

+----------------------------------------+
|value                                   |
+----------------------------------------+
|[{a -> a1, b -> b1}, {c -> c1, d -> d1}]|
+----------------------------------------+

root
 |-- value: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)


df.createOrReplaceTempView("items")

val df2 = spark.sql("""

with t1 (select value from items),
     t2 (select value, explode(value) m1 from t1 ),
     t3 (select value, explode(m1) (k,v) from t2 ),
     t4 (select value, struct(k,v) r1 from t3 ),
     t5 (select collect_list(r1) r2 from t4 )
     select map_from_entries(r2) merged_data from t5
    """)
df2.show(false)
df2.printSchema

+------------------------------------+
|merged_data                         |
+------------------------------------+
|{a -> a1, b -> b1, c -> c1, d -> d1}|
+------------------------------------+

root
 |-- merged_data: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

Note that when we use "value" in the group-by, Spark throws org.apache.spark.sql.AnalysisException: expression t4.value cannot be used as a grouping expression because its data type array<map<string,string>> is not an orderable data type.

Let's take abiratsis' sample data. Here we have to use the id column in the group-by, otherwise all the map elements will be merged together.

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")
df.show(false)
df.printSchema()

+---+--------------------+
|id |data                |
+---+--------------------+
|1  |{k1 -> v1, k2 -> v3}|
|1  |{k3 -> v3}          |
|2  |{k4 -> v4}          |
|2  |{k6 -> v6, k5 -> v5}|
+---+--------------------+

root
 |-- id: integer (nullable = false)
 |-- data: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

df.createOrReplaceTempView("items")

val df2 = spark.sql("""

with t1 (select id, data from items),
     t2 (select id, explode(data) (k,v) from t1 ),
     t3 (select id, struct(k,v) r1 from t2 ),
     t4 (select id, collect_list(r1) r2 from t3 group by id )
     select id, map_from_entries(r2) merged_data from t4
    """)
df2.show(false)
df2.printSchema

+---+------------------------------+
|id |merged_data                   |
+---+------------------------------+
|1  |{k1 -> v1, k2 -> v3, k3 -> v3}|
|2  |{k4 -> v4, k6 -> v6, k5 -> v5}|
+---+------------------------------+

root
 |-- id: integer (nullable = false)
 |-- merged_data: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
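The same explode + map_from_entries idea can also be written with the DataFrame API; a sketch equivalent to the SQL above, on the same df:

import org.apache.spark.sql.functions.{col, collect_list, explode, map_from_entries, struct}

// explode(map) yields "key" and "value" columns; pack them back into map entries per id
df.select(col("id"), explode(col("data")))
  .groupBy("id")
  .agg(map_from_entries(collect_list(struct(col("key"), col("value")))).as("merged_data"))
  .show(false)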

