How can I transform the data below so that it can be stored in Elasticsearch?

Here is a dataset of beans that I would like to aggregate by product into a JSON array.

List<Bean> data = new ArrayList<Bean>();
data.add(new Bean("book","John",59));
data.add(new Bean("book","Björn",61));
data.add(new Bean("tv","Roger",36));
Dataset<Row> ds = spark.createDataFrame(data, Bean.class);

ds.show(false);

+------+-------+---------+
|amount|product|purchaser|
+------+-------+---------+
|59    |book   |John     |
|61    |book   |Björn    |
|36    |tv     |Roger    |
+------+-------+---------+


ds = ds.groupBy(col("product")).agg(collect_list(map(ds.col("purchaser"),ds.col("amount")).as("map")));
ds.show(false);

+-------+---------------------------------------------+
|product|collect_list(map(purchaser, amount) AS `map`)|
+-------+---------------------------------------------+
|tv     |[[Roger -> 36]]                              |
|book   |[[John -> 59], [Björn -> 61]]                |
+-------+---------------------------------------------+

This is what I want to transform it into:

+-------+------------------------------------------------------------------+
|product|json                                                              |
+-------+------------------------------------------------------------------+
|tv     |[{purchaser: "Roger", amount:36}]                                 |
|book   |[{purchaser: "John", amount:59}, {purchaser: "Björn", amount:61}] |
+-------+------------------------------------------------------------------+
  • First use to_json and then collect_list. Commented Mar 16, 2018 at 9:09
  • Possible duplicate of Spark Row to JSON. Commented Mar 16, 2018 at 9:11
  • Can you show me how you would do that? Your possible-duplicate link doesn't cover aggregating into a JSON array. Commented Mar 16, 2018 at 9:17
  • Great! Thanks a lot for your help, philantrovert! Commented Mar 16, 2018 at 9:31

1 Answer

The solution:

ds.groupBy(col("product"))
  .agg(collect_list(to_json(struct(col("purchaser"), col("amount"))).alias("json")));
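
For anyone who wants to try this end to end, here is a minimal, self-contained sketch of the same approach. It assumes a local SparkSession and Spark 2.1 or later (for to_json); the class name JsonAggregationSketch and the Bean definition are hypothetical stand-ins for the ones in the question. The alias is applied to the collect_list column so that the output column is actually named json.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

import java.io.Serializable;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class JsonAggregationSketch {

    // Hypothetical bean mirroring the one in the question.
    // Spark infers the schema from the getters, so they must be public.
    public static class Bean implements Serializable {
        private String product;
        private String purchaser;
        private int amount;

        public Bean() {}

        public Bean(String product, String purchaser, int amount) {
            this.product = product;
            this.purchaser = purchaser;
            this.amount = amount;
        }

        public String getProduct() { return product; }
        public void setProduct(String product) { this.product = product; }
        public String getPurchaser() { return purchaser; }
        public void setPurchaser(String purchaser) { this.purchaser = purchaser; }
        public int getAmount() { return amount; }
        public void setAmount(int amount) { this.amount = amount; }
    }

    // Builds the sample data and returns one row per product,
    // with a "json" column holding an array of JSON strings.
    public static Dataset<Row> aggregate(SparkSession spark) {
        List<Bean> data = Arrays.asList(
                new Bean("book", "John", 59),
                new Bean("book", "Björn", 61),
                new Bean("tv", "Roger", 36));
        Dataset<Row> ds = spark.createDataFrame(data, Bean.class);

        return ds.groupBy(col("product"))
                // struct -> JSON string per row, then collect the strings per group;
                // alias the aggregated column, not the inner to_json call
                .agg(collect_list(to_json(struct(col("purchaser"), col("amount")))).alias("json"));
    }

    public static void main(String[] args) {
        // Local session for experimentation only (an assumption, not from the question)
        SparkSession spark = SparkSession.builder()
                .appName("json-aggregation-sketch")
                .master("local[*]")
                .getOrCreate();
        aggregate(spark).show(false);
        spark.stop();
    }
}
```

Note that the json column here is an array of JSON strings (one string per purchase), not a single JSON string, and that collect_list does not guarantee the order of elements within each group.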
2 Comments

Feel free to accept your own answer if it solves the question.
I added several more columns and this worked for me, but to get the alias working I had to move it from the to_json call to the collect_list call. For me it looked like this: ds.groupBy(col("product")).agg(collect_list(to_json(struct(col("purchaser"), col("amount")))).alias("json"));
