I'm trying to do a transitive closure over dataframes. After several iterations I get some internal spark exception. Any ideas on what causes it and how to solve it. Here is my program:
val e = Seq((1, 2), (1, 3), (2, 4), (4, 5), (5, 6), (6, 7), (7, 8), (8, 9), (9, 10), (10, 11), (11, 12), (12, 13), (13, 14), (14, 15), (15, 16), (16, 17), (17, 18), (18, 19))
var edges = e.map(p => Edge(p._1, p._2)).toDF()
var filtered = edges
.filter("start = 1")
.distinct()
.withColumnRenamed("start", "fStart")
.withColumnRenamed("end", "fEnd")
var i = 0
while (i < 30) {
i = i + 1
println("\n i = " + i)
filtered = filtered
.join(edges, filtered("fEnd") === edges("start"))
.select(filtered("fStart"), edges("end"))
.withColumnRenamed("start", "fStart")
.withColumnRenamed("end", "fEnd").distinct
filtered.show
}
It requires a simple case class to be defined at the top level:
case class Edge(start: Int, end: Int)
And here is the output with exception after which the spark hang for a while and then exits with an error Executor heartbeat timed out.
i = 1
+------+----+
|fStart|fEnd|
+------+----+
| 1| 4|
+------+----+
i = 2
+------+----+
|fStart|fEnd|
+------+----+
| 1| 5|
+------+----+
i = 3
+------+----+
|fStart|fEnd|
+------+----+
| 1| 6|
+------+----+
...
i = 10
+------+----+
|fStart|fEnd|
+------+----+
| 1| 13|
+------+----+
i = 11
16/01/29 00:28:59 ERROR Utils: Uncaught exception in thread driver-heartbeater
java.io.IOException: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1207)
at org.apache.spark.executor.TaskMetrics.readObject(TaskMetrics.scala:219)
at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.spark.util.Utils$.deserialize(Utils.scala:92)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:436)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1$$anonfun$apply$6.apply(Executor.scala:426)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:426)
at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$reportHeartBeat$1.apply(Executor.scala:424)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:424)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:468)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:468)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:468)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.HashMap$SerializationProxy to field org.apache.spark.executor.TaskMetrics._accumulatorUpdates of type scala.collection.immutable.Map in instance of org.apache.spark.executor.TaskMetrics
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2006)
at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
at org.apache.spark.executor.TaskMetrics$$anonfun$readObject$1.apply$mcV$sp(TaskMetrics.scala:220)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1204)
... 32 more
+------+----+
|fStart|fEnd|
+------+----+
| 1| 14|
+------+----+
...
PS1. How can such join be done without column renaming? PS2. Also is there some documentation on using data frames this way? The API documentation is very minimalistic.
filtered.cache()before usingshow