3

I am running below code on spark 3.0.1 (CDP cluster):

enter image description here

The autoBroadcastJoinThreshold is on default 10 MiB and AQE is disabled

enter image description here

enter image description here

On DataFrame.explain() , it selected SortMergeJoin as expected:

enter image description here

If I call DataFrame.show() it goes for SortMergeJoin while DataFrame.count() is going for BroadcastHashJoin.

df.show() --> SortMergeJoin:

enter image description here

df.count() --> BrodcastHashJoin:

enter image description here

I understood that it is because , df.count() uses only the join key column for projection in the first stage and so the size of data from that key column alone is under 10 MiB. So it went for BroadCastHashJoin.

enter image description here

But What I am not understanding is how it does that ?

As I understood spark uses runtime statistics only when AQE is enabled otherwise Catalyst optimizer uses input file size to identify the Join strategy while preparing the physical plan.

Does .count() recalculates the size like this after reading the file at runtime ?

3
  • @RobertKossendey : I have already posted it in my question. Check it out after this statement "On DataFrame.explain() , it selected SortMergeJoin as expected:" It is having optimized plan with cost analysis from Statistics class of spark as well as the Physical plan selected by catalyst pipeline Commented Nov 14, 2022 at 19:53
  • @RobertKossendey : I don't have such a plan . Not sure how can I generate separate plans based on different actions from same DataFrame !! All I have is the plan chosen by catalyst optimizer for the join which is the plan with "SortMergeJoin" . Calling .show() chosen the same plan while .count() has chosen BroadcastHashJoin for that . I have pasted snaps of the executed SQL DAGs for both and you can see in the pic for .count(), it has chosen the BroadcastHashJoin Commented Nov 14, 2022 at 21:20
  • Thats right, you can't call explain after an action because result of an action is no longer a dataframe (it will be long in case of count). I think that you can simulate this second scenario with broadcast by modifying your right side of join. Instead of passing whole data set just add .select("order_id") similarly as it is done in case of count Commented Nov 14, 2022 at 21:27

1 Answer 1

2

I think that you are right and AQE is not doing anything in here and the magic i happening during initial planning.

In Catalyst statistics are propagated from leaf node to other nodes. In your case the leafs are Json Files Scans.

Lets take a look at them in Spark source code.

Your leaf is going to be evaluated into JsonScan, so its this code: JsonScan in Spark repo

JsonFileScan extends TextBasedFileScan which extends FileScan - and in this class we can find function which is estimating your statistics which are later used during physical planning

Spark source code

override def estimateStatistics(): Statistics = {
    new Statistics {
      override def sizeInBytes(): OptionalLong = {
        val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
        val size = (compressionFactor * fileIndex.sizeInBytes /
          (dataSchema.defaultSize + fileIndex.partitionSchema.defaultSize) *
          (readDataSchema.defaultSize + readPartitionSchema.defaultSize)).toLong

        OptionalLong.of(size)
      }

      override def numRows(): OptionalLong = OptionalLong.empty()
    }
  }

As you can see numRows is empty but sizeInBytes is calculated based on fileIndex and few other variables. One of this variables is readDataSchema.defaultSize. In case of count instead of show Spark is going to read only one column from second file, and that's why statistics are going to be lower and later Catalyst is choosing broadcastHashJoin instead of SMJ - because based on statistics it will be able to broadcast one side of your join.

In source code you can also find how Spark is choosing side which will be broadcasted and how it is comparing size from statistics with spark parameters:

You dont have hint so Spark is going to execute this SparkStrategies.scala:

 def createJoinWithoutHint() = {
          createBroadcastHashJoin(false)
            .orElse(createShuffleHashJoin(false))
            .orElse(createSortMergeJoin())
            .orElse(createCartesianProduct())
            .getOrElse {
              // This join could be very slow or OOM
              val buildSide = getSmallerSide(left, right)
              Seq(joins.BroadcastNestedLoopJoinExec(
                planLater(left), planLater(right), buildSide, joinType, j.condition))
            }
        }

At first it will try to createBroadcast join, here it will check if it can broadcast one side of join.

During this check Spark is going to execute below check first for left side and then for right if left is to big for broadcast Joins.Scala:

def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
    val autoBroadcastJoinThreshold = if (plan.stats.isRuntime) {
      conf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
        .getOrElse(conf.autoBroadcastJoinThreshold)
    } else {
      conf.autoBroadcastJoinThreshold
    }
    plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= autoBroadcastJoinThreshold
  }

Here most probably for on side of your join plan.stats.sizeInBytes will be filled with values propagated from JsonFileScan which are smaller than parameter and broadcastHashJoin is going to be used in physical plan

Sign up to request clarification or add additional context in comments.

8 Comments

Some areas not clear. While Stastics calculation , it is actually reading all data , and then on the final part of the stage only , it is doing some column pruning at the time of projection. Also since it is a CSV file , there is no way it can read a column alone, since it is not a column store format. So After the initial read only , the replanning is happening. What I am surprised , as per documentation , once a plan is selected , calling an action will submit that plan to DAGScheduler unless AQE is enabled !! Then how it is doing a replanning ? Or Column Projection can change the plan?
As you said , I can reproduce BHJ plan with only join key column selected for left DF. so which means dataSchema.defaultSize is re-defined after deciding the column projection for .count() , since spark knows , for a .count() needs no other column but only the id column for join. So the catalyst optimizer is switching from SMJ to BHJ . Though its a kind of runtime decision. Interesting !!
"...So the catalyst optimizer is switching from SMJ to BHJ . Though its a kind of runtime decision. Interesting !!" - column prunning and filter pushdown is done during logical plan optimization, based on which physical plan is created in later stage of query planning. So imo when Spark is doing physical planning he already knows that for one branch he needs only one column and what schema he needs to read te source. My opinion is, that there is no replanning and catalyst is not switiching join in runtime - that is decision made initialy during physical planning based on statistics
"Also since it is a CSV file , there is no way it can read a column alone, since it is not a column store format." - in your example you were using json file. But tbh it doesn't matter. In general you dont need column store format to read only one column. In case of csv Spark is still going to iterate over all rows but only one field instead of all of them is going to be loaded into memory and thats why the dataset is smaller. In case of columnar store you read only data from your column, thats true, but it doesnt mean that in case of other formats you cannot do some optimizations on load
Agree. What I mentioned was , file scan is iterating through all rows for both JSON files first. Then after projection only it is deciding the join strategy based on schema default size. But if it was columnar , it doesn't require to scan rows , instead it can directly scan the required column alone. Anyway , got some clarity now !! Thanks for your valuable time :)
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.