I think you are right: AQE is not doing anything here, and the magic is happening during initial planning.
In Catalyst, statistics are propagated from the leaf nodes to the other nodes in the plan. In your case the leaves are the JSON file scans.
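You can see these propagated estimates directly with the cost explain mode (a standard Spark 3.x API; the path below is just a placeholder):

spark.read.json("/path/to/your.json")
  .explain("cost") // prints the optimized logical plan with sizeInBytes next to each node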
Let's take a look at them in the Spark source code.
Your leaf is going to be evaluated into a JsonScan, so it's this code: JsonScan in Spark repo
JsonScan extends TextBasedFileScan, which extends FileScan, and in that class we can find the function that estimates your statistics, which are later used during physical planning:
Spark source code
override def estimateStatistics(): Statistics = {
  new Statistics {
    override def sizeInBytes(): OptionalLong = {
      val compressionFactor = sparkSession.sessionState.conf.fileCompressionFactor
      val size = (compressionFactor * fileIndex.sizeInBytes /
        (dataSchema.defaultSize + fileIndex.partitionSchema.defaultSize) *
        (readDataSchema.defaultSize + readPartitionSchema.defaultSize)).toLong
      OptionalLong.of(size)
    }

    override def numRows(): OptionalLong = OptionalLong.empty()
  }
}
As you can see, numRows is empty, but sizeInBytes is calculated from fileIndex and a few other variables. One of these variables is readDataSchema.defaultSize. In the case of count instead of show, Spark is going to read only one column from the second file, which is why the statistics come out lower, and Catalyst later chooses a broadcastHashJoin instead of an SMJ: based on the statistics, it is able to broadcast one side of your join.
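To make the effect concrete, here is a back-of-the-envelope sketch of that same formula with made-up numbers (the file size and schema sizes below are assumptions, not your actual values):

// Hypothetical inputs, only to illustrate the formula above
val compressionFactor = 1.0               // spark.sql.sources.fileCompressionFactor default
val fileSizeOnDisk    = 50L * 1024 * 1024 // 50 MB of JSON on disk
val dataSchemaSize    = 200               // defaultSize of the full schema (all columns)
val readSizeForShow   = 200               // show() reads every column
val readSizeForCount  = 20                // count() prunes down to a single column
// assuming no partition columns, so the partition schema sizes are 0

def estimate(readSize: Int): Long =
  (compressionFactor * fileSizeOnDisk / dataSchemaSize * readSize).toLong

estimate(readSizeForShow)  // 52428800 (~50 MB) -> above a 10 MB threshold => SMJ
estimate(readSizeForCount) // 5242880  (~5 MB)  -> below the threshold     => broadcast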
In the source code you can also find how Spark chooses the side that will be broadcast and how it compares the size from the statistics with the Spark parameters:
You don't have a hint, so Spark is going to execute this (SparkStrategies.scala):
def createJoinWithoutHint() = {
  createBroadcastHashJoin(false)
    .orElse(createShuffleHashJoin(false))
    .orElse(createSortMergeJoin())
    .orElse(createCartesianProduct())
    .getOrElse {
      // This join could be very slow or OOM
      val buildSide = getSmallerSide(left, right)
      Seq(joins.BroadcastNestedLoopJoinExec(
        planLater(left), planLater(right), buildSide, joinType, j.condition))
    }
}
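Each create* call returns an Option, so the orElse chain simply means the first strategy that produces a plan wins. A minimal standalone sketch of the same pattern (the names here are just labels, not Spark API):

def tryBroadcastHash: Option[String] = Some("BroadcastHashJoin") // None if neither side fits the threshold
def tryShuffleHash: Option[String]   = None
def trySortMerge: Option[String]     = Some("SortMergeJoin")

val chosen = tryBroadcastHash
  .orElse(tryShuffleHash)
  .orElse(trySortMerge)
  .getOrElse("BroadcastNestedLoopJoin")
// chosen == "BroadcastHashJoin": earlier entries in the chain take priority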
First it will try createBroadcastHashJoin; here it checks whether it can broadcast one side of the join.
During this check, Spark executes the check below, first for the left side and then for the right side if the left is too big to broadcast (joins.scala):
def canBroadcastBySize(plan: LogicalPlan, conf: SQLConf): Boolean = {
  val autoBroadcastJoinThreshold = if (plan.stats.isRuntime) {
    conf.getConf(SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD)
      .getOrElse(conf.autoBroadcastJoinThreshold)
  } else {
    conf.autoBroadcastJoinThreshold
  }
  plan.stats.sizeInBytes >= 0 && plan.stats.sizeInBytes <= autoBroadcastJoinThreshold
}
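Both thresholds referenced here are ordinary configs, so an easy way to test this theory is to turn size-based broadcast off and see whether the plan flips back to SMJ:

// spark.sql.autoBroadcastJoinThreshold          -> used with compile-time statistics (default 10 MB)
// spark.sql.adaptive.autoBroadcastJoinThreshold -> used with runtime (AQE) statistics
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // disable size-based broadcast
// re-run your count() query and check explain(): it should pick SortMergeJoin again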
Here, most probably, for one side of your join plan.stats.sizeInBytes is filled with the value propagated from the JSON file scan, which is smaller than that parameter, so a broadcastHashJoin ends up in the physical plan.
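You can verify this yourself with the standard query-execution APIs (df1/df2 stand in for your two JSON DataFrames and "id" for your join key):

val joined = df1.join(df2, Seq("id"))
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold")) // 10485760 (10 MB) by default
println(joined.queryExecution.optimizedPlan.stats.sizeInBytes)  // Catalyst's estimate for the join
joined.explain() // BroadcastHashJoin when one input's estimate is under the threshold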