I have a pyspark dataframe as shown below

+---------------+---+
|            _c0|_c1|
+---------------+---+
|{"object":"F...|  0|
|{"object":"F...|  1|
|{"object":"F...|  2|
|{"object":"E...|  3|
|{"object":"F...|  4|
|{"object":"F...|  5|
|{"object":"F...|  6|
|{"object":"S...|  7|
|{"object":"F...|  8|
+---------------+---+

The column _c0 contains a string in dictionary form.

'{"object":"F","time":"2019-07-18T15:08:16.143Z","values":[0.22124142944812775,0.2147877812385559,0.16713131964206696,0.3102800250053406,0.31872493028640747,0.3366488814353943,0.25324496626853943,0.14537988603115082,0.12684473395347595,0.13864757120609283,0.15222792327404022,0.238663449883461,0.22896413505077362,0.237777978181839]}'

How can I convert the above string to a dictionary, fetch each key-value pair, and store it in a variable? I don't want to convert it to pandas as that is expensive.

  • Have you tried df.rdd.map? Commented Jul 19, 2019 at 8:43
  • OK, when I tried it I got a PipelinedRDD. But I think it is memory-consuming to convert that string to a dataframe and access values using take(). Is there any other way to access that PipelinedRDD? Commented Jul 19, 2019 at 8:44
  • See below for the idea. If you need some pyspark specifics on how to address columns and what to return from map, maybe someone else can chime in (or give me time to set up pyspark and try to reproduce). Commented Jul 19, 2019 at 8:54
  • Well, you need to access the data once, and for that you need to parse the whole string (unless you hack your own parsing solution because you only need bits of it). The nice thing is that .map works in a distributed way, so the nodes share the work. Commented Jul 19, 2019 at 8:56
  • What's the desired output? Commented Jul 19, 2019 at 11:15

4 Answers


You should use the PySpark equivalents of Scala's Dataset.withColumn and the from_json standard function.
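A minimal sketch of what that looks like in PySpark (the schema here is hand-written from the sample record in the question; the next answer spells the same idea out in more detail):

from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType

# Schema assumed from the sample JSON record in the question
schema = StructType([
    StructField("object", StringType()),
    StructField("time", StringType()),
    StructField("values", ArrayType(DoubleType())),
])

# Replace the JSON string with a parsed struct column
df = df.withColumn("_c0", F.from_json("_c0", schema))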


Extending on @Jacek Laskowski's post: first create the schema of the struct column, then use from_json to convert the string column to a struct, and lastly use the nested schema to extract the new columns (the f-strings need Python 3.6+). On a struct-type column you can use .select directly to operate on the nested structure.

import pyspark.sql.functions as f
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

# Schema of the JSON string stored in _c0
schema = StructType([StructField("object", StringType()),
                     StructField("time", StringType()),
                     StructField("values", ArrayType(FloatType()))])

# Parse the string column into a struct column
df = df.withColumn('_c0', f.from_json('_c0', schema))

# Pull the nested fields out as top-level columns (f-strings need Python 3.6+)
select_list = ["_c0", "_c1"] + [f.col(f'_c0.{column}').alias(column) for column in ["object", "time", "values"]]
df.select(*select_list).show()

Output (just the first two rows)

+--------------------+---+------+--------------------+--------------------+
|                 _c0|_c1|object|                time|              values|
+--------------------+---+------+--------------------+--------------------+
|[F, 2019-07-18T15...|  0|     F|2019-07-18T15:08:...|[0.22124143, 0.21...|
|[F, 2019-07-18T15...|  1|     F|2019-07-18T15:08:...|[0.22124143, 0.21...|
+--------------------+---+------+--------------------+--------------------+
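If you then want the key/value pairs in plain Python variables on the driver, as the question asks, one way (a sketch, assuming the parsed df from above) is to collect a row and index into it:

# Bring one parsed row back to the driver and unpack its fields
row = df.select("_c0.object", "_c0.time", "_c0.values").first()
obj, ts, values = row["object"], row["time"], row["values"]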


df.rdd.map applies the given function to each row of data. I have not yet used the Python variant of Spark, but it could work like this:

import json

def wrangle(row):
    # Parse the JSON string in _c0 and return the pieces we need
    tmp = json.loads(row._c0)
    return (row._c1, tmp['object'], tmp['time'], tmp['values'])

df.rdd.map(wrangle).toDF()  # should yield a new frame with the object split into columns

How to address the columns inside the function might work like that, but you seem to have figured that out already.

This loads the JSON-formatted string into a Python object and returns a tuple with the required elements. Maybe you need to return a Row object instead of a tuple, but, as above, I have not yet used the Python part of Spark.
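If the plain tuple does not give toDF() the column names you want, a Row-based variant (a sketch, using pyspark.sql.Row) could look like this:

import json
from pyspark.sql import Row

def wrangle_row(row):
    # Parse the JSON string and return a named Row so toDF() picks up the names
    tmp = json.loads(row._c0)
    return Row(_c1=row._c1, object=tmp['object'], time=tmp['time'], values=tmp['values'])

df.rdd.map(wrangle_row).toDF().show()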


What I did

The column that had the JSON string had values like

"{\"arm\": \"1\", \"wtvec\": [1, 1, 0.4], \"td\": \"current|0.01\", \"MABparam\": [[340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000], [340, 1000]], \"seg\": \"c2\"}"

I made a simple UDF:

import json
from pyspark.sql.functions import udf, col
from pyspark.sql.types import IntegerType

def htl_id(x):
    try:
        return int(json.loads(x)['arm'])
    except:
        # Re-raise with the offending string so the bad record is visible
        raise Exception(x)

htlid_udf = udf(htl_id, IntegerType())

Then, for extracting a column named 'arm' in my case:

cdm = cdm.withColumn('arm', htlid_udf(col('logString')))

Other answers make you build a schema and whatnot, and that wasn't cutting it for me.
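The same UDF pattern works for the other keys; for example, a sketch (reusing the 'logString' column from above and the 'wtvec' key from the sample string) that pulls the list of weights out as an array column:

import json
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType

# Extract the "wtvec" list from the same JSON string column
def wtvec(x):
    return [float(v) for v in json.loads(x)['wtvec']]

wtvec_udf = udf(wtvec, ArrayType(DoubleType()))
cdm = cdm.withColumn('wtvec', wtvec_udf(col('logString')))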
