I have 3 tables in an Oracle database which I am trying to join and run some aggregates on :
orders: (3000 + rows)
order_line_items: (5000 + rows)
item_wms: (14 million rows)
When I run the following code in PySpark:
joined_df = (orders_df.alias("o")
.join(orders_line_item_df.alias("oli"), F.col("o.order_id") == F.col("oli.order_id"), how="inner")
.join(item_wms_df.alias("iw"), F.col("oli.item_id") == F.col("iw.item_id"), how="inner")
.filter(F.col("o.do_status").isin(["110"]))
)
display(joined_df.limit(100))
It completes within 40 seconds and generates the following query plan:
== Physical Plan ==
AdaptiveSparkPlan (25)
+- == Final Plan ==
ResultQueryStage (14), Statistics(sizeInBytes=981.6 KiB, rowCount=100, ColumnStat: N/A, isRuntime=true)
+- CollectLimit (13)
+- BroadcastHashJoin Inner BuildLeft (12)
:- AQEShuffleRead (10)
: +- ShuffleQueryStage (9), Statistics(sizeInBytes=10.9 MiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
: +- Exchange (8)
: +- BroadcastHashJoin Inner BuildLeft (7)
: :- AQEShuffleRead (5)
: : +- ShuffleQueryStage (4), Statistics(sizeInBytes=4.9 MiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
: : +- Exchange (3)
: : +- Filter (2)
: : +- Scan JDBCRelation(orders) [numPartitions=1] (1)
: +- Scan JDBCRelation(order_line_item) [numPartitions=1] (6)
+- Scan JDBCRelation(item_wms) [numPartitions=1] (11)
But when I add a GROUP BY with a MIN aggregate the query does not finish:
SELECT
o.order_id,
MIN(
CASE
WHEN iw.spl_instr_code_4 IN(
'L', 'D'
)
THEN
'1-'
WHEN bill_facility_alias_id = '0000002290'
THEN '2-'
WHEN bill_facility_alias_id IN ('0000000514','0000000963')
THEN '3-'
WHEN bill_facility_alias_id IN('0000468976','0000531509')
THEN '4-'
WHEN bill_facility_alias_id ='0000505550'
THEN '5-'
WHEN bill_facility_alias_id IN('0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420',
'0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362', '0000473708')
THEN '6-' || 'NFS'
WHEN bill_facility_alias_id not IN
('0000002290','0000000514','0000000963','0000468976','0000004696','0000375347','0000004281','0000387830','0000435748','0000411534','0000212223','0000372420','0000397211','0000423621','0000001871','0000392981','0000283730','0000268523','0000451376','0000464362','0000473708', '0000505550')
THEN '7-'
ELSE
'8-'
END
) new_priority
FROM
orders o
JOIN order_line_item oli ON oli.order_id = o.order_id
JOIN item_wms iw ON oli.item_id = iw.item_id
WHERE
o.do_status IN (
'110'
)
GROUP BY
o.order_id
The query plan generated is:
== Physical Plan ==
AdaptiveSparkPlan (41)
+- == Current Plan ==
CollectLimit (22)
+- SortAggregate (21)
+- Sort (20)
+- ShuffleQueryStage (19), Statistics(sizeInBytes=2.02E+22 B, ColumnStat: N/A)
+- Exchange (18)
+- SortAggregate (17)
+- * Sort (16)
+- * Project (15)
+- * BroadcastHashJoin Inner BuildLeft (14)
:- AQEShuffleRead (12)
: +- ShuffleQueryStage (11), Statistics(sizeInBytes=94.3 KiB, rowCount=2.01E+3, ColumnStat: N/A, isRuntime=true)
: +- Exchange (10)
: +- * Project (9)
: +- * BroadcastHashJoin Inner BuildLeft (8)
: :- AQEShuffleRead (6)
: : +- ShuffleQueryStage (5), Statistics(sizeInBytes=52.8 KiB, rowCount=1.35E+3, ColumnStat: N/A, isRuntime=true)
: : +- Exchange (4)
: : +- * Project (3)
: : +- * Filter (2)
: : +- * Scan JDBCRelation(orders) [numPartitions=1] (1)
: +- * Scan JDBCRelation(order_line_item) [numPartitions=1] (7)
+- * Scan JDBCRelation(item_wms) [numPartitions=1] (13)
What my understanding is:
- I think the reason why the first query finishes early is because it does not need to the full join. The
CollectLimit actually limits the result and it can do the join only for the 100 rows and exit.
- This is not the case for the second query. It needs to sort the entire table then aggregate on the
order_id.
Am I correct ?
Further questions:
- What does the various
+ symbols mean in the query plan? Does it mean it's a different stage or different job?
- When I am reading the query plan how should I interpret when new job or stage is created ?
I am asking this as I am a beginner to Spark query optimization and need to know how to interpret a query plan.
Cluster info:
4 cores 32 GB Single Node.