Here is a minimal example using default data in Databricks (Spark 3.4):

import spark.implicits._   // for the $"col" syntax

// Directory where checkpoint() materializes the RDD
sc.setCheckpointDir("dbfs:/tmp/rdd-checkpoints")

var df = spark.read
  .option("header", "true")        // Use first line as headers
  .option("inferSchema", "true")   // Infer data types
  .csv("/databricks-datasets/airlines/part-00000")

df = df.limit(100)

val left = df.alias("left")
val right = df.alias("right")

val joined = left.join(right, $"left.UniqueCarrier" === $"right.UniqueCarrier")
  .dropDuplicates(Seq("UniqueCarrier"))
  .select($"left.UniqueCarrier")

var finalDF = joined
// Repeatedly join the result with itself, checkpointing after each iteration
for (i <- 1 to 5) {
  finalDF = finalDF.join(finalDF, Seq("UniqueCarrier"))
  println("Doing checkpoint")
  finalDF = finalDF.checkpoint()
}

finalDF.explain("cost")

It will output:

== Optimized Logical Plan ==
LogicalRDD [UniqueCarrier#373], false, Statistics(sizeInBytes=2.80E+129 B, rowCount=1.00E+128)

That is a huge estimate, even though finalDF.count() would yield 1, as there is only one record in the data set.

If you use 50 iterations instead of 5, the job gets stuck at around 20-25 iterations and, if you wait long enough, it sometimes fails with a BigInt.reportOverflow exception.

Such huge stats slow the application down and, in the worst case, can cause BigInt overflow exceptions.


So the question is:
Checkpointing is usually recommended to truncate lineage; however, it creates a LogicalRDD without column stats. If you have a big iterative algorithm in Scala Spark, what is the recommended way to force statistics to be recalculated after a checkpoint?

1 Answer

Little activity here on SO. Great question that most are not aware of.

From discussion and verification: a checkpoint creates a LogicalRDD that is materialized with no stats, e.g. the row count is not known.

  1. The commonly known approach is checkpointing, with the drama you observe sometimes occurring.

  2. The other is without checkpointing: read the data initially, then in a loop join, write out to disk, and read back again. Use Delta, and the read-back will give Catalyst's CBO sensible stats for planning; the plans pick these up and they are associated with the DataFrame. A sketch of this pattern follows below.
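
Something along these lines, reusing the question's joined DataFrame; the scratch location dbfs:/tmp/iterative-stats is just illustrative:

// Minimal sketch: replace the checkpoint in the loop with a Delta round-trip.
// Assumes a Databricks/Delta environment; the path below is a made-up scratch location.
var finalDF = joined
for (i <- 1 to 5) {
  finalDF = finalDF.join(finalDF, Seq("UniqueCarrier"))

  // Persist the intermediate result; like a checkpoint, this truncates the lineage
  val path = s"dbfs:/tmp/iterative-stats/iter_$i"
  finalDF.write.format("delta").mode("overwrite").save(path)

  // Reading back gives the planner fresh, file-based statistics instead of the
  // inflated estimates carried over by checkpoint() in Spark 3.4
  finalDF = spark.read.format("delta").load(path)
}

finalDF.explain("cost")

Writing and reading every iteration costs extra I/O, but it keeps the optimizer's estimates anchored to the actual on-disk size rather than the multiplied LogicalRDD stats.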

A tempView can be analyzed, but those stats are not available to the DataFrame that the tempView was based on.


2 Comments

In our production system we started to see this issue with Spark 3.4. The funny thing is that in Spark 3.3, after a checkpoint, the LogicalRDD would reset to 8 EiB and we never saw an overflow. But in Spark 3.4, after a checkpoint, the LogicalRDD keeps the previous stats, and in iterative algorithms they explode. So the Spark 3.3 behavior allowed us to run big jobs without forcing us to write to disk and read back with ANALYZE TABLE.
If you use Delta, then ANALYZE is not needed. Oddities of Spark and software dev.
