I am using Spark 2.2 with Scala.
Input data:
{"amount":"2.00","cal_group":[{}],"set_id":7057}
{"amount":"1.00","cal_group":[{}],"set_id":7057}
{"amount":"7.00","cal_group": [{"abc_cd":"abc00160","abc_cnt":6.0,"cde_cnt":7.0},{"abc_cd":"abc00160","abc_cnt":5.0,"cde_cnt":2.0},{"abc_cd":"abc00249","abc_cnt":0.0,"cde_cnt":1.0}],"set_id":7057}
Input dataframe:
[2.00,WrappedArray([null,null,null]),7057]
[1.00,WrappedArray([null,null,null]),7057]
[7.00,WrappedArray([abc00160,6.0,7.0],[abc00160,5.0,2.0],[abc00249,0.0,1.0]),7057]
Input data schema:
|-- amount: string (nullable = true)
|-- cal_group: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- abc_cd: string (nullable = true)
| | |-- abc_cnt: double (nullable = true)
| | |-- cde_cnt: double (nullable = true)
|-- set_id: double
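For reference, the schema above can be declared explicitly when loading the sample lines. This is a minimal sketch, assuming Spark 2.2's `DataFrameReader.json(Dataset[String])` and a local `SparkSession`; the names `InputSchema` and `load` are hypothetical. Declaring `set_id` as `DoubleType` reproduces the schema shown, whereas letting Spark infer it from these JSON lines would give `long`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types._

object InputSchema {
  // Explicit schema mirroring the printSchema output in the question.
  val schema: StructType = StructType(Seq(
    StructField("amount", StringType, nullable = true),
    StructField("cal_group", ArrayType(StructType(Seq(
      StructField("abc_cd", StringType, nullable = true),
      StructField("abc_cnt", DoubleType, nullable = true),
      StructField("cde_cnt", DoubleType, nullable = true))), containsNull = true), nullable = true),
    // Declared as double to match the question; inference would choose long here.
    StructField("set_id", DoubleType, nullable = true)))

  // Parses JSON lines held in memory using the explicit schema (no inference pass).
  def load(spark: SparkSession, lines: Seq[String]): DataFrame = {
    import spark.implicits._
    spark.read.schema(schema).json(lines.toDS())
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("input-schema").getOrCreate()
    load(spark, Seq(
      """{"amount":"2.00","cal_group":[{}],"set_id":7057}""",
      """{"amount":"7.00","cal_group":[{"abc_cd":"abc00160","abc_cnt":6.0,"cde_cnt":7.0}],"set_id":7057}"""
    )).printSchema()
    spark.stop()
  }
}
```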
Note: each element of the cal_group array is a struct that contains abc_cd and two measure columns, abc_cnt and cde_cnt.
I want to do two levels of aggregation on the input data, described below as Step 1 and Step 2.
Step 1:
For each set_id, I need the sum of amount, and I want to skip the null structs while collecting cal_group with collect_list.
I have tried the code below:
val res1=res.groupBy($"set_id").agg(sum($"amount").as('amount_total),collect_list(struct($"cal_group")).as('finalgroup))
This gives the expected sum of amount, but I don't know how to skip the cal_group arrays that contain only a null struct (the WrappedArray([null,null,null]) entries).
Output of Step 1:
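One possible way to skip those arrays in Step 1, sketched under the assumption that an array counts as empty when its first struct has a null abc_cd (which is what `[{}]` produces): wrap the column in `when(...)` without `otherwise`, which yields null for those rows, and rely on `collect_list` ignoring null inputs. The names `SkipNullGroups`, `sampleInput` and `step1` are hypothetical. This sketch collects `cal_group` directly instead of `struct($"cal_group")`, so the result is an array of arrays without the extra struct wrapper.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object SkipNullGroups {
  // Builds the three sample records from the question (schema is inferred).
  def sampleInput(spark: SparkSession): DataFrame = {
    import spark.implicits._
    spark.read.json(Seq(
      """{"amount":"2.00","cal_group":[{}],"set_id":7057}""",
      """{"amount":"1.00","cal_group":[{}],"set_id":7057}""",
      """{"amount":"7.00","cal_group":[{"abc_cd":"abc00160","abc_cnt":6.0,"cde_cnt":7.0},""" +
        """{"abc_cd":"abc00160","abc_cnt":5.0,"cde_cnt":2.0},""" +
        """{"abc_cd":"abc00249","abc_cnt":0.0,"cde_cnt":1.0}],"set_id":7057}"""
    ).toDS())
  }

  def step1(res: DataFrame): DataFrame = {
    // Assumption: an array is "empty" when its first struct has a null abc_cd (the [{}] case).
    val hasData = col("cal_group").getItem(0).getField("abc_cd").isNotNull
    res.groupBy(col("set_id"))
      .agg(
        sum(col("amount").cast("double")).as("amount_total"),
        // when(...) without otherwise() yields null, and collect_list skips null inputs.
        collect_list(when(hasData, col("cal_group"))).as("finalgroup"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("step1").getOrCreate()
    step1(sampleInput(spark)).show(false)
    spark.stop()
  }
}
```

For the sample input, finalgroup should then hold only the one non-empty array from the third record.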
[7057,10.00,WrappedArray([WrappedArray([null,null,null])],[WrappedArray([null,null,null])],[WrappedArray([null,null,null])],[WrappedArray([abc00160,6.0,7.0],[abc00160,5.0,2.0],[abc00249,0.0,1.0])])]
Step 2:
Then I want to aggregate the two measures (abc_cnt, cde_cnt) at the abc_cd level.
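To make the Step 2 target concrete, here is the per-abc_cd arithmetic for the third input record in plain Scala, without Spark. `CalGroup` and `aggregate` are hypothetical names, and `None` stands in for a null field. It shows abc00160 combining to 6.0 + 5.0 = 11.0 and 7.0 + 2.0 = 9.0, while the all-null struct is dropped.

```scala
object AbcCdAggregation {
  // Hypothetical stand-in for one cal_group struct; None models a null field.
  final case class CalGroup(abcCd: Option[String], abcCnt: Option[Double], cdeCnt: Option[Double])

  // Drops structs whose abc_cd is null, sums both measures per abc_cd
  // (a null measure counts as 0.0), and sorts the result by code.
  def aggregate(groups: Seq[CalGroup]): Seq[(String, Double, Double)] =
    groups
      .collect { case CalGroup(Some(cd), abc, cde) => (cd, abc.getOrElse(0.0), cde.getOrElse(0.0)) }
      .groupBy(_._1)
      .map { case (cd, rows) => (cd, rows.map(_._2).sum, rows.map(_._3).sum) }
      .toSeq
      .sortBy(_._1)

  def main(args: Array[String]): Unit = {
    val record = Seq(
      CalGroup(None, None, None), // the [{}] case
      CalGroup(Some("abc00160"), Some(6.0), Some(7.0)),
      CalGroup(Some("abc00160"), Some(5.0), Some(2.0)),
      CalGroup(Some("abc00249"), Some(0.0), Some(1.0)))
    aggregate(record).foreach(println)
  }
}
```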
This aggregation can be done by applying explode to the cal_group column, but explode turns every cal_group element into its own row, which increases the number of rows and the volume of data.
So I tried exploding the array and grouping by abc_cd.
Sample code using explode to compute the sums:
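// explode the per-record cal_group array of the input DataFrame res (one row per struct)
val res2 = res.select($"set_id", explode($"cal_group").as("cal_group"))
// drop the null structs produced by [{}], then sum both measures per (set_id, abc_cd)
val res3 = res2.where($"cal_group.abc_cd".isNotNull)
  .groupBy($"set_id", $"cal_group.abc_cd")
  .agg(sum($"cal_group.abc_cnt").as('abc_cnt_sum),
    sum($"cal_group.cde_cnt").as('cde_cnt_sum))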
However, I don't want to explode the cal_group column, because it increases the data volume.
Output expected after Step 2:
[7057,10.00,WrappedArray([WrappedArray([null,null,null])],
[WrappedArray([null,null,null])],
[WrappedArray([null,null,null])],
[WrappedArray([abc00160,11.0,9.0],
[abc00249,0.0,1.0])])]
Is there an option where a function aggregates at the record level (within the cal_group arrays) and removes the null structs before collecting, without using explode?
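A sketch of one way to do Step 2 without explode, assuming a Scala UDF is acceptable: collect the raw cal_group arrays per set_id, then let a UDF flatten them, drop structs whose abc_cd is null, and sum abc_cnt and cde_cnt per abc_cd inside the single aggregated row. `MergeWithoutExplode`, `CalGroupSum`, `mergeGroups` and the other names are hypothetical, and treating a null measure as 0.0 is an assumption. Unlike the expected output above, this drops the null structs entirely. Note that collect_list still shuffles all arrays of a set_id into one row, so this avoids explode's extra rows but not the shuffle. Spark 2.4 added `flatten` and higher-order functions such as `filter`, which may make the UDF unnecessary on newer versions.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions._

object MergeWithoutExplode {
  // Hypothetical output element type; its fields become the struct fields of finalgroup.
  case class CalGroupSum(abc_cd: String, abc_cnt: Double, cde_cnt: Double)

  // Builds the three sample records from the question (schema is inferred).
  def sampleInput(spark: SparkSession): DataFrame = {
    import spark.implicits._
    spark.read.json(Seq(
      """{"amount":"2.00","cal_group":[{}],"set_id":7057}""",
      """{"amount":"1.00","cal_group":[{}],"set_id":7057}""",
      """{"amount":"7.00","cal_group":[{"abc_cd":"abc00160","abc_cnt":6.0,"cde_cnt":7.0},""" +
        """{"abc_cd":"abc00160","abc_cnt":5.0,"cde_cnt":2.0},""" +
        """{"abc_cd":"abc00249","abc_cnt":0.0,"cde_cnt":1.0}],"set_id":7057}"""
    ).toDS())
  }

  def step2(res: DataFrame): DataFrame = {
    // Flattens the collected arrays, drops structs with a null abc_cd,
    // and sums both measures per abc_cd, all within one row (no explode).
    val mergeGroups = udf { groups: Seq[Seq[Row]] =>
      // Reads a measure by name; a null value is treated as 0.0 (assumption).
      def measure(r: Row, field: String): Double = {
        val i = r.fieldIndex(field)
        if (r.isNullAt(i)) 0.0 else r.getDouble(i)
      }
      groups.flatten
        .filter(r => r != null && !r.isNullAt(r.fieldIndex("abc_cd")))
        .groupBy(_.getAs[String]("abc_cd"))
        .map { case (cd, rows) =>
          CalGroupSum(cd, rows.map(measure(_, "abc_cnt")).sum, rows.map(measure(_, "cde_cnt")).sum)
        }
        .toSeq
        .sortBy(_.abc_cd)
    }

    res.groupBy(col("set_id"))
      .agg(
        sum(col("amount").cast("double")).as("amount_total"),
        collect_list(col("cal_group")).as("groups"))
      .select(col("set_id"), col("amount_total"), mergeGroups(col("groups")).as("finalgroup"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("step2").getOrCreate()
    step2(sampleInput(spark)).show(false)
    spark.stop()
  }
}
```

For the sample input this should give one row for set_id 7057 with amount_total 10.0 and finalgroup containing [abc00160,11.0,9.0] and [abc00249,0.0,1.0].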
Thanks in advance.