Here is a minimal example using the default datasets in Databricks (Spark 3.4):
import spark.implicits._ // provides the $"..." column syntax (pre-imported in Databricks notebooks)

// checkpoint() needs a reliable checkpoint directory; DBFS works on Databricks
spark.sparkContext.setCheckpointDir("dbfs:/tmp/rdd-checkpoints")

val df = spark.read
  .option("header", "true")      // use the first line as headers
  .option("inferSchema", "true") // infer data types
  .csv("/databricks-datasets/airlines/part-00000")
  .limit(100)

val left = df.alias("left")
val right = df.alias("right")

val joined = left.join(right, $"left.UniqueCarrier" === $"right.UniqueCarrier")
  .dropDuplicates(Seq("UniqueCarrier"))
  .select($"left.UniqueCarrier")

var finalDF = joined

// Repeatedly join the result with itself, checkpointing after every iteration
for (i <- 1 to 5) {
  finalDF = finalDF.join(finalDF, Seq("UniqueCarrier"))
  println("Doing checkpoint")
  finalDF = finalDF.checkpoint()
}
finalDF.explain("cost")
It will output:
== Optimized Logical Plan ==
LogicalRDD [UniqueCarrier#373], false, Statistics(sizeInBytes=2.80E+129 B, rowCount=1.00E+128)
That is a huge estimate, even though finalDF.count() returns 1, since the deduplicated data set contains only one record.
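The same numbers can also be read programmatically (a small sketch against the finalDF from the example above, using the Catalyst statistics accessor on the optimized plan):

// Compare the optimizer's estimate with the actual row count
val stats = finalDF.queryExecution.optimizedPlan.stats
println(s"estimated rowCount   = ${stats.rowCount}")     // roughly 1.00E+128, as in the explain output above
println(s"estimated sizeInBytes = ${stats.sizeInBytes}") // roughly 2.80E+129
println(s"actual count          = ${finalDF.count()}")   // 1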
With 50 iterations instead of 5, the job gets stuck at around 20-25 iterations and, if you wait long enough, sometimes fails with a bigInt.reportOverflow exception.
Such huge statistics slow the application down and, in the worst case, can cause BigInt overflow exceptions.
So the question is:
Checkpointing is usually recommended to truncate lineage, but it creates a LogicalRDD without column stats. If you have a big iterative algorithm in Scala Spark, what is the recommended way to force statistics to be recalculated after a checkpoint?
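For context, the only workaround I have found so far (just a sketch, the dbfs path is made up) is to replace finalDF.checkpoint() inside the loop with a Parquet round trip, so the next iteration starts from a file scan whose sizeInBytes is derived from the actual files rather than carried over from the inflated estimate:

// Hypothetical alternative to checkpoint(): materialize and re-read,
// which also truncates lineage but gives the scan file-size-based statistics
val path = s"dbfs:/tmp/iterations/step_$i"
finalDF.write.mode("overwrite").parquet(path)
finalDF = spark.read.parquet(path)

but I would like to know whether there is a recommended way to achieve this with checkpoint itself.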