4

I have a big numpy array. Its shape is (800, 224, 224, 3), which means there are 800 images (224 * 224) with 3 channels. For distributed deep learning in Spark, I want to convert the numpy array to a Spark DataFrame.

My method is:

  1. Converted the numpy array to a CSV file
  2. Loaded the CSV and created a Spark DataFrame with 150,528 columns (224 * 224 * 3)
  3. Used VectorAssembler to combine all the columns into a single vector column (features)
  4. Reshaped the output of step 3, but I failed at step 3 because the computation seems to be too heavy

In order to make a vector from this:

+------+------+
|col_1 | col_2|
+------+------+
|0.1434|0.1434|
|0.1434|0.1451|
|0.1434|0.1467|
|0.3046|0.3046|
|0.3046|0.3304|
|0.3249|0.3046|
|0.3249|0.3304|
|0.3258|0.3258|
|0.3258|0.3263|
|0.3258|0.3307|
+------+------+

to this:

+-------------+
|   feature   |
+-------------+
|0.1434,0.1434|
|0.1434,0.1451|
|0.1434,0.1467|
|0.3046,0.3046|
|0.3046,0.3304|
|0.3249,0.3046|
|0.3249,0.3304|
|0.3258,0.3258|
|0.3258,0.3263|
|0.3258,0.3307|
+-------------+

But the number of columns is really large (150,528)...
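For reference, step 3 would look roughly like the sketch below, where csv_df is the DataFrame loaded in step 2 and the column names col_1 ... col_150528 are assumptions about what the CSV load produces:

from pyspark.ml.feature import VectorAssembler

# Column names assumed to come from the CSV load in step 2 (col_1 ... col_150528).
input_cols = ["col_{}".format(i) for i in range(1, 224 * 224 * 3 + 1)]

assembler = VectorAssembler(inputCols=input_cols, outputCol="features")
features_df = assembler.transform(csv_df).select("features")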

I also tried converting the numpy array to an RDD directly, but I got an 'out of memory' error. On a single machine, my job works fine with this numpy array.

1 Comment
Out of memory error, is it? Can you try setting the driver memory to whatever maximum you can give it? I use 6g and my laptop RAM is 8 GB.

3 Answers

5

You should be able to convert the numpy array directly to a Spark DataFrame, without going through a CSV file. You could try something like the code below:

from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.getOrCreate()

# numpy_arr is the (800, 224, 224, 3) array from the question; each image is
# flattened to a 150528-element dense vector wrapped in a one-element tuple.
num_rows = 800
arr = map(lambda x: (Vectors.dense(x),), numpy_arr.reshape(num_rows, -1))
df = spark.createDataFrame(arr, ["features"])

4 Comments

Hello, it gives me this error "TypeError: not supported type: <class 'numpy.ndarray'>"
@A.B: Try converting to tuples of vectors and see if it works, you can refer to: stackoverflow.com/questions/41328799/…
This answer does not actually work, given A.B.'s comment and testing.
@JohnStud: You were correct. It seems I didn't hear back from A.B whether using tuples worked and then I forgot about it. I tested it out and updated the answer. It should work now.
2

You can also do this, which I find most convenient:

import numpy as np
import pandas as pd
import pyspark
from pyspark.sql import SQLContext

sc = pyspark.SparkContext()
sqlContext = SQLContext(sc)

# Small 1-D example; pandas handles the row conversion for Spark.
array = np.linspace(0, 10)
df_spark = sqlContext.createDataFrame(pd.DataFrame(array))
df_spark.show()

The only downside is that pandas needs to be installed.
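For the 4-D array in the question, the images would first have to be flattened to 2-D, since a pandas DataFrame only holds two dimensions. A minimal sketch, reusing pd and sqlContext from the snippet above and assuming numpy_arr is the (800, 224, 224, 3) array:

# Flatten each image to one row of 224 * 224 * 3 = 150528 values,
# because a pandas DataFrame can only hold two dimensions.
flat = numpy_arr.reshape(numpy_arr.shape[0], -1)
df_spark = sqlContext.createDataFrame(pd.DataFrame(flat))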


1

If the out-of-memory error occurs on a worker node, increase the executor memory from its default of 1 GB using the spark.executor.memory setting; if the error occurs on the driver, try increasing the driver memory instead, as suggested by @pissall. Also try to identify a proper fraction of memory (spark.memory.fraction) to be used for keeping RDDs in memory.
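These are ordinary Spark configuration keys, so as a rough sketch (the values below are only placeholders to be sized to your cluster) they can be set when the session is built. Note that spark.driver.memory only takes effect if it is set before the driver JVM starts, so for an application launched with spark-submit pass it via --driver-memory or spark-defaults.conf instead:

from pyspark.sql import SparkSession

# Placeholder values; size them to the memory that is actually available.
spark = (
    SparkSession.builder
    .config("spark.executor.memory", "4g")   # memory per executor (worker)
    .config("spark.driver.memory", "6g")     # only effective if set before the driver JVM starts
    .config("spark.memory.fraction", "0.6")  # share of heap for execution and cached data
    .getOrCreate()
)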

3 Comments

Does tweaking the executor memory matter when working with Spark locally? Executors are only used when we have a cluster with multiple worker nodes, right? I suggested increasing the driver memory in this case. Correct me if it works differently.
No, executor memory doesn't matter in local mode, since the executors and the driver run in the same JVM process, whose memory can be increased by setting the driver memory. The question says the job runs well on a single machine, so I assumed he is working in cluster mode.
Yes, I am working in cluster mode. Your answer also helped me a lot! I am new to Spark, especially PySpark and Python. I am slow to make progress on my project, but I think I'm getting there. Thanks all!
