I think you need a column that keeps track of the row number within each id, so that the order of the values being multiplied together is preserved. You can create it with F.row_number() over a window partitioned by id and ordered by F.lit(1), which serves as a dummy ordering.
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
import pyspark.sql.functions as F
schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("values", FloatType(), True),
        StructField("product", FloatType(), True)
    ]
)
df = spark.createDataFrame([
        (1, 10., 10.), (1, 10., 10.), (1, 11., 110.), (1, 11., 110.),
        (2, 20., 20.), (2, 20., 20.), (2, 22., 440.), (2, 22., 440.), (2, 4., 1760.), (2, 4., 1760.),
        (3, 30., 30.), (3, 30., 30.), (3, 3., 90.), (3, 3., 90.), (3, 6., 540.), (3, 6., 540.),
        (4, 4., 4.), (4, 4., 4.), (4, 2., 8.), (4, 2., 8.), (4, 5., 40.),
        (5, 12., 12.), (5, 12., 12.), (5, 4., 48.), (5, 0.5, 24.), (5, 0.5, 24.)
    ],
    schema=schema
)
window_row = Window.partitionBy('id').orderBy(F.lit(1))
df_input = df.select(
    'id', 'values'
).withColumn(
    'row', F.row_number().over(window_row)
)
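One caveat worth flagging: ordering the window by a constant like F.lit(1) leaves the within-partition order up to Spark, so it is not strictly guaranteed to match the input order. If that order matters, one alternative (a sketch, not part of the original code) is to tag each row with monotonically_increasing_id before the window and order by that instead:

# Sketch: capture an explicit index before applying the window.
# monotonically_increasing_id gives increasing (not consecutive) ids,
# which generally reflect the original row order of the source.
df_indexed = df.withColumn('idx', F.monotonically_increasing_id())
window_row = Window.partitionBy('id').orderBy('idx')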
We now have df_input, which looks like the following:
+---+------+---+
| id|values|row|
+---+------+---+
| 1| 10.0| 1|
| 1| 10.0| 2|
| 1| 11.0| 3|
| 1| 11.0| 4|
| 2| 20.0| 1|
| 2| 20.0| 2|
| 2| 22.0| 3|
| 2| 22.0| 4|
| 2| 4.0| 5|
| 2| 4.0| 6|
| 3| 30.0| 1|
| 3| 30.0| 2|
| 3| 3.0| 3|
| 3| 3.0| 4|
| 3| 6.0| 5|
| 3| 6.0| 6|
| 4| 4.0| 1|
| 4| 4.0| 2|
| 4| 2.0| 3|
| 4| 2.0| 4|
| 4| 5.0| 5|
| 5| 12.0| 1|
| 5| 12.0| 2|
| 5| 4.0| 3|
| 5| 0.5| 4|
| 5| 0.5| 5|
+---+------+---+
After that, we can drop duplicates so that only the unique values for each id go into the product. Then we create a second window, partitioned by id and ordered by row, and take F.product of values over this window to obtain the cumulative product:
w = (Window.partitionBy('id').orderBy('row').rowsBetween(Window.unboundedPreceding, 0))
df_products = df_input.drop_duplicates(['id', 'values']).withColumn(
    'product', F.product(F.col('values')).over(w)
).select('id', 'values', 'product')
This gives us our cumulative product in df_products:
+---+------+-------+
| id|values|product|
+---+------+-------+
| 1| 10.0| 10.0|
| 1| 11.0| 110.0|
| 2| 20.0| 20.0|
| 2| 22.0| 440.0|
| 2| 4.0| 1760.0|
| 3| 30.0| 30.0|
| 3| 3.0| 90.0|
| 3| 6.0| 540.0|
| 4| 4.0| 4.0|
| 4| 2.0| 8.0|
| 4| 5.0| 40.0|
| 5| 12.0| 12.0|
| 5| 4.0| 48.0|
| 5| 0.5| 24.0|
+---+------+-------+
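One thing to check on your cluster: F.product was only added in Spark 3.2, as far as I remember. On older versions, a rough substitute (just a sketch, and only valid when every value is strictly positive) is to sum logs over the same window and exponentiate:

# Pre-3.2 sketch: exp(sum(log(values))) over the same window w.
# Assumes all values are strictly positive; zeros and negatives need separate handling.
df_products = df_input.drop_duplicates(['id', 'values']).withColumn(
    'product', F.exp(F.sum(F.log(F.col('values'))).over(w))
).select('id', 'values', 'product')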
Now we can join df_products back onto df_input:
df_input.join(
    df_products, on=['id', 'values'], how='inner'
).orderBy(
    'id', 'row'
).select(
    'id', 'values', 'product', 'row'
)
Result:
+---+------+-------+---+
| id|values|product|row|
+---+------+-------+---+
| 1| 10.0| 10.0| 1|
| 1| 10.0| 10.0| 2|
| 1| 11.0| 110.0| 3|
| 1| 11.0| 110.0| 4|
| 2| 20.0| 20.0| 1|
| 2| 20.0| 20.0| 2|
| 2| 22.0| 440.0| 3|
| 2| 22.0| 440.0| 4|
| 2| 4.0| 1760.0| 5|
| 2| 4.0| 1760.0| 6|
| 3| 30.0| 30.0| 1|
| 3| 30.0| 30.0| 2|
| 3| 3.0| 90.0| 3|
| 3| 3.0| 90.0| 4|
| 3| 6.0| 540.0| 5|
| 3| 6.0| 540.0| 6|
| 4| 4.0| 4.0| 1|
| 4| 4.0| 4.0| 2|
| 4| 2.0| 8.0| 3|
| 4| 2.0| 8.0| 4|
| 4| 5.0| 40.0| 5|
| 5| 12.0| 12.0| 1|
| 5| 12.0| 12.0| 2|
| 5| 4.0| 48.0| 3|
| 5| 0.5| 24.0| 4|
| 5| 0.5| 24.0| 5|
+---+------+-------+---+
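If you want to sanity-check the recomputed product against the product column you already have in df, one way (a hypothetical check, not part of the answer itself) is to join the two on id and values and look for mismatches:

# Hypothetical sanity check: compare the recomputed product with the
# 'product' column that was already present in df.
expected = df.drop_duplicates(['id', 'values']).withColumnRenamed('product', 'expected_product')
df_products.join(expected, on=['id', 'values'], how='inner').where(
    F.abs(F.col('product') - F.col('expected_product')) > 1e-6
).show()  # an empty result means they agree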