
I have a dataframe gi_man_df where group can be of n types:

+------------------+-----------------+--------+--------------+
|           group  |           number|rand_int|   rand_double|
+------------------+-----------------+--------+--------------+
|          'GI_MAN'|                7|       3|         124.2|
|          'GI_MAN'|                7|      10|        121.15|
|          'GI_MAN'|                7|      11|         129.0|
|          'GI_MAN'|                7|      12|         125.0|
|          'GI_MAN'|                7|      13|         125.0|
|          'GI_MAN'|                7|      21|         127.0|
|          'GI_MAN'|                7|      22|         126.0|
+------------------+-----------------+--------+--------------+

and I am expecting a numpy ndarray, i.e. gi_man_array:

[[[124.2],[121.15],[129.0],[125.0],[125.0],[127.0],[126.0]]]

containing the rand_double values after applying the pivot.

I tried the following 2 approaches:
FIRST: I pivoted gi_man_df as follows:

gi_man_pivot = gi_man_df.groupBy("number").pivot('rand_int').sum("rand_double")

and the output I got is:

Row(number=7, group=u'GI_MAN', 3=124.2, 10=121.15, 11=129.0, 12=125.0, 13=125.0, 21=127.0, 23=126.0)

but the problem here is that, to get the desired output, I can't convert this to a matrix and then convert it again to a numpy array.
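To be concrete, this is roughly the kind of conversion I am after, though I am not sure it is correct or efficient (just a sketch, assuming the pivoted columns are named after the rand_int values and the pivoted result is small enough to collect on the driver):

import numpy as np

# sketch: keep only the pivoted value columns, then build the nested array
value_cols = [c for c in gi_man_pivot.columns if c not in ("number", "group")]
gi_man_array = np.array(
    [[[row[c]] for c in value_cols] for row in gi_man_pivot.collect()]
)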

SECOND: I created the vector in the dataframe itself using:

assembler = VectorAssembler(inputCols=["rand_double"],outputCol="rand_double_vector")

gi_man_vector = assembler.transform(gi_man_df)
gi_man_vector.show(7)

and I got the following output:

+----------------+-----------------+--------+--------------+--------------+
|           group|           number|rand_int|   rand_double| rand_dbl_Vect|
+----------------+-----------------+--------+--------------+--------------+
|          GI_MAN|                7|       3|         124.2|       [124.2]|
|          GI_MAN|                7|      10|        121.15|      [121.15]|
|          GI_MAN|                7|      11|         129.0|       [129.0]|
|          GI_MAN|                7|      12|         125.0|       [125.0]|
|          GI_MAN|                7|      13|         125.0|       [125.0]|
|          GI_MAN|                7|      21|         127.0|       [127.0]|
|          GI_MAN|                7|      22|         126.0|       [126.0]|
+----------------+-----------------+--------+--------------+--------------+

but the problem here is that I can't pivot on rand_dbl_Vect.

So my questions are:
1. Is either of the 2 approaches the correct way of achieving the desired output? If so, how can I proceed further to get the desired result?
2. What other way can I proceed with so that the code is optimal and the performance is good?

  • I'm not at my spark console but can you use the .toArray() method? Df.select('rand_dbl').toArray(). Neither your number nor rand_int suggests that a groupby has any groups to work from to necessitate a groupby. Commented Feb 8, 2017 at 17:34
  • but the groups can be of n types like GI_MAN, LI_MAN, and the corresponding values of the other columns change accordingly. I tried group by with pivot and it's working fine. Can you please elaborate on what you mean by "groupby has any groups to work from to necessitate a groupby"? I didn't quite get that. Commented Feb 9, 2017 at 6:41
  • Your number vector in the example is all 7s. There's only one group. So why need groupby? Commented Feb 12, 2017 at 0:28
  • did my answer work for you? If so, please approve it. Commented Feb 14, 2017 at 21:18
  • Groups can be of n types, so I need a group by there. Commented Feb 18, 2017 at 8:18

3 Answers


This

import numpy as np
np.array(gi_man_df.select('rand_double').collect())

produces

array([[ 124.2 ],
       [ 121.15],
       .........])
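If the exact nested shape from the question is needed, a reshape should get there (a sketch, assuming a single group, i.e. one row per rand_double value as in the example):

import numpy as np

# sketch: reshape the (n, 1) result into the (1, n, 1) nesting shown in the question
gi_man_array = np.array(gi_man_df.select('rand_double').collect()).reshape(1, -1, 1)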

5 Comments

I can't use collect as the current data size is 20TB and it increases by ~5TB every month. So using collect won't be a viable option as it will need a lot of memory on the driver.
any other way to do the same?
@UdayShankarSingh where were you imagining holding this numpy array if not in memory on your driver? I ask because I am doing a similar thing and the only solution I can think of is to batch through the dataframe in chunks that are small enough to hold the resulting array in memory.
@seth127 I have the same problem, did you find an elegant solution? I would be happy to avoid reinventing a solution to a common problem.
Did you find a solution? Can the data frame be split into chunks and processed asynchronously?
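A rough sketch of that chunking idea (assuming each chunk, but not the whole column, fits in driver memory; chunk_size is a hypothetical value to tune):

import numpy as np

def chunked_arrays(df, col='rand_double', chunk_size=100000):
    # pull values one by one via toLocalIterator and yield fixed-size numpy chunks
    buf = []
    for row in df.select(col).toLocalIterator():
        buf.append(row[0])
        if len(buf) == chunk_size:
            yield np.array(buf)[:, None]
            buf = []
    if buf:
        yield np.array(buf)[:, None]

# usage: process and discard each chunk before pulling the next one
for chunk in chunked_arrays(gi_man_df):
    print(chunk.shape)  # replace with the real per-chunk processing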

This solution, based on @data_steve's answer, is more memory efficient, though it takes a bit longer:

import numpy as np
np.fromiter((row[0] for row in gi_man_df.select('rand_double').toLocalIterator()), dtype=float)[:, None]

as it does not first collect the rows locally and then build another numpy array, but reads the values one by one to build the array. I watched the RAM consumption and that seems to be exactly what is (not) happening.

You can probably specify a more appropriate dtype.
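For example (a sketch, assuming an extra count() pass up front is acceptable and 32-bit precision is enough for your values):

import numpy as np

n = gi_man_df.count()  # one extra Spark job, but lets fromiter pre-allocate
gi_man_array = np.fromiter(
    (row[0] for row in gi_man_df.select('rand_double').toLocalIterator()),
    dtype=np.float32,  # hypothetical choice; use float64 if full precision is needed
    count=n,
)[:, None]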



To convert the Spark dataframe to a numpy array, first convert it to pandas and then apply the to_numpy() function.

spark_df.select(<list of columns needed>).toPandas().to_numpy()

1 Comment

This would work, but it introduces pandas as an intermediate step, which brings a big memory overhead that is unnecessary if all we care about is getting a numpy array out in the end. @data_steve's answer is a really elegant way to get around this.
