
I'm trying to compute a transitive closure over DataFrames. After several iterations I get an internal Spark exception. Any ideas on what causes it and how to solve it? Here is my program:

// All edges of the example graph: 1 -> 2, 1 -> 3, then a single chain 2 -> 4 -> 5 -> ... -> 19
val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17), (17, 18), (18, 19))
var edges = e.map(p => Edge(p._1, p._2)).toDF()

// Seed: the edges starting at node 1, with columns renamed so they can be joined back onto edges
var filtered = edges
  .filter("start = 1")
  .distinct()
  .withColumnRenamed("start", "fStart")
  .withColumnRenamed("end", "fEnd")

// Extend the reachable set by one edge per iteration
var i = 0
while (i < 30) {
  i = i + 1
  println("\n i = " + i)
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("start", "fStart")
    .withColumnRenamed("end", "fEnd")
    .distinct()
  filtered.show()
}

It requires a simple case class to be defined at the top level (plus import sqlContext.implicits._ for .toDF(), if you are not running in the spark-shell):

case class Edge(start: Int, end: Int)

And here is the output with the exception, after which Spark hangs for a while and then exits with an Executor heartbeat timed out error.

 i = 1
+------+----+
|fStart|fEnd|
+------+----+
|     1|   4|
+------+----+


 i = 2
+------+----+
|fStart|fEnd|
+------+----+
|     1|   5|
+------+----+


 i = 3
+------+----+
|fStart|fEnd|
+------+----+
|     1|   6|
+------+----+
...

 i = 10
+------+----+
|fStart|fEnd|
+------+----+
|     1|  13|
+------+----+


 i = 11
16/01/29 00:28:59 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
... 32 more
+------+----+
|fStart|fEnd|
+------+----+
|     1|  14|
+------+----+
...

PS1: How can such a join be done without column renaming? PS2: Is there any documentation on using DataFrames this way? The API documentation is very minimalistic.

Try adding a cache at the end of each iteration, filtered.cache(), before calling show. (Jan 29, 2016)

1 Answer


This kind of error seems to appear only when the cluster's resources are not enough to fulfill the request: the backlog keeps growing and after some time these errors show up.

To solve your problem, add filtered.cache just before filtered.show.
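For reference, here is a minimal sketch of the loop body with that cache added (same variable names as in the question). The cache call only marks the result; the show that follows materializes it, so the next join reads the cached rows instead of replaying the whole lineage:

while (i < 30) {
  i = i + 1
  println("\n i = " + i)
  filtered = filtered
    .join(edges, filtered("fEnd") === edges("start"))
    .select(filtered("fStart"), edges("end"))
    .withColumnRenamed("start", "fStart")
    .withColumnRenamed("end", "fEnd")
    .distinct()
  filtered.cache()   // mark this iteration's result for caching
  filtered.show()    // action: materializes and caches the result
}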

Also, after the 16th iteration there will be no results, as there are no more matches for filtered.fEnd === edges.start.
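If you would rather stop the loop at that point instead of always running 30 iterations, a small check at the end of the loop body works (a sketch; the count triggers an extra job, but it is cheap once filtered is cached):

  // after filtered.show() inside the loop
  if (filtered.count() == 0) {
    i = 30   // frontier is empty, so make the while condition fail
  }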


3 Comments

Indeed, adding filtered.cache helped (thank you). I know that after some iterations the program loops without doing anything; that was only to show this error. What I don't understand is how the resources can be lacking: this is an extremely simple example with some ONE ROW data frames and a few empty ones. Also, why did adding cache here help?
Also, after I added filtered.cache and ran the loop for more iterations, I can see that each iteration takes longer. Why is that? After a while they all produce the same empty data frame. PS. @Sumit, could you please point me to some docs or a YouTube video?
The DAG can show you the exact reason, but it seems that with every additional iteration it also re-processes the previous iterations: for example, with i=2 it will process the data of i=1 and then process the data for i=2. Remember that Spark persists the transformations which need to be applied to the data (the data lineage), not the results/output of those transformations. For persisting the output you specifically need to invoke RDD.cache.
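A quick way to see that growth in the same spark-shell session (a minimal sketch, assuming the variables from the question) is to print the query plan and the RDD lineage at the end of each iteration:

filtered.explain()                    // physical plan; it keeps growing each iteration unless the previous result was cached
println(filtered.rdd.toDebugString)   // RDD-level view of the same lineage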
