
Let's assume I have an expensive PySpark query that results in a large dataframe sdf_input. I now want to add a column to this dataframe that requires the total number of rows in sdf_input. For simplicity, let's say I want to divide another column A by total_num_rows.

I could simply do something like this:

total_num_rows = sdf_input.count()
sdf_output = sdf_input.withColumn('B', F.col('A')/total_num_rows)

This will work, but it will perform the expensive sdf_input computation twice, because the count() action triggers the query to execute (i.e. materializes sdf_input). Is there a (proper) way to obtain total_num_rows without materializing sdf_input?

It feels like this should not be that hard; any ideas on how to do this properly?

I came up with a few 'solutions', but all of them feel overcomplicated and/or computationally expensive.

1. Simply writing sdf_input to disk

Not ideal, as it is a large dataset, so writing it out just to read it back feels very inefficient; a sketch of what I mean is below.
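For concreteness, a minimal sketch of this option (checkpoint_path is a hypothetical location, and spark is the existing SparkSession):

    from pyspark.sql import functions as F

    checkpoint_path = "/tmp/sdf_input_checkpoint"  # hypothetical path

    # Materialize the expensive query exactly once by writing it out
    sdf_input.write.mode("overwrite").parquet(checkpoint_path)

    # Read it back; downstream actions now scan the written files
    # instead of re-running the expensive query
    sdf_materialized = spark.read.parquet(checkpoint_path)

    total_num_rows = sdf_materialized.count()
    sdf_output = sdf_materialized.withColumn("B", F.col("A") / total_num_rows)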

2. Caching sdf_input

total_num_rows = sdf_input.cache().count()
sdf_output = sdf_input.withColumn('B', F.col('A')/total_num_rows)

The entire sdf_input will not fit into memory, so this also feels inefficient (a persist variant that spills to disk is sketched below).
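A variant of this that I also considered is an explicit persist with a storage level that spills to local disk, so partitions that don't fit in memory don't have to be recomputed (a sketch, assuming spilling to executor-local disk is acceptable):

    from pyspark import StorageLevel
    from pyspark.sql import functions as F

    # Partitions that don't fit in executor memory are spilled to local disk
    sdf_input = sdf_input.persist(StorageLevel.MEMORY_AND_DISK)

    total_num_rows = sdf_input.count()  # first action: runs the expensive query and fills the cache
    sdf_output = sdf_input.withColumn('B', F.col('A') / total_num_rows)  # reuses the persisted data

    # sdf_input.unpersist()  # release the cached data once sdf_output has been consumed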

3. Using a window operation

Use a window on the entire dataframe to obtain total_num_rows and attach to each row:

sdf_output = sdf_input.withColumn('B', F.col('A')/F.count("*").over(W.partitionBy()))

This is probably a very bad idea, as it moves all the data to a single partition (Spark even gives a warning about this).

4. Aggregate over the entire dataframe and join back to each row

    sdf_total_sum = sdf_input.agg(F.count("*").alias("total_num_rows"))
    sdf_output = sdf_input.crossJoin(F.broadcast(sdf_total_sum)).withColumn("B", F.col("A") / F.col("total_num_rows"))

Intuitively this makes sense, but it feels way overcomplicated just to get a count.

  • I'm still wondering what the 'correct' approach is... It is a pretty common thing to do in Spark, so I would expect an easier, faster and more intuitive way to do this. Any thoughts? Commented Feb 14 at 7:08

1 Answer


Your option 4, aggregating over the entire dataframe to get the count and then broadcast-joining that count back onto every row, is a common pattern. Although it might seem complicated, it's a fairly standard way of handling this kind of situation in Spark.

Here's a slight simplification of your option 4 using a broadcast variable:

from pyspark.sql import functions as F

# Count the total number of rows
total_num_rows = sdf_input.count()

# Broadcast the count
broadcast_count = spark.sparkContext.broadcast(total_num_rows)

# Use the broadcasted count to add the new column
sdf_output = sdf_input.withColumn('B', F.col('A') / broadcast_count.value)

This way, you avoid the join operation by using a broadcast variable. The count is computed once, then sent to all nodes, where it can be used to compute the new column without an extra join against sdf_total_sum.
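If you want to verify that the count ends up in the query plan as a plain literal divisor rather than an extra aggregation or join, you can inspect the plan (a quick check, using the sdf_output built above):

    # The plan for sdf_output should show column A divided by a literal value,
    # not a separate count aggregation or join
    sdf_output.explain()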


1 Comment

Will this not still trigger two actions (and therefore compute sdf_input twice), i.e., once to obtain total_num_rows and once to obtain sdf_output?
