0

Hy, I have a Spark DataFrame and I made some transformation using SQL context, for example, select only two Columns in all data.

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

but now I want transform this sqlcontext a pandas dataframe, and I'm using

pddf = df_oraAS.toPandas()

but the output stop here and I need restart the IDE (spyder)

6/01/22 16:04:01 INFO DAGScheduler: Got job 0 (toPandas at <stdin>:1) with 3 output partitions
16/01/22 16:04:01 INFO DAGScheduler: Final stage: ResultStage 0 (toPandas at <stdin>:1)
16/01/22 16:04:01 INFO DAGScheduler: Parents of final stage: List()
16/01/22 16:04:01 INFO DAGScheduler: Missing parents: List()
16/01/22 16:04:01 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1), which has no missing parents
16/01/22 16:04:01 INFO SparkContext: Starting job: toPandas at <stdin>:1
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 9.4 KB, free 9.4 KB)
16/01/22 16:04:01 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.9 KB, free 14.3 KB)
16/01/22 16:04:01 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:50877 (size: 4.9 KB, free: 511.1 MB)
16/01/22 16:04:01 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
16/01/22 16:04:01 INFO DAGScheduler: Submitting 3 missing tasks from ResultStage 0 (MapPartitionsRDD[7] at toPandas at <stdin>:1)
16/01/22 16:04:01 INFO TaskSchedulerImpl: Adding task set 0.0 with 3 tasks
16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.
16/01/22 16:04:02 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,PROCESS_LOCAL, 119523958 bytes)
16/01/22 16:04:03 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, partition 1,PROCESS_LOCAL, 117876401 bytes)
Exception in thread "dispatcher-event-loop-3" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Unknown Source)
    at java.io.ByteArrayOutputStream.grow(Unknown Source)
    at java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
    at java.io.ByteArrayOutputStream.write(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
    at java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
    at java.io.ObjectOutputStream.writeObject0(Unknown Source)
    at java.io.ObjectOutputStream.writeObject(Unknown Source)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
    at org.apache.spark.scheduler.Task$.serializeWithDependencies(Task.scala:200)
    at org.apache.spark.scheduler.TaskSetManager.resourceOffer(TaskSetManager.scala:462)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet$1.apply$mcVI$sp(TaskSchedulerImpl.scala:252)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.scheduler.TaskSchedulerImpl.org$apache$spark$scheduler$TaskSchedulerImpl$$resourceOfferSingleTaskSet(TaskSchedulerImpl.scala:247)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:317)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3$$anonfun$apply$8.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$resourceOffers$3.apply(TaskSchedulerImpl.scala:315)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.TaskSchedulerImpl.resourceOffers(TaskSchedulerImpl.scala:315)
    at org.apache.spark.scheduler.local.LocalEndpoint.reviveOffers(LocalBackend.scala:84)
    at org.apache.spark.scheduler.local.LocalEndpoint$$anonfun$receive$1.applyOrElse(LocalBackend.scala:63)
    at org.apache.spark.rpc.netty.Inbox$$anonfun$process$1.apply$mcV$sp(Inbox.scala:116)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:204)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.Dispatcher$MessageLoop.run(Dispatcher.scala:215)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)

What I did wrong? thanks

EDIT: more completed: I load the date from Oracle Database (cx_Oracle) and put the data in a pandas dataframe

df_ora = pd.read_sql('SELECT* FROM DEC_CLIENTES', con=connection) 

Next I created a sparkContext to manipulate the dataframe

sqlContext = SQLContext(sc)

df_oraAS = sqlContext.createDataFrame(df_ora)

df_oraAS.registerTempTable("df_oraAS")

df_oraAS = sqlContext.sql("SELECT ENT_EMAIL,MES_ART_ID FROM df_oraAS LIMIT 5 ")

and I want convert again from sqlcontext to a pandas dataframe

 pddf = df_oraAS.toPandas() 

2 Answers 2

3

toPandas is basically collect in disguise. An output is a local Pandas DataFrame. If data doesn't fit into driver memory it will simply fail hence the error you see.

Sign up to request clarification or add additional context in comments.

9 Comments

Could you edit the question? It will be easier to find.
Unfortunately @Kardu like zero323 said, if you don't have enough space you'll get this error while collect whether your original tables has 2 or 1 column...
@eliasah thanks for your comment, but I have a Machine with 80Gb RAM Memory and i'm using only 14Gb ...
@eliasah maybe the spark cannot use all machine memory? How can I know?
Could you double check number of records in your data? And correct code in the question. The last line should be different, shouldn't it?
|
0

Your pd.read_sql call reads the full database into a pandas dataframe. This is local to the driver. When you call createDataFrame, it then creates a Spark DataFrame from your python pandas dataframe, which results in a really large task size (see the log line below):

16/01/22 16:04:02 WARN TaskSetManager: Stage 0 contains a task of very large size (116722 KB). The maximum recommended task size is 100 KB.

Even though you are selecting only 5 rows, you're actually first loading the full database into memory using that pd.read_sql call. If you're reading from an Oracle SQL database, why don't you use the spark JDBC driver and then perform your select filters and then call toPandas?

What your code is doing is reading the whole DB to pandas, writing to Spark, filtering and reading back to Pandas.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.