
I have a table, base_df, with many columns, one of which is an array column:

Id  FruitNames                      Col1  Col2  Col3  ...  Col99
1   ["apple", "banana", "orange"]   ...   ...   ...   ...  ...
2   ["banana"]                      ...   ...   ...   ...  ...
3   ["grape", "apple"]              ...   ...   ...   ...  ...
4   []                              ...   ...   ...   ...  ...
5   null                            ...   ...   ...   ...  ...
6   ["orange", "grape"]             ...   ...   ...   ...  ...

I also have a table, lookup_df, that has each of the array items as rows:

Id  Name
1   apple
2   banana
3   orange
4   grape

I want to add a column to base_df that is an array of the ids from lookup_df. In this case, I want to generate the column FruitIds from base_df and lookup_df.

Desired end table:

Id  FruitNames                      FruitIds   Col1  Col2  Col3  ...  Col99
1   ["apple", "banana", "orange"]   [1, 2, 3]  ...   ...   ...   ...  ...
2   ["banana"]                      [2]        ...   ...   ...   ...  ...
3   ["grape", "apple"]              [4, 1]     ...   ...   ...   ...  ...
4   []                              []         ...   ...   ...   ...  ...
5   null                            null       ...   ...   ...   ...  ...
6   ["orange", "grape"]             [3, 4]     ...   ...   ...   ...  ...

Reproducible setup:
base_df = spark.createDataFrame(
    [
        (1, ["apple", "banana", "orange"], "...", "...", "...", "...", "..."),
        (2, ["banana"], "...", "...", "...", "...", "..."),
        (3, ["grape", "apple"], "...", "...", "...", "...", "..."),
        (4, [], "...", "...", "...", "...", "..."),
        (5, None, "...", "...", "...", "...", "..."),
        (6, ["orange", "grape"], "...", "...", "...", "...", "..."),
    ],
    ["Id", "FruitNames", "Col1", "Col2", "Col3", "...", "Col99"]
)

lookup_df = spark.createDataFrame(
    [
        (1, "apple"),
        (2, "banana"),
        (3, "orange"),
        (4, "grape"),
    ],
    ["Id", "Name"]
)

df_with_fruitids = ???

I can't use explode since it isn't null/empty-array safe. I could use explode_outer with a CASE expression that converts nulls/empty arrays back, plus a collect_list to build the FruitIds column, but that runs into the problem of grouping by a large number of columns (a rough sketch of what I mean follows). Since both base_df and lookup_df are very large (100M and 100K rows respectively), efficiency is key.
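
For concreteness, here is roughly what that explode_outer approach would look like (illustrative only; the groupBy has to carry Col1 through Col99, which is exactly the part I want to avoid):

from pyspark.sql.functions import col, collect_list, lit, posexplode_outer, when

exploded = base_df.select("*", posexplode_outer("FruitNames").alias("Position", "Fruit"))

joined = exploded.join(
    lookup_df.withColumnRenamed("Id", "FruitId").withColumnRenamed("Name", "Fruit"),
    on="Fruit",
    how="left",
)

sketch = (
    joined
    # every non-exploded column has to appear in the groupBy
    .groupBy(*base_df.columns)
    # note: collect_list after the shuffle does not guarantee the original array order
    .agg(collect_list("FruitId").alias("FruitIds"))
    # convert the empty list back to null when the source array was null
    .withColumn("FruitIds", when(col("FruitNames").isNull(), lit(None)).otherwise(col("FruitIds")))
)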

I have thought about doing some kind of base_df.withColumn("FruitIds", transform(col("FruitNames"), my_func)), but I'm not sure what my_func would be in pure Spark. I would need it to basically be something like:

lambda x: x.join(lookup_df, lookup_df["Name"] == x).select(lookup_df["Id"])

but obviously that doesn't work...

Can someone please help me figure out a pure Spark (no Python UDF) way of creating FruitIds?

2 Answers


I'd still love to see an inline/row-level solution, but this is the only real solution I've found so far.

from pyspark.sql import Window
from pyspark.sql.functions import col, collect_list, last, lit, posexplode_outer, when

df_with_fruitids = (

    # Step 1: explode FruitNames with positions; _outer keeps the null/empty-array rows
    base_df
    .select(
        col("Id"),
        col("FruitNames"),
        posexplode_outer(col("FruitNames")).alias("Position", "Fruit")
    ).alias("exploded")
    
    # Step 2: left-join each exploded fruit name to the lookup table
    .join(lookup_df.alias("lookup"), col("lookup.Name") == col("exploded.Fruit"), how="left")

    # Step 3: running collect_list of lookup Ids per base Id, ordered by Position (null when FruitNames is null)
    .withColumn(
        "FruitIds",
        when(
            col("FruitNames").isNull(), lit(None)
        )
        .otherwise(
            collect_list(col("lookup.Id"))
            .over(
                Window
                .partitionBy(col("exploded.Id"))
                .orderBy(col("Position"))
            )
        )
    )

    # Step 4: the last row of each window holds the complete list, so take it per Id
    .groupBy(col("exploded.Id"))
    .agg(last(col("FruitIds")).alias("FruitIds"))

    # Step 5: join back to base_df to restore the remaining columns
    .join(base_df, "Id", how="right")
)

Step 1

Break out base_df into only the necessary columns, Id and FruitNames, and use posexplode_outer to explode with an ordinal (the "pos" part means positional and the "outer" part means it keeps null/empty lists). This generates the following dataframe, which I have aliased as exploded:

Id  FruitNames                    Position  Fruit
1   ["apple","banana","orange"]   0         apple
1   ["apple","banana","orange"]   1         banana
1   ["apple","banana","orange"]   2         orange
2   ["banana"]                    0         banana
3   ["grape","apple"]             0         grape
3   ["grape","apple"]             1         apple
4   []                            null      null
5   null                          null      null
6   ["orange","grape"]            0         orange
6   ["orange","grape"]            1         grape

Step 2

Join this dataframe with lookup_df on lookup.Name == exploded.Fruit.

Step 3

Then, add a column that is null if FruitNames is null; otherwise it is a collect_list window function partitioned by exploded.Id (the Id from base_df) and ordered by the ordinal Position from the posexplode_outer. This generates the following dataframe:

exploded.Id  exploded.FruitNames           exploded.Position  exploded.Fruit  lookup.Id  lookup.Name  FruitIds
1            ["apple","banana","orange"]   0                  apple           1          apple        [1]
1            ["apple","banana","orange"]   1                  banana          2          banana       [1, 2]
1            ["apple","banana","orange"]   2                  orange          3          orange       [1, 2, 3]
2            ["banana"]                    0                  banana          2          banana       [2]
3            ["grape","apple"]             0                  grape           4          grape        [4]
3            ["grape","apple"]             1                  apple           1          apple        [4, 1]
4            []                            null               null            null       null         []
5            null                          null               null            null       null         null
6            ["orange","grape"]            0                  orange          3          orange       [3]
6            ["orange","grape"]            1                  grape           4          grape        [3, 4]

Step 4

Since only the last row of each window contains all the values, group by Id and use the last function in the aggregation to grab the final version for each Id.

Step 5

After that, just join it back to base_df on Id using a right join to pick up the rest of the columns.
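
One efficiency note: since lookup_df (~100K rows) is tiny compared to base_df, an explicit broadcast hint on the Step 2 join may help avoid shuffling the exploded side. A sketch of just that join:

from pyspark.sql.functions import broadcast, col, posexplode_outer

exploded = base_df.select(
    col("Id"),
    col("FruitNames"),
    posexplode_outer(col("FruitNames")).alias("Position", "Fruit"),
).alias("exploded")

# same as Step 2, but hinting Spark to broadcast the small lookup table
joined = exploded.join(
    broadcast(lookup_df.alias("lookup")),
    col("lookup.Name") == col("exploded.Fruit"),
    how="left",
)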

---
from pyspark.sql import functions as func

base_df.alias("df1").join(
    lookup_df.alias("df2"),
    # conditional (non-equi) join: keep a lookup row for every fruit present in the array
    on=func.array_contains(func.col("df1.FruitNames"), func.col("df2.Name")),
    how="left"
).groupBy(
    "df1.Id", "df1.FruitNames"
).agg(
    # build a per-row name -> id map from the matched lookup rows
    func.map_from_arrays(func.collect_list("df2.Name"), func.collect_list("df2.Id")).alias("Reference")
).select(
    "Id",
    "FruitNames",
    "Reference",
    # translate each fruit name to its id, preserving the original array order
    func.expr("transform(FruitNames, x -> Reference[x])").alias("values")
).show(
    10, False
)

You can use a conditional join to check whether each reference-table key exists in the FruitNames array, then build a map column as a reference to translate the names back to ids in their original order (see the efficiency note after the output below).

+---+-----------------------+--------------------------------------+---------+
|Id |FruitNames             |Reference                             |values   |
+---+-----------------------+--------------------------------------+---------+
|1  |[apple, banana, orange]|{apple -> 1, banana -> 2, orange -> 3}|[1, 2, 3]|
|2  |[banana]               |{banana -> 2}                         |[2]      |
|3  |[grape, apple]         |{apple -> 1, grape -> 4}              |[4, 1]   |
|4  |[]                     |{}                                    |[]       |
|5  |null                   |{}                                    |null     |
|6  |[orange, grape]        |{orange -> 3, grape -> 4}             |[3, 4]   |
+---+-----------------------+--------------------------------------+---------+
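
Since array_contains is not an equality condition, Spark can't use a hash join here. Explicitly broadcasting the small lookup_df (assuming it comfortably fits in memory) should keep the plan to a broadcast nested-loop join rather than something more expensive. A sketch:

from pyspark.sql import functions as func

# same conditional join, but explicitly broadcasting the small lookup table
joined = base_df.alias("df1").join(
    func.broadcast(lookup_df.alias("df2")),
    on=func.array_contains(func.col("df1.FruitNames"), func.col("df2.Name")),
    how="left",
)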

---

Edit: 2025-07-07

In my previous answer, I didn't handle the remaining columns (i.e. Col1, Col2, ..., Col99). Here I pack them into a struct column and unnest it at the end.

from pyspark.sql import functions as func

base_df.select(
    "Id", "FruitNames", func.struct([col for col in base_df.columns if "Col" in col]).alias("ColStruct")
).alias("df1").join(
    lookup_df.alias("df2"), 
    on=func.array_contains(func.col("df1.FruitNames"), func.col("df2.Name")),
    how="left"
).groupBy(
    "df1.Id", "df1.FruitNames", "df1.ColStruct"
).agg(
    func.map_from_arrays(func.collect_list("df2.Name"), func.collect_list("df2.Id")).alias("Reference")
).select(
    "Id",
    "FruitNames",
    "Reference",
    func.expr("transform(FruitNames, x -> Reference[x])").alias("Values"),
    "ColStruct.*"
).orderBy(
    "Id"
).show(
    10, False
)
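
If the real remaining columns don't all contain "Col" in their names, building the struct from everything except the two key columns may be safer. A sketch of just that part:

from pyspark.sql import functions as func

# pack every column except the key columns into the struct
other_cols = [c for c in base_df.columns if c not in ("Id", "FruitNames")]
packed = base_df.select("Id", "FruitNames", func.struct(*other_cols).alias("ColStruct"))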
