
I have a streaming DataFrame with 2 columns: a key column of type String, and an objects column which is an array containing a single object element. I want to merge rows in the DataFrame that have the same key, such that the merged records form an array of objects.

DataFrame

----------------------------------------------------------------
|key    | objects                                              |
----------------------------------------------------------------
|abc    | [{"name": "file", "type": "sample", "code": "123"}]  |
|abc    | [{"name": "image", "type": "sample", "code": "456"}] |
|xyz    | [{"name": "doc", "type": "sample", "code": "707"}]   |
----------------------------------------------------------------


Merged DataFrame

--------------------------------------------------------------
|key   | objects                                             |
--------------------------------------------------------------
|abc   | [{"name": "file", "type": "sample", "code": "123"}, |
|      |  {"name": "image", "type": "sample", "code": "456"}]|
|xyz   | [{"name": "doc", "type": "sample", "code": "707"}]  |
--------------------------------------------------------------

One option is to convert this into a PairRDD and apply the reduceByKey function, but I'd prefer to do this with DataFrames if possible, since that would be more optimal. Is there any way to do this with DataFrames without compromising performance?
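For clarity, the merge I'm after behaves like a group-by-key over plain collections. A minimal sketch in plain Scala (no Spark), using the JSON objects as plain strings purely for illustration:

```scala
// Plain-Scala sketch of the desired merge semantics (no Spark):
// group rows by key and concatenate their single-element object arrays.
val rows = Seq(
  ("abc", Seq("""{"name": "file", "type": "sample", "code": "123"}""")),
  ("abc", Seq("""{"name": "image", "type": "sample", "code": "456"}""")),
  ("xyz", Seq("""{"name": "doc", "type": "sample", "code": "707"}"""))
)

val merged: Map[String, Seq[String]] =
  rows.groupBy(_._1).map { case (key, grouped) =>
    key -> grouped.flatMap(_._2) // flatten the per-row arrays into one
  }
// merged("abc") now holds both objects; merged("xyz") holds one
```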

1 Answer
Assuming column objects is an array of a single JSON string, here's how you can merge objects by key:

import org.apache.spark.sql.functions._
// outside spark-shell, also add: import spark.implicits._ (for $ and toDF)

case class Obj(name: String, `type`: String, code: String)

val df = Seq(
    ("abc", Obj("file", "sample", "123")),
    ("abc", Obj("image", "sample", "456")),
    ("xyz", Obj("doc", "sample", "707"))
  ).
  toDF("key", "object").
  select($"key", array(to_json($"object")).as("objects"))

df.show(false)
// +---+-----------------------------------------------+
// |key|objects                                        |
// +---+-----------------------------------------------+
// |abc|[{"name":"file","type":"sample","code":"123"}] |
// |abc|[{"name":"image","type":"sample","code":"456"}]|
// |xyz|[{"name":"doc","type":"sample","code":"707"}]  |
// +---+-----------------------------------------------+

df.groupBy($"key").agg(collect_list($"objects"(0)).as("objects")).
  show(false)
// +---+---------------------------------------------------------------------------------------------+
// |key|objects                                                                                      |
// +---+---------------------------------------------------------------------------------------------+
// |xyz|[{"name":"doc","type":"sample","code":"707"}]                                                |
// |abc|[{"name":"file","type":"sample","code":"123"}, {"name":"image","type":"sample","code":"456"}]|
// +---+---------------------------------------------------------------------------------------------+

5 Comments

Thanks Leo! This worked for me with a minor edit. (collect_list($"objects"(0)).as("objects")) fails with "org.apache.spark.sql.AnalysisException: Field name should be String Literal, but it's 0;". I got it working without specifying the index, since the objects column only ever has one element anyway and the index wasn't required.
@bytecode, not sure why $"objects"(0) doesn't work in your environment. I would suggest that you try $"objects".getItem(0). collect_list on the entire Array column will result in a nested Array of WrappedArray, rather than an Array of JSON strings as shown in your expected result dataset.
Explicitly calling getItem on $"objects" works. Not sure why the shorthand didn't work.
BTW, is there any way to do this without the aggregate function?
Since your requirement is to aggregate elements of the objects array per key, I don't see a way to do it without applying some aggregate function.
