Skip to main content
deleted 947 characters in body
Source Link
ng.newbie
  • 3.3k
  • 6
  • 34
  • 74
orders: (3000 + rows)
order_line_items: (5000 + rows)
item_wmsitems: (14 million rows)
joined_df = (orders_df.alias("o")
             .join(orders_line_item_df.alias("oli"), F.col("o.order_id") == F.col("oli.order_id"), how="inner")
             .join(item_wms_dfitem_df.alias("iw"), F.col("oli.item_id") == F.col("iw.item_id"), how="inner")
             .filter(F.col("o.do_status").isin(["110"]))
)

display(joined_df.limit(100))
SELECT
        o.order_id,
        MIN(
            CASE
                WHEN iw.spl_instr_code_4code IN(
                    'L''A', 'D''B' 
                )
                     THEN
                    '1-''1' 
                    WHEN bill_facility_alias_idbill_id = '0000002290''2'
                    THEN '2-''2' 
                WHEN bill_facility_alias_idbill_id  IN ('0000000514','0000000963')
                    THEN '3-' 
                WHEN bill_facility_alias_id IN('0000468976','0000531509')
                    THEN '4-' 
                WHEN bill_facility_alias_id ='0000505550'
                    THEN '5-' 
                WHEN bill_facility_alias_id  IN('0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420',
                '0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362''14', '0000473708''63')
                    THEN '6-' ||'3' 'NFS'
                WHEN bill_facility_alias_id notbill_id IN
                ('0000002290','0000000514','0000000963','0000468976','0000004696','0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420','0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362','0000473708''76', '0000505550''09')
                    THEN '7-''4' 
                ELSE
                    '8-''5' 
            END
        ) new_priorityfoo
    FROM
        orders            o
        JOIN order_line_item   oli ON oli.order_id = o.order_id
        JOIN item_wmsitem          iw ON oli.item_id = iw.item_id
    WHERE
        o.do_statusstatus IN (
            '110''10'
        )
    GROUP BY
        o.order_id
orders: (3000 + rows)
order_line_items: (5000 + rows)
item_wms: (14 million rows)
joined_df = (orders_df.alias("o")
             .join(orders_line_item_df.alias("oli"), F.col("o.order_id") == F.col("oli.order_id"), how="inner")
             .join(item_wms_df.alias("iw"), F.col("oli.item_id") == F.col("iw.item_id"), how="inner")
             .filter(F.col("o.do_status").isin(["110"]))
)

display(joined_df.limit(100))
SELECT
        o.order_id,
        MIN(
            CASE
                WHEN iw.spl_instr_code_4 IN(
                    'L', 'D' 
                )
                     THEN
                    '1-' 
                    WHEN bill_facility_alias_id = '0000002290'
                    THEN '2-' 
                WHEN bill_facility_alias_id  IN ('0000000514','0000000963')
                    THEN '3-' 
                WHEN bill_facility_alias_id IN('0000468976','0000531509')
                    THEN '4-' 
                WHEN bill_facility_alias_id ='0000505550'
                    THEN '5-' 
                WHEN bill_facility_alias_id  IN('0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420',
                '0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362', '0000473708')
                    THEN '6-' || 'NFS'
                WHEN bill_facility_alias_id not IN
                ('0000002290','0000000514','0000000963','0000468976','0000004696','0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420','0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362','0000473708', '0000505550')
                    THEN '7-' 
                ELSE
                    '8-' 
            END
        ) new_priority
    FROM
        orders            o
        JOIN order_line_item   oli ON oli.order_id = o.order_id
        JOIN item_wms          iw ON oli.item_id = iw.item_id
    WHERE
        o.do_status IN (
            '110'
        )
    GROUP BY
        o.order_id
orders: (3000 + rows)
order_line_items: (5000 + rows)
items: (14 million rows)
joined_df = (orders_df.alias("o")
             .join(orders_line_item_df.alias("oli"), F.col("o.order_id") == F.col("oli.order_id"), how="inner")
             .join(item_df.alias("iw"), F.col("oli.item_id") == F.col("iw.item_id"), how="inner")
             .filter(F.col("o.do_status").isin(["110"]))
)

display(joined_df.limit(100))
SELECT
        o.order_id,
        MIN(
            CASE
                WHEN iw.code IN(
                    'A', 'B' 
                )
                     THEN
                    '1' 
                    WHEN bill_id = '2'
                    THEN '2' 
                WHEN bill_id  IN ('14','63')
                    THEN '3' 
                WHEN bill_id IN('76','09')
                    THEN '4' 
                ELSE
                    '5' 
            END
        ) foo
    FROM
        orders            o
        JOIN order_line_item   oli ON oli.order_id = o.order_id
        JOIN item          iw ON oli.item_id = iw.item_id
    WHERE
        o.status IN (
            '10'
        )
    GROUP BY
        o.order_id
Source Link
ng.newbie
  • 3.3k
  • 6
  • 34
  • 74

Need help understanding why Spark query takes longer to execute when GROUP BY is introduced

I have 3 tables in an Oracle database which I am trying to join and run some aggregates on :

orders: (3000 + rows)
order_line_items: (5000 + rows)
item_wms: (14 million rows)

When I run the following code in PySpark:

joined_df = (orders_df.alias("o")
             .join(orders_line_item_df.alias("oli"), F.col("o.order_id") == F.col("oli.order_id"), how="inner")
             .join(item_wms_df.alias("iw"), F.col("oli.item_id") == F.col("iw.item_id"), how="inner")
             .filter(F.col("o.do_status").isin(["110"]))
)

display(joined_df.limit(100))

It completes within 40 seconds and generates the following query plan:

== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
   ResultQueryStage (14), Statistics(sizeInBytes=981.6 KiB, rowCount=100, ColumnStat: N/A, isRuntime=true)
   +- CollectLimit (13)
      +- BroadcastHashJoin Inner BuildLeft (12)
         :- AQEShuffleRead (10)
         :  +- ShuffleQueryStage (9), Statistics(sizeInBytes=10.9 MiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
         :     +- Exchange (8)
         :        +- BroadcastHashJoin Inner BuildLeft (7)
         :           :- AQEShuffleRead (5)
         :           :  +- ShuffleQueryStage (4), Statistics(sizeInBytes=4.9 MiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
         :           :     +- Exchange (3)
         :           :        +- Filter (2)
         :           :           +- Scan JDBCRelation(orders) [numPartitions=1]  (1)
         :           +- Scan JDBCRelation(order_line_item) [numPartitions=1]  (6)
         +- Scan JDBCRelation(item_wms) [numPartitions=1]  (11)

But when I add a GROUP BY with a MIN aggregate the query does not finish:

SELECT
        o.order_id,
        MIN(
            CASE
                WHEN iw.spl_instr_code_4 IN(
                    'L', 'D' 
                )
                     THEN
                    '1-' 
                    WHEN bill_facility_alias_id = '0000002290'
                    THEN '2-' 
                WHEN bill_facility_alias_id  IN ('0000000514','0000000963')
                    THEN '3-' 
                WHEN bill_facility_alias_id IN('0000468976','0000531509')
                    THEN '4-' 
                WHEN bill_facility_alias_id ='0000505550'
                    THEN '5-' 
                WHEN bill_facility_alias_id  IN('0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420',
                '0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362', '0000473708')
                    THEN '6-' || 'NFS'
                WHEN bill_facility_alias_id not IN
                ('0000002290','0000000514','0000000963','0000468976','0000004696','0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420','0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362','0000473708', '0000505550')
                    THEN '7-' 
                ELSE
                    '8-' 
            END
        ) new_priority
    FROM
        orders            o
        JOIN order_line_item   oli ON oli.order_id = o.order_id
        JOIN item_wms          iw ON oli.item_id = iw.item_id
    WHERE
        o.do_status IN (
            '110'
        )
    GROUP BY
        o.order_id

The query plan generated is:

== Physical Plan ==
AdaptiveSparkPlan (41)
+- == Current Plan ==
   CollectLimit (22)
   +- SortAggregate (21)
      +- Sort (20)
         +- ShuffleQueryStage (19), Statistics(sizeInBytes=2.02E+22 B, ColumnStat: N/A)
            +- Exchange (18)
               +- SortAggregate (17)
                  +- * Sort (16)
                     +- * Project (15)
                        +- * BroadcastHashJoin Inner BuildLeft (14)
                           :- AQEShuffleRead (12)
                           :  +- ShuffleQueryStage (11), Statistics(sizeInBytes=94.3 KiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
                           :     +- Exchange (10)
                           :        +- * Project (9)
                           :           +- * BroadcastHashJoin Inner BuildLeft (8)
                           :              :- AQEShuffleRead (6)
                           :              :  +- ShuffleQueryStage (5), Statistics(sizeInBytes=52.8 KiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
                           :              :     +- Exchange (4)
                           :              :        +- * Project (3)
                           :              :           +- * Filter (2)
                           :              :              +- * Scan JDBCRelation(orders) [numPartitions=1]  (1)
                           :              +- * Scan JDBCRelation(order_line_item) [numPartitions=1]  (7)
                           +- * Scan JDBCRelation(item_wms) [numPartitions=1]  (13)

What my understanding is:

  • I think the reason why the first query finishes early is because it does not need to the full join. The CollectLimit actually limits the result and it can do the join only for the 100 rows and exit.
  • This is not the case for the second query. It needs to sort the entire table then aggregate on the order_id.

Am I correct ?

Further questions:

  • What does the various + symbols mean in the query plan? Does it mean it's a different stage or different job?
  • When I am reading the query plan how should I interpret when new job or stage is created ?

I am asking this as I am a beginner to Spark query optimization and need to know how to interpret a query plan.

Cluster info: 4 cores 32 GB Single Node.