The SQL in question:
WITH
first_cte AS (
SELECT
s.two_id,
s.d_month,
COUNT(*) AS total,
COUNT(DISTINCT one_id) AS unique_one_id_count
FROM iceberg_db.tab s
GROUP BY s.two_id, s.d_month
)
select *
from first_cte
I run it in AWS Glue on 32 G.8X workers and it takes a long time. The input is about 300 million records, which aggregate down to about 700k output rows. The problem is that the data is skewed: a few two_id values have many one_id values, while most two_id values have only a few. As a result, most of the tasks finish quickly and then wait on a few long-running ones.
I read about a few techniques for this issue: salting, Storage-Partitioned Join (SPJ), broadcast joins, the explode/COLLECT_SET/flatten technique, and others. SPJ and broadcast are not relevant because they apply to joins (although I did try the Spark configs and hints). The explode/COLLECT_SET/flatten technique requires all the unique values to fit in memory, so I dropped it. That leaves salting, but since I am doing a count distinct, I can't salt with RAND without losing the exact unique count (the best I could get is an approximate count). So I ended up with the following solution:
WITH
first_cte AS (
SELECT
s.one_id,
s.two_id,
s.d_month,
COUNT(*) AS total
FROM iceberg_db.tab s
GROUP BY s.one_id, s.two_id, s.d_month
),
second_cte AS (
SELECT
s.two_id,
s.d_month,
SUM(s.total) AS total,
COUNT(*) AS unique_one_id_count
FROM first_cte s
GROUP BY s.two_id, s.d_month
)
select *
from second_cte
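As a sanity check that this rewrite gives the exact distinct count rather than an approximation, the sketch below runs both queries against a tiny in-memory table, using Python's built-in sqlite3 module as a stand-in for Spark SQL. The rows are hypothetical, with two_id 'A' playing the skewed key; both queries return identical rows.

```python
import sqlite3

# Hypothetical toy table: (one_id, two_id, d_month); two_id "A" is the skewed key.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tab (one_id TEXT, two_id TEXT, d_month TEXT)")
rows = [(f"u{i}", "A", "2024-01") for i in range(5) for _ in range(20)]
rows += [(f"v{i}", "B", "2024-01") for i in range(2) for _ in range(3)]
rows += [("u0", "A", "2024-02")]
conn.executemany("INSERT INTO tab VALUES (?, ?, ?)", rows)

# Original single-stage query with COUNT(DISTINCT).
ORIGINAL = """
SELECT two_id, d_month, COUNT(*) AS total, COUNT(DISTINCT one_id) AS unique_one_id_count
FROM tab
GROUP BY two_id, d_month
ORDER BY two_id, d_month
"""

# Two-stage rewrite: dedupe on one_id first, then count the surviving groups.
TWO_STAGE = """
WITH first_cte AS (
    SELECT one_id, two_id, d_month, COUNT(*) AS total
    FROM tab
    GROUP BY one_id, two_id, d_month
),
second_cte AS (
    SELECT two_id, d_month, SUM(total) AS total, COUNT(*) AS unique_one_id_count
    FROM first_cte
    GROUP BY two_id, d_month
)
SELECT * FROM second_cte ORDER BY two_id, d_month
"""

original = conn.execute(ORIGINAL).fetchall()
two_stage = conn.execute(TWO_STAGE).fetchall()
print(original)               # [('A', '2024-01', 100, 5), ('A', '2024-02', 1, 1), ('B', '2024-01', 6, 2)]
print(original == two_stage)  # True
```

One caveat: COUNT(DISTINCT one_id) ignores NULLs, while COUNT(*) in second_cte counts a NULL one_id group as one. If one_id can be NULL, COUNT(s.one_id) in second_cte would match the original query.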
This performs much better. However, I wonder whether I can get even better performance, because the skew did not go away; it just became smaller. Some tasks still take longer, since some two_id values have far more one_id data than others.
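The RAND limitation mentioned above can be seen in a small pure-Python simulation (not Spark; the toy data, the salt count of 4 and the helper names are hypothetical). It splits each (two_id, d_month) group into salt buckets, counts distinct one_id per bucket and sums the bucket counts. A random salt lets the same one_id land in several buckets, so it is counted more than once; a salt computed only from one_id keeps each id in exactly one bucket, so the sum stays exact.

```python
import random
import zlib
from collections import defaultdict

# Hypothetical toy data: (one_id, two_id, d_month). two_id "A" is the skewed key.
rows = [(f"u{i}", "A", "2024-01") for i in range(5) for _ in range(20)]
rows += [(f"v{i}", "B", "2024-01") for i in range(2) for _ in range(3)]


def exact_distinct(rows):
    """COUNT(DISTINCT one_id) per (two_id, d_month), computed directly."""
    seen = defaultdict(set)
    for one_id, two_id, d_month in rows:
        seen[(two_id, d_month)].add(one_id)
    return {key: len(ids) for key, ids in seen.items()}


def salted_distinct(rows, salt_of, n_salts):
    """Distinct count per (two_id, d_month, salt), then SUM over the salts.

    Exact only if all rows sharing a one_id receive the same salt.
    """
    partial = defaultdict(set)
    for one_id, two_id, d_month in rows:
        partial[(two_id, d_month, salt_of(one_id, n_salts))].add(one_id)
    totals = defaultdict(int)
    for (two_id, d_month, _salt), ids in partial.items():
        totals[(two_id, d_month)] += len(ids)
    return dict(totals)


_rng = random.Random(42)


def random_salt(one_id, n_salts):
    # Analogue of FLOOR(RAND() * n): ignores one_id, so one id can hit several buckets.
    return _rng.randrange(n_salts)


def hashed_salt(one_id, n_salts):
    # Depends only on one_id (Spark SQL analogue: pmod(hash(one_id), n)).
    return zlib.crc32(str(one_id).encode()) % n_salts


exact = exact_distinct(rows)
print(exact)                                           # {('A', '2024-01'): 5, ('B', '2024-01'): 2}
print(salted_distinct(rows, hashed_salt, 4) == exact)  # True
print(salted_distinct(rows, random_salt, 4))           # overcounts: ids are counted once per bucket they hit
```

Because such a salt is a function of one_id, it can spread one heavy (two_id, d_month) group across several tasks, but it cannot split a single (one_id, two_id, d_month) group; this is an illustration of the exactness constraint, not a claim that it removes the remaining skew.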