Let's assume I have an expensive PySpark query that results in a large dataframe sdf_input. I now want to add a column to this dataframe that requires the total number of rows in sdf_input. For simplicity, let's say I want to divide another column A by total_num_rows.
I could simply do something like this:
from pyspark.sql import functions as F

total_num_rows = sdf_input.count()
sdf_output = sdf_input.withColumn('B', F.col('A') / total_num_rows)
This will work, but it will perform the expensive sdf_input computation twice, because the count() action triggers the query to execute (materializing sdf_input). Is there a (proper) way to obtain total_num_rows without materializing sdf_input?
It feels like this should not be that hard; any ideas on how to do this properly?
I came up with a few 'solutions', but all of them feel overcomplicated and/or computationally expensive.
1. Simply writing sdf_input to disk
Not ideal, as it is a large dataset, so writing it out and reading it back feels very inefficient.
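For completeness, a minimal sketch of what I have in mind here (assuming the same imports as above and an active spark session; the path is made up):

# Hypothetical location; write the expensive result once, then work from the materialized copy
sdf_input.write.mode("overwrite").parquet("/tmp/sdf_input_materialized")
sdf_materialized = spark.read.parquet("/tmp/sdf_input_materialized")

total_num_rows = sdf_materialized.count()
sdf_output = sdf_materialized.withColumn("B", F.col("A") / total_num_rows)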
2. Caching sdf_input
total_num_rows = sdf_input.cache().count()
sdf_output = sdf_input.withColumn('B', F.col('A')/total_num_rows)
The entire sdf_input will not fit into memory, so this also feels inefficient.
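A variant of this that I also considered, in case memory pressure is the main concern, is persisting with a disk-backed storage level instead of the default cache. This is only a sketch and essentially amounts to a managed version of option 1:

from pyspark import StorageLevel

# Keep the computed result on disk only, avoiding memory pressure
sdf_persisted = sdf_input.persist(StorageLevel.DISK_ONLY)
total_num_rows = sdf_persisted.count()
sdf_output = sdf_persisted.withColumn('B', F.col('A') / total_num_rows)
# Only unpersist after sdf_output has actually been computed/written,
# otherwise the lineage would be recomputed from scratch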
3. Using a window operation
Use a window on the entire dataframe to obtain total_num_rows and attach to each row:
from pyspark.sql import Window as W

sdf_output = sdf_input.withColumn('B', F.col('A') / F.count("*").over(W.partitionBy()))
This is probably a very bad idea, as it will move all the data to a single partition (Spark even emits a warning about this).
4. Aggregate over the entire dataframe and join back to each row
sdf_total_sum = sdf_input.agg(F.count("*").alias("total_num_rows"))
sdf_output = sdf_input.crossJoin(F.broadcast(sdf_total_sum)).withColumn("B", F.col("A") / F.col("total_num_rows"))
Intuitively this makes sense, but it feels way overcomplicated just to get a count.