I have a table, base_df, with many columns, one of which is an array column:
| Id | FruitNames | Col1 | Col2 | Col3 | ... | Col99 |
|---|---|---|---|---|---|---|
| 1 | ["apple", "banana", "orange"] | ... | ... | ... | ... | ... |
| 2 | ["banana"] | ... | ... | ... | ... | ... |
| 3 | ["grape", "apple"] | ... | ... | ... | ... | ... |
| 4 | [] | ... | ... | ... | ... | ... |
| 5 | null | ... | ... | ... | ... | ... |
| 6 | ["orange", "grape"] | ... | ... | ... | ... | ... |
I also have a table, lookup_df, that has each of the array items as rows:
| Id | Name |
|---|---|
| 1 | apple |
| 2 | banana |
| 3 | orange |
| 4 | grape |
I want to add a column, FruitIds, to base_df that is an array of the corresponding Ids from lookup_df, in the same order as the names in FruitNames. In this case, I want to generate FruitIds from base_df and lookup_df.
Desired end table:
| Id | FruitNames | FruitIds | Col1 | Col2 | Col3 | ... | Col99 |
|---|---|---|---|---|---|---|---|
| 1 | ["apple", "banana", "orange"] | [1, 2, 3] | ... | ... | ... | ... | ... |
| 2 | ["banana"] | [2] | ... | ... | ... | ... | ... |
| 3 | ["grape", "apple"] | [4, 1] | ... | ... | ... | ... | ... |
| 4 | [] | [] | ... | ... | ... | ... | ... |
| 5 | null | null | ... | ... | ... | ... | ... |
| 6 | ["orange", "grape"] | [3, 4] | ... | ... | ... | ... | ... |
Setup code:
base_df = spark.createDataFrame(
[
(1, ["apple", "banana", "orange"], "...", "...", "...", "...", "..."),
(2, ["banana"], "...", "...", "...", "...", "..."),
(3, ["grape", "apple"], "...", "...", "...", "...", "..."),
(4, [], "...", "...", "...", "...", "..."),
(5, None, "...", "...", "...", "...", "..."),
(6, ["orange", "grape"], "...", "...", "...", "...", "..."),
],
["Id", "FruitNames", "Col1", "Col2", "Col3", "...", "Col99"]
)
lookup_df = spark.createDataFrame(
[
(1, "apple"),
(2, "banana"),
(3, "orange"),
(4, "grape"),
],
["Id", "Name"]
)
df_with_fruitids = ???
I can't use a plain explode, since it isn't null/empty-array safe. I could use explode_outer with a CASE expression that converts the nulls/empty arrays back, plus a collect_list to build the FruitIds column (rough sketch below), but that runs into the problem of having to group by a large number of columns. Since both base_df and lookup_df are very large (roughly 100M and 100K rows respectively), efficiency is key.
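For concreteness, here is roughly the approach I mean; it is only a sketch, and lk, exploded, and candidate are just illustrative names (the when/otherwise at the end is the "case statement" I was referring to):
from pyspark.sql import functions as F

lk = lookup_df.withColumnRenamed("Id", "FruitId").withColumnRenamed("Name", "FruitName")
exploded = base_df.withColumn("FruitName", F.explode_outer("FruitNames"))
candidate = (
    exploded
    .join(lk, "FruitName", "left")
    # the groupBy has to list FruitNames plus every one of the ~100 passthrough columns
    .groupBy("Id", "FruitNames", "Col1", "Col2", "Col3", "Col99")
    .agg(F.collect_list("FruitId").alias("FruitIds"))
    # collect_list drops nulls, so the null-array and empty-array rows both come back as [];
    # a when/otherwise is needed to restore null for the null-FruitNames row, and the
    # element order inside FruitIds isn't guaranteed to match FruitNames either
    .withColumn(
        "FruitIds",
        F.when(F.col("FruitNames").isNull(), F.lit(None)).otherwise(F.col("FruitIds")),
    )
)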
I have also thought about something like base_df.withColumn("FruitIds", transform(col("FruitNames"), my_func)), but I'm not sure what my_func would be in pure Spark. It would essentially need to be something like:
lambda x: x.join(lookup_df, lookup_df["Name"] == x).select(lookup_df["Id"])
but obviously that doesn't work, since the lambda passed to transform has to be a per-element column expression, not a DataFrame join (illustration below).
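To make that constraint concrete (UpperNames is just a throwaway column for illustration):
from pyspark.sql import functions as F

# something like this is fine, because the lambda builds a column expression from x...
base_df.withColumn("UpperNames", F.transform("FruitNames", lambda x: F.upper(x)))
# ...but inside the lambda, x is a Column, so there is no way to run a per-element
# DataFrame join against lookup_df there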
Can someone please help me figure out a pure Spark (no Python UDF) way of creating FruitIds?