The SQL in question:

WITH
first_cte AS (
    SELECT
        s.two_id,
        s.d_month,
        COUNT(*)               AS total,
        COUNT(DISTINCT one_id) AS unique_one_id_count 
    FROM iceberg_db.tab s
    GROUP BY s.two_id, s.d_month
)
SELECT *
FROM first_cte

I run it in Glue on G.8X with 32 workers and it takes a long time. The input is about 300 million records, which aggregate down to about 700k rows. The problem is that the data is skewed: a few two_id values have many one_id values, while most two_id values don't have many, so most of the tasks end up waiting on a few stragglers.
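
To make the skew concrete, a quick profile of rows per two_id shows which keys dominate (a sketch against the same table; the LIMIT is just to surface the heaviest keys):

SELECT
    s.two_id,
    COUNT(*) AS rows_per_key       -- the heavy keys here become the straggler tasks
FROM iceberg_db.tab s
GROUP BY s.two_id
ORDER BY rows_per_key DESC
LIMIT 20                           -- illustrative cutoff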

I was reading about a few techniques to solve this issue: salting, Storage Partitioned Join, broadcast, the explode/COLLECT_SET/flatten technique, and others. SPJ and broadcast are not relevant since they apply to joins (although I did try the Spark conf and the hints). explode/COLLECT_SET/flatten requires all unique values to fit in memory, so I dropped it. That leaves salting. Since I am doing a count distinct I can't salt with RAND alone, as I would lose the exact unique count (I could only get an approximation), which leaves me with the following solution:

WITH
first_cte AS (
    SELECT
        s.one_id,
        s.two_id,
        s.d_month,
        COUNT(*)        AS total
    FROM iceberg_db.tab s
    GROUP BY s.one_id, s.two_id, s.d_month
),
second_cte AS (
    SELECT
        s.two_id, 
        s.d_month,
        SUM(s.total) AS total,
        COUNT(*)     AS unique_one_id_count
    FROM first_cte s
    GROUP BY s.two_id, s.d_month
)
SELECT *
FROM second_cte

I get much better performance this way. However, I wonder if I can do even better: the skew issue did not go away, it just got smaller, as some tasks still take longer because some two_id values carry far more one_id rows than others.
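
For reference, the approximate route mentioned above would be a single pass (a sketch; the second argument to APPROX_COUNT_DISTINCT is an optional relative error, and 0.01 is an illustrative value):

SELECT
    s.two_id,
    s.d_month,
    COUNT(*)                              AS total,
    APPROX_COUNT_DISTINCT(s.one_id, 0.01) AS unique_one_id_count  -- estimate only
FROM iceberg_db.tab s
GROUP BY s.two_id, s.d_month

It is skew-friendly because the sketches are merged map-side, but it only returns an estimate, which is why I went with the exact two-step version.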

1 Answer

I think the multi-step aggregation makes sense. You can add a salting column alongside s.one_id to get more control over the number of partitions used per s.two_id:

WITH
raw_data AS (
  SELECT
    s.one_id,
    s.two_id,
    FLOOR(RAND(123456) * <NUMBER_OF_BUCKETS>) AS salted_key,
    s.d_month
  FROM iceberg_db.tab s
),
first_cte AS (
  SELECT
    one_id,
    two_id,
    salted_key,
    d_month,
    COUNT(*) AS total
  FROM raw_data
  GROUP BY one_id, two_id, salted_key, d_month
),
second_cte AS (
  SELECT
    s.two_id, 
    s.d_month,
    SUM(s.total) AS total,
    COUNT(DISTINCT one_id) AS unique_one_id_count
  FROM first_cte s
  GROUP BY two_id, d_month
)
SELECT *
FROM second_cte

Here you can control the number of buckets per key via NUMBER_OF_BUCKETS. If you pick 100, for example, each key's data is partitioned into 100 buckets. Keep in mind that too many buckets per key can hurt the performance of keys with little data, so try different values that match your data distribution; a skew-aware variant is sketched below.
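
If a single global bucket count over-fragments the light keys, a skew-aware variant is to salt only the heavy keys (a sketch, not something you must do; the 1,000,000-row threshold, the 100 buckets, and the BROADCAST hint are illustrative choices):

WITH
heavy_keys AS (
  SELECT two_id
  FROM iceberg_db.tab
  GROUP BY two_id
  HAVING COUNT(*) > 1000000        -- illustrative threshold for "heavy"
),
raw_data AS (
  SELECT /*+ BROADCAST(h) */       -- heavy_keys is small, so broadcast the join
    s.one_id,
    s.two_id,
    s.d_month,
    CASE WHEN h.two_id IS NOT NULL
         THEN FLOOR(RAND(123456) * 100)  -- 100 buckets for heavy keys, illustrative
         ELSE 0                          -- light keys stay in a single bucket
    END AS salted_key
  FROM iceberg_db.tab s
  LEFT JOIN heavy_keys h ON s.two_id = h.two_id
),
first_cte AS (
  SELECT one_id, two_id, salted_key, d_month, COUNT(*) AS total
  FROM raw_data
  GROUP BY one_id, two_id, salted_key, d_month
),
second_cte AS (
  SELECT
    two_id,
    d_month,
    SUM(total)             AS total,
    COUNT(DISTINCT one_id) AS unique_one_id_count
  FROM first_cte
  GROUP BY two_id, d_month
)
SELECT *
FROM second_cte

The extra scan to build heavy_keys has a cost, but the distinct count stays exact because the final COUNT(DISTINCT one_id) collapses any one_id that landed in multiple buckets.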


1 Comment

You are right; then I'd make the key (one_id, two_id, salted_key, d_month), which partitions the data by one_id and two_id while keeping the one_id info. I have updated the answer.
