13

I've a Pyspark Dataframe with this structure:

root
 |-- Id: string (nullable = true)
 |-- Q: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- pr: string (nullable = true)
 |    |    |-- qt: double (nullable = true)

Something similar to:

 +----+--------------------- ... --+
 | Id |           Q                |
 +----+---------------------- ... -+
 | 001| [ [pr1,1.9], [pr3,2.0]...] |
 | 002| [ [pr2,1.0], [pr9,3.9]...] |
 | 003| [ [pr2,9.0], ...         ] |
  ...

I wold like to convert Q array into columns (name pr value qt). Also I would like to avoid duplicated columns by merging (add) same columns.

 +----+-----+-----+------+ ... ----+
 | Id | pr1 | pr2 | pr3  | ... prn |
 +----+-----+-----+------+ ... ----+
 | 001| 1.9 | 0.0 | 2.0  | ...     |
 | 002| 0.0 | 1.0 | 0    | ...     |
 | 003| 0.0 | 9.0 | ...  | ...     |
  ...

How can I perform this transformation?. Thakyou in advance!!. Julián.

2
  • Hi, let me know if the answer worked or you have any other questions, thx Commented Dec 20, 2017 at 14:04
  • Yes ags29, thank you!!! Commented Dec 22, 2017 at 21:23

2 Answers 2

20

You can do this with a combination of explode and pivot:

import pyspark.sql.functions as F

# explode to get "long" format
df=df.withColumn('exploded', F.explode('Q'))

# get the name and the name in separate columns
df=df.withColumn('name', F.col('exploded').getItem(0))
df=df.withColumn('value', F.col('exploded').getItem(1))

# now pivot
df.groupby('Id').pivot('name').agg(F.max('value')).na.fill(0)
Sign up to request clarification or add additional context in comments.

1 Comment

I get Field name should be String Literal, but it's 0 when running df=df.withColumn('name', F.col('exploded').getItem(0)) df=df.withColumn('value', F.col('exploded').getItem(1))
0

Very interesting question. This is how I approached it.

test.csv

001,pr1:0.9,pr3:1.2,pr2:2.0
002,pr3:5.2,pr4:0.99

Pyspark

file = sc.textFile("file:///test2.csv")

//get it in (key,value)
//[(u'001', u'pr1:0.9')...]

//rdd1 = file.map(lambda r: r.replace(",","\t",1)).map(lambda r: r.split("\t")).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(','))
rdd1 = file.map(lambda r: r.split(",")[0]).map(lambda r: (r[0],r[1])).flatMapValues(lambda r: r.split(','))

//create a DF with 3 columns
//[(u'001', u'pr1', u'0.9')...)]
+---+---+----+
| _1| _2|  _3|
+---+---+----+
|001|pr1| 0.9|
|001|pr3| 1.2|
|001|pr2| 2.0|
|002|pr3| 5.2|
|002|pr4|0.99|
+---+---+----+


rdd2 = rdd1.map(lambda r: (r[0],r[1].split(":"))).map(lambda r: (r[0],r[1][0],r[1][1]))
df = rdd2.toDF()


//Perform the magic
df.groupBy("_1").pivot("_2").agg(expr("coalesce(first(_3),0)"))


+---+---+---+---+----+
| _1|pr1|pr2|pr3| pr4|
+---+---+---+---+----+
|001|0.9|2.0|1.2|   0|
|002|  0|  0|5.2|0.99|
+---+---+---+---+----+

1 Comment

Thak you Bala, It's a good solution. Maybe a little bit longer than the proposed by ags29.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.