
There is this syntax: df.withColumn('new', regexp_replace('old', 'str', ''))

This replaces a string in a column.

My question: what if I have a column consisting of both arrays and strings? That is, a row could contain either a string or an array containing that string. Is there any way to replace this string regardless of whether it appears alone or inside an array?

  • What's your Spark version? With 2.4 you can use the higher-order function transform to apply the replacement inside the array. Commented Apr 5, 2020 at 17:33
  • I am not sure about the version. Can you show me how it's done? Commented Apr 5, 2020 at 17:36
  • No need to explode the array to apply a regex in 2.4; you can apply it on an array<string> column: df.withColumn("new", F.expr("""transform(col1, x -> regexp_replace(x, 'str', ''))""")) (see the sketch after these comments). Commented Apr 6, 2020 at 5:31
  • Got an error: u"cannot resolve 'df.my.ou' given input columns: [my]; line 1 pos 10;\n'Project [my#914196, 'transform('df.my.ou, lambdafunction('regexp_replace(lambda 'x, yes, ), lambda 'x, false)) AS new#914198]\n+- Filter (my#914196.Source = 5)\n +- Project [my#914196]\n +- Generate explode(contents#914193), false, [my#914196]\n +- Relation[contents#914193] parquet\n" Commented Apr 6, 2020 at 7:31
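
A minimal sketch of the transform approach mentioned in the comments, assuming Spark 2.4+ and a hypothetical array<string> column named col1 (the column name and data are illustrative):

import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Illustrative data: an array<string> column named 'col1'
df = spark.createDataFrame([(["hello str world"],), (["another str here"],)], schema=['col1'])

# transform (Spark 2.4+) applies regexp_replace to every array element without exploding
df = df.withColumn("new", F.expr("transform(col1, x -> regexp_replace(x, 'str', ''))"))

df.show(truncate=False)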

2 Answers


Having a column with multiple types is not currently supported. However, if the column contained an array of strings, you could explode the array (https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=explode#pyspark.sql.functions.explode), which creates a row for each element of the array, and then apply the regular expression to the new column. Example:

from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

sql_context = SQLContext(spark.sparkContext)

df = sql_context.createDataFrame([("hello world",),
                                  ("hello madam",),
                                  ("hello sir",),
                                  ("hello everybody",),
                                  ("goodbye world",)], schema=['test'])

# Wrap the string column in an array to mimic the array case
df = df.withColumn('test', F.array(F.col('test')))

df.show()

# explode creates one row per array element
df = df.withColumn('test-exploded', F.explode(F.col('test')))

# Apply the regular expression to the exploded string column
df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))

df.show()

Output:

+-----------------+
|             test|
+-----------------+
|    [hello world]|
|    [hello madam]|
|      [hello sir]|
|[hello everybody]|
|  [goodbye world]|
+-----------------+

+-----------------+---------------+-------------------+
|             test|  test-exploded|test-exploded-regex|
+-----------------+---------------+-------------------+
|    [hello world]|    hello world|      goodbye world|
|    [hello madam]|    hello madam|      goodbye madam|
|      [hello sir]|      hello sir|        goodbye sir|
|[hello everybody]|hello everybody|  goodbye everybody|
|  [goodbye world]|  goodbye world|      goodbye world|
+-----------------+---------------+-------------------+

And if you wanted to put the results back in an array:

df = df.withColumn('test-exploded-regex-array', F.array(F.col('test-exploded-regex')))

Output:

+-----------------+---------------+-------------------+-------------------------+
|             test|  test-exploded|test-exploded-regex|test-exploded-regex-array|
+-----------------+---------------+-------------------+-------------------------+
|    [hello world]|    hello world|      goodbye world|          [goodbye world]|
|    [hello madam]|    hello madam|      goodbye madam|          [goodbye madam]|
|      [hello sir]|      hello sir|        goodbye sir|            [goodbye sir]|
|[hello everybody]|hello everybody|  goodbye everybody|      [goodbye everybody]|
|  [goodbye world]|  goodbye world|      goodbye world|          [goodbye world]|
+-----------------+---------------+-------------------+-------------------------+

Hope this helps!

Update

Updated to include the case where the array column has several strings:

from pyspark import SQLContext
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()

sql_context = SQLContext(spark.sparkContext)

df = sql_context.createDataFrame([("hello world", "foo"),
                                  ("hello madam", "bar"),
                                  ("hello sir", "baz"),
                                  ("hello everybody", "boo"),
                                  ("goodbye world", "bah")], schema=['test', 'test2'])

# Combine the two string columns into a single array column
df = df.withColumn('test', F.array(F.col('test'), F.col('test2'))).drop('test2')

# Add an id so exploded rows can be grouped back into their original arrays
df = df.withColumn('id', F.monotonically_increasing_id())

df.show()

# One row per array element
df = df.withColumn('test-exploded', F.explode(F.col('test')))

df = df.withColumn('test-exploded-regex', F.regexp_replace(F.col('test-exploded'), "hello", "goodbye"))

# Collect the replaced strings back into arrays, one per original row
df = df.groupBy('id').agg(F.collect_list(F.col('test-exploded-regex')).alias('test-exploded-regex-array'))

df.show()

Output:

+--------------------+-----------+
|                test|         id|
+--------------------+-----------+
|  [hello world, foo]|          0|
|  [hello madam, bar]| 8589934592|
|    [hello sir, baz]|17179869184|
|[hello everybody,...|25769803776|
|[goodbye world, bah]|25769803777|
+--------------------+-----------+

+-----------+-------------------------+
|         id|test-exploded-regex-array|
+-----------+-------------------------+
| 8589934592|     [goodbye madam, bar]|
|          0|     [goodbye world, foo]|
|25769803776|     [goodbye everybod...|
|25769803777|     [goodbye world, bah]|
|17179869184|       [goodbye sir, baz]|
+-----------+-------------------------+

Just drop the id column when you're finished processing!


6 Comments

Thanks. But when I try to convert it back to an array, it doesn't really turn it back into the original column; it stays exploded. It just takes each element and converts it to an array, but it does so for each element, even though two rows are supposed to be one, like before the explode.
You could potentially add an id column so that, when you explode, rows derived from the same array have the same id, then perform a groupBy('id').agg(F.collect_list(F.col('test-exploded-regex'))) to ensure all strings are added to the same array.
Thanks, but it doesn't seem to work quite smoothly. What you did gives an object with only the id column and 'test-exploded-regex', but it deletes all previously existing columns. Also, when I try to get the newly grouped column back into the first dataframe it gives me an error. It seems like this squeezed column doesn't fit exactly onto the original one.
How did you try to merge the new dataframe with the original? An inner join on the 'id' column should work, since the number of rows in the generated dataframe should match the number of rows in the original dataframe (see the sketch after these comments).
OK, this finally worked for me. I upvoted your answer and have now accepted it. Can you answer this question: stackoverflow.com/questions/61138777/… How do I drop a nested column?
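
A minimal sketch of the join suggested in the comments, assuming df_original is the dataframe before exploding (it already carries the id column) and df_grouped is the groupBy('id') result from the update above (both names are illustrative):

# Hypothetical names: df_original keeps the 'id' and 'test' columns,
# df_grouped is the groupBy('id') aggregation with 'test-exploded-regex-array'
df_joined = df_original.join(df_grouped, on='id', how='inner')

# Drop the helper id column once you're finished processing
df_joined = df_joined.drop('id')

df_joined.show()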

I think this is not possible with a DataFrame in Spark, since a DataFrame does not allow a column to contain multiple types; it will raise an error when the DataFrame is created.

You can, however, do it using RDDs.

scala> val seq = Seq((1,"abc"),(2,List("abcd")))
seq: Seq[(Int, java.io.Serializable)] = List((1,abc), (2,List(abcd)))

scala> val rdd1 = sc.parallelize(seq)
rdd1: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = ParallelCollectionRDD[2] at parallelize at <console>:26

scala> rdd1.take(2)
res1: Array[(Int, java.io.Serializable)] = Array((1,abc), (2,List(abcd)))

scala> val rdd2 = rdd1.map(x => x._2 match {
     | case v: String => (x._1, v.replaceAll("abc","def"))
     | case p: List[String] => (x._1, p.map(s => s.replaceAll("abc","def")))
     | }
     | )
rdd2: org.apache.spark.rdd.RDD[(Int, java.io.Serializable)] = MapPartitionsRDD[3] at map at <console>:25

scala> rdd2.take(2)
res2: Array[(Int, java.io.Serializable)] = Array((1,def), (2,List(defd)))
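
If you are working in PySpark, a rough sketch of the same type-dispatch idea on an RDD (with illustrative data) might look like this:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# Illustrative mixed data: one row holds a plain string, the other a list of strings
rdd1 = sc.parallelize([(1, "abc"), (2, ["abcd"])])

def replace(value):
    # Handle either a bare string or a list of strings
    if isinstance(value, str):
        return value.replace("abc", "def")
    return [s.replace("abc", "def") for s in value]

rdd2 = rdd1.map(lambda kv: (kv[0], replace(kv[1])))

print(rdd2.take(2))  # [(1, 'def'), (2, ['defd'])]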

Let me know if it helps!!

