I have a sample PySpark program that uses a UDF to generate a JSON structure. Here is the code:

def func(row):
    # Convert the Row into a plain dict of column name -> value
    temp = row.asDict()
    headDict = {}
    headDict['type'] = "record"
    headDict['name'] = "source"
    headDict['namespace'] = "com.streaming.event"
    headDict['doc'] = "SCD signals from  source"
    fieldslist = []
    headDict['fields'] = fieldslist
    # Add one single-key dict per column, e.g. {"id": 1}
    for i in temp:
        fieldslist.append({i: temp[i]})
    return json.dumps(headDict)

if __name__ == "__main__":
    spark = SparkSession.builder.master("local[*]").appName("PythonWordCount").getOrCreate()
    # Wrap func as a UDF whose declared return type is a string
    payload = udf(func, StringType())
    data = spark.createDataFrame(
        [
            (1, "a", 'foo1'),  # create your data here, be consistent in the types.
            (2, "b", 'bar'),
            (3, "c", 'mnc')
        ],
        ['id', 'nm', 'txt']  # add your column labels here
    )
    # Pack every column into one struct and pass it to the UDF
    df = data.withColumn("payload1", payload(struct([data[x] for x in data.columns])))
    df.show(3, False)
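Because `func` only touches the row through `row.asDict()`, its JSON-building logic can be checked without Spark at all. The sketch below is a hypothetical Spark-free variant, `build_payload`, that takes a plain dict in place of `row.asDict()`; it is only an illustration for testing the envelope, not a drop-in replacement for the UDF.

```python
import json


def build_payload(record):
    """Build the same JSON envelope as func, from a plain dict.

    `record` stands in for row.asDict(); this helper is hypothetical.
    """
    head = {
        "type": "record",
        "name": "source",
        "namespace": "com.streaming.event",
        "doc": "SCD signals from  source",
        # One single-key dict per column, as in the original loop.
        "fields": [{key: value} for key, value in record.items()],
    }
    return json.dumps(head)


payload = build_payload({"id": 3, "nm": "c", "txt": "mnc"})
print(payload)
```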

I get the following error when the UDF output is written into the new DataFrame column:

  raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple '{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from  source"}' with StructType
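As far as I can tell from the PySpark source, this message is raised when a Python value is being converted for a `StructType` and the value is not a dict, a tuple/list, or an object with a `__dict__`. So the message suggests that, on this run, the UDF's return value was being converted as a struct rather than as the declared `StringType`, and the JSON string failed that check. The sketch below is a deliberately simplified, hypothetical model of that check (`to_struct` is not a PySpark function) to show why a `str` triggers the error while a dict or tuple does not.

```python
def to_struct(obj, names):
    """Simplified, hypothetical model of converting a value for a
    StructType with the given field names (not actual PySpark code)."""
    if obj is None:
        return None
    if isinstance(obj, dict):
        # Pick values by field name
        return tuple(obj.get(n) for n in names)
    if isinstance(obj, (tuple, list)):
        # Positional values are taken as-is
        return tuple(obj)
    if hasattr(obj, "__dict__"):
        # Plain objects: read attributes by field name
        return tuple(obj.__dict__.get(n) for n in names)
    # A str is none of the above, so it ends up here
    raise ValueError("Unexpected tuple %r with StructType" % obj)


names = ["id", "nm", "txt"]
print(to_struct({"id": 3, "nm": "c", "txt": "mnc"}, names))  # (3, 'c', 'mnc')
try:
    to_struct('{"id": 3}', names)
except ValueError as exc:
    print(exc)  # Unexpected tuple '{"id": 3}' with StructType
```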

If I print the JSON payload instead, I get the correct output:

{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from  source"}

I have also verified that this is valid JSON.
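A quick way to confirm that a payload string is valid JSON is to parse it back with `json.loads`, which raises `ValueError` on malformed input (in Python 3 the specific exception is `json.JSONDecodeError`, a subclass of `ValueError`). A minimal sketch using the payload printed above; `is_valid_json` is a hypothetical helper name.

```python
import json


def is_valid_json(text):
    """Return True if `text` parses as JSON, False otherwise."""
    try:
        json.loads(text)
    except ValueError:  # also catches json.JSONDecodeError on Python 3
        return False
    return True


sample = ('{"namespace": "com.streaming.event", "type": "record", '
          '"name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, '
          '{"nm": "c"}], "doc": "SCD signals from  source"}')
print(is_valid_json(sample))       # True
print(is_valid_json(sample[:-1]))  # False (closing brace removed)
```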

I am not sure what I am missing here.

Could this be a Python version issue? I am using Python 2.7.

Update: I ran the exact same code with Python 3.7 and it runs fine.
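One visible difference between the versions, unrelated to the error itself, is key order: in the Python 2.7 output `namespace` comes first and the fields appear as `txt`, `id`, `nm`, because plain dicts in Python 2.7 do not preserve insertion order, whereas Python 3.7 dicts do. If the payload should list keys and columns in a fixed order on both versions, one option is to build it with `collections.OrderedDict` from an explicit column list. A sketch, assuming the column order is known up front (the `columns` argument and `build_ordered_payload` are hypothetical):

```python
import json
from collections import OrderedDict


def build_ordered_payload(record, columns):
    """Build the envelope with a fixed key order on Python 2.7 and 3.x.

    `record` stands in for row.asDict(); `columns` is a hypothetical
    list giving the desired column order.
    """
    head = OrderedDict()
    head["type"] = "record"
    head["name"] = "source"
    head["namespace"] = "com.streaming.event"
    head["doc"] = "SCD signals from  source"
    # Iterate over the explicit column list, not the dict's own order
    head["fields"] = [{col: record[col]} for col in columns]
    return json.dumps(head)


print(build_ordered_payload({"txt": "mnc", "id": 3, "nm": "c"}, ["id", "nm", "txt"]))
```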

1 Answer

It works for me with Spark 3.x and Python 2.7.x:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0
      /_/

Using Python version 2.7.17 (default, Jul 20 2020 15:37:01)
SparkSession available as 'spark'.

Results from the PySpark shell:

import json
from pyspark.sql.functions import * 
from pyspark.sql.types import *

def func(row):
    temp=row.asDict()
    headDict = {}
    headDict['type'] = "record"
    headDict['name'] = "source"
    headDict['namespace'] = "com.streaming.event"
    headDict['doc'] = "SCD signals from  source"
    fieldslist = []
    headDict['fields'] = fieldslist
    for i in temp:
        fieldslist.append({i:temp[i]})
    return (json.dumps(headDict))

spark = SparkSession.builder.master("local[*]").appName("PythonWordCount").getOrCreate()
payload = udf(func, StringType())
data = spark.createDataFrame([(1, "a", 'foo1'), (2, "b", 'bar'), (3, "c", 'mnc')], ['id', 'nm', 'txt'])
data.show()
'''
+---+---+----+                                                                  
| id| nm| txt|
+---+---+----+
|  1|  a|foo1|
|  2|  b| bar|
|  3|  c| mnc|
+---+---+----+
'''


df = data.withColumn("payload1", payload(struct([data[x] for x in data.columns])))
df.show(3, False)
'''
+---+---+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id |nm |txt |payload1                                                                                                                                                        |
+---+---+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1  |a  |foo1|{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "foo1"}, {"id": 1}, {"nm": "a"}], "doc": "SCD signals from  source"}|
|2  |b  |bar |{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "bar"}, {"id": 2}, {"nm": "b"}], "doc": "SCD signals from  source"} |
|3  |c  |mnc |{"namespace": "com.streaming.event", "type": "record", "name": "source", "fields": [{"txt": "mnc"}, {"id": 3}, {"nm": "c"}], "doc": "SCD signals from  source"} |
+---+---+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------+
'''

1 Comment

I just changed the import statements and it worked for me. I never knew this would cause a problem.
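The thread does not say which import change fixed the problem, so this is only a general illustration: a wildcard import such as `from pyspark.sql.functions import *` binds every public name the module exports, so it can silently replace a name that was already defined (it is known, for example, to shadow Python built-ins such as `max` and `sum`). The sketch below shows that mechanism with a hypothetical stand-in module, `fake_functions`, rather than PySpark itself. Importing only the names you need (for example `from pyspark.sql.functions import udf, struct`) avoids this kind of clash.

```python
import sys
import types

# Build a hypothetical stand-in module that exports a name which is
# also a Python built-in.
fake = types.ModuleType("fake_functions")
exec("def max(col):\n    return 'column max of ' + col\n", fake.__dict__)
sys.modules["fake_functions"] = fake

# Run a wildcard import in a fresh namespace.
namespace = {}
exec("from fake_functions import *", namespace)

# The wildcard import has replaced the built-in max in that namespace.
print(namespace["max"]("id"))  # column max of id
```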
