
I have read a text file using the Spark context; the test file is a CSV. testRdd below has the same format as my RDD.

testRdd = [[1.0,2.0,3.0,4.0,5.0,6.0,7.0],
[0.0,0.1,0.3,0.4,0.5,0.6,0.7],[1.1,1.2,1.3,1.4,1.5,1.6,1.7]]

I want to convert the above RDD into a numpy array, so I can feed the numpy array into my machine learning model.

When I tried the following:

 feature_vector = numpy.array(testRdd).astype(numpy.float32)

It gives me the below TypeError:

TypeError: float() argument must be a string or a number

How should I convert the Spark RDD into a numpy array?
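
For reference: the error most likely happens because numpy.array receives the RDD object itself rather than its rows, yielding a 0-d object array that cannot be cast to float. A minimal sketch of that failure mode, assuming the data was parallelized with sc.parallelize:

import numpy as np
rdd = sc.parallelize(testRdd)
bad = np.array(rdd)        # 0-d object array wrapping the RDD, not the rows
bad.astype(np.float32)     # TypeError: float() argument must be a string or a number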


3 Answers


You'll have to collect the data to your local machine before calling numpy.array:

import numpy as np

a = np.array(testRdd.collect())
print(a)
#array([[ 1. ,  2. ,  3. ,  4. ,  5. ,  6. ,  7. ],
#       [ 0. ,  0.1,  0.3,  0.4,  0.5,  0.6,  0.7],
#       [ 1.1,  1.2,  1.3,  1.4,  1.5,  1.6,  1.7]])

Or if you want each row as a separate array:

b = testRdd.map(np.array).collect()
print(b)
#[array([ 1.,  2.,  3.,  4.,  5.,  6.,  7.]),
# array([ 0. ,  0.1,  0.3,  0.4,  0.5,  0.6,  0.7]),
# array([ 1.1,  1.2,  1.3,  1.4,  1.5,  1.6,  1.7])]
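
To get the float32 feature vector asked about in the question, the same cast from the original attempt can be applied after collecting; a minimal sketch, assuming the collected data fits in driver memory:

import numpy as np

# collect() brings the rows to the driver, where the cast to float32 succeeds
feature_vector = np.array(testRdd.collect()).astype(np.float32)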

5 Comments

Is there a way to do this without collect()?
It doesn't make sense to do it without collect - you can't have a spark dataframe with a column of numpy arrays. What are you trying to do?
I'm about to post this as a separate question - but I want to train a distributed deep learning model with Horovod, which doesn't accept dataframes or RDDs, only numpy arrays. I want to write a udf which will use worker memory to convert the dataframe column to numpy arrays in small batches. My starting point is a massive Spark dataframe that cannot be collected on the driver.
@maverik in that case, do the conversion to numpy array in the udf. That's essentially what happens in testRdd.map(np.array) (I know that's an rdd, but you can make a parallel between rdd.map and a udf)
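
A minimal sketch of the idea in the last comment (converting inside the workers rather than collecting), assuming a DataFrame df with an array column named "features" and a hypothetical train_on_batch function supplied by the caller:

def train_partition(rows):
    import numpy as np
    batch = []
    for row in rows:
        batch.append(np.asarray(row["features"], dtype=np.float32))
        if len(batch) == 1024:                 # hypothetical batch size
            train_on_batch(np.stack(batch))    # hypothetical training step
            batch = []
    if batch:
        train_on_batch(np.stack(batch))

df.rdd.foreachPartition(train_partition)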

I had the same problem and collect() is not very efficient.

For me writing multiple numpy files on the executors worked pretty well, and loading multiple files using numpy is no problem. The number of resulting files equals the number of partitions.

In my case I had to put the files into HDFS, as I have no access to the executor nodes:

from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType

def write_numpy(rows):
    # runs on each executor: dump this partition's rows into a local .npy file
    import numpy as np
    from pyspark.taskcontext import TaskContext
    import os

    partition_id = TaskContext().partitionId()

    local_path = "/tmp/test" + str(partition_id) + ".npy"
    hdfs_dest_path = "/tmp/test/"
    # materialize the partition iterator before handing it to numpy
    np.save(local_path, np.array(list(rows), dtype=object))
    # push the file to HDFS because the executors' local disks are not reachable
    os.system("hadoop fs -put " + local_path + " " + hdfs_dest_path)

schema = StructType([StructField("ID", IntegerType()), StructField("TS", ArrayType(IntegerType()))])
data = spark.createDataFrame(
    spark.sparkContext.parallelize(range(1, 1999)).map(lambda x: (x, list(range(1, 100)))),
    schema)

data.rdd.foreachPartition(write_numpy)
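
To feed the resulting files into a model, they can be copied back out of HDFS and loaded with numpy; a minimal sketch, assuming the .npy files have already been fetched into a local directory:

import glob
import numpy as np

# load every per-partition file (saved as object arrays, hence allow_pickle) and stack them
parts = [np.load(path, allow_pickle=True) for path in sorted(glob.glob("/tmp/test*.npy"))]
full = np.concatenate(parts)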

1 Comment

This is a very good solution to allow spark to write numpy files in a distributed manner.

My best shot at this would be:

import pandas  # pandas must be installed for toPandas() to work
# RDD -> Spark DataFrame -> pandas DataFrame -> underlying numpy array
arr = rdd.toDF().toPandas().values

The RDD is converted to a Spark DataFrame, the DataFrame is converted to pandas, and the underlying values (a numpy array) are exposed from there.
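
If toPandas() is slow, enabling Arrow-based conversion usually speeds it up considerably; the exact config key depends on the Spark version (the name below is the Spark 3.x one, Spark 2.3/2.4 use spark.sql.execution.arrow.enabled):

spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
arr = rdd.toDF().toPandas().values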

Edit -- You said you don't like that, so have you tried just doing a map? Like so:

import numpy as np
arr = np.array([])  # start from an empty array; np.array() with no argument raises a TypeError
rows = rdd.map(lambda x: np.append(arr, x)).collect()  # map is lazy; collect() triggers it

You should include everything you tried in your question.

2 Comments

I don't want to convert the RDD into a dataframe. I tried this solution and it takes an enormous amount of time while showing nothing on my machine. Is there any other solution?
@VamsiNimmala I made an edit to try a different approach
