
Here is the code to create a pyspark.sql DataFrame:

import numpy as np
import pandas as pd
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext()            # or reuse the sc provided by the pyspark shell
sqlContext = SQLContext(sc)
df = pd.DataFrame(np.array([[1,2,3],[4,5,6],[7,8,9],[10,11,12]]), columns=['a','b','c'])
sparkdf = sqlContext.createDataFrame(df, samplingRatio=0.1)

So that sparkdf looks like

a  b  c
1  2  3
4  5  6
7  8  9
10 11 12

Now I would like to add a numpy array (or even a list) as a new column:

new_col = np.array([20,20,20,20])

But the standard way

sparkdf = sparkdf.withColumn('newcol', new_col)

fails. Probably a UDF is the way to go, but I don't know how to create a UDF that assigns a different value to each DataFrame row, i.e. one that iterates through new_col. I have looked at other pyspark and pyspark.sql questions but couldn't find a solution. Also, I need to stay within pyspark.sql, so no Scala solutions. Thanks!

1 Answer


Assuming that the data frame is sorted to match the order of values in the array, you can zip the RDDs and rebuild the data frame as follows:

n = sparkdf.rdd.getNumPartitions()

# Parallelize and cast to plain integer (np.int64 won't work);
# use n partitions so the RDD can be zipped with sparkdf.rdd
new_col = sc.parallelize(np.array([20,20,20,20]), n).map(int)

def process(pair):
    # Merge the original Row (as a dict) with the new value
    row, value = pair
    d = row.asDict()
    d["new_col"] = value
    return d

rdd = (sparkdf
    .rdd           # Extract RDD
    .zip(new_col)  # Zip with new col
    .map(process)) # Add new column

sqlContext.createDataFrame(rdd)  # Rebuild data frame
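
As an aside (not part of the original answer), the schema-inference step in createDataFrame can be skipped by emitting plain tuples and supplying an explicit schema; the LongType fields below are an assumption based on the integer toy data above:

from pyspark.sql.types import StructType, StructField, LongType

# Explicit schema: the original columns plus the new one (types assumed from the example)
schema = StructType([
    StructField("a", LongType(), True),
    StructField("b", LongType(), True),
    StructField("c", LongType(), True),
    StructField("new_col", LongType(), True),
])

# A Row is a tuple, so the zipped value can simply be appended
rdd_tuples = (sparkdf.rdd
    .zip(new_col)
    .map(lambda pair: tuple(pair[0]) + (pair[1],)))

sqlContext.createDataFrame(rdd_tuples, schema).show()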

You can also use joins:

new_col = sqlContext.createDataFrame(
    list(zip(range(1, 5), [20] * 4)),  # list() so this also works on Python 3
    ("rn", "new_col"))

sparkdf.registerTempTable("df")

sparkdf_indexed = sqlContext.sql(
    # Make sure we have specific order and add row number
    "SELECT row_number() OVER (ORDER BY a, b, c) AS rn, * FROM df")

(sparkdf_indexed
    .join(new_col, new_col.rn == sparkdf_indexed.rn)
    .drop(new_col.rn))

but the window function component is not scalable (an ORDER BY without PARTITION BY moves all rows to a single partition) and should be avoided with larger datasets.
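
If that global sort is a concern, one alternative sketch (not from the original answer) is to generate the row number with RDD.zipWithIndex, which needs no ordering column; it assumes the current row order of sparkdf already matches the order of the values in new_col:

from pyspark.sql import Row

indexed_rdd = (sparkdf.rdd
    .zipWithIndex()  # pairs each Row with its 0-based position; no sort required
    .map(lambda pair: Row(rn=pair[1] + 1, **pair[0].asDict())))

sparkdf_indexed = sqlContext.createDataFrame(indexed_rdd)

(sparkdf_indexed
    .join(new_col, new_col.rn == sparkdf_indexed.rn)
    .drop(new_col.rn))

zipWithIndex triggers one extra job to count partition sizes, but it avoids shuffling every row into a single partition.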

Of course, if all you need is a column with a single constant value, you can simply use lit:

import pyspark.sql.functions as f
sparkdf.withColumn("new_col", f.lit(20))

but I assume that is not the case here.


Comments

Thanks. It looks like in the first solution you have to go back to an RDD and then convert to a DataFrame again at the end, so there is no way of staying inside pyspark.sql. Which of the two solutions provides better performance, i.e. which is faster?
Well, ignoring Catalyst, data frames are just an abstraction layer over RDDs. There is certainly some overhead from sqlContext.createDataFrame, which can be significantly reduced by manually providing a schema. zip by itself is a much simpler operation than join, and as long as order is preserved you don't need sorting. I think the important question is why you need to add a new column: if it comes from another data source, then loading it as a table and joining is the natural choice.
Actually I get the error message: ValueError: Can only zip with RDD which has the same number of partitions
Right. The column does not come from another table; rather, it is a manipulation of existing ones. It's fast to compute in numpy, and I need to introduce the result as an additional column.
Regarding the error, just specify the number of partitions on the input. As for your problem: is it a row-level operation, or do you use multiple rows?