In the following PySpark DataFrame, I am trying to multiply the values in the values column from top to bottom (as if using the F.product function), but instead of a row-by-row cumulative product, each id contains subgroups of identical values, and each subgroup's value should only be multiplied in once. product is the column with the expected results, based on the id and values columns. Grateful for any suggestions.

+---+------+-------+
| id|values|product|
+---+------+-------+
|  1|    10|   10  |
|  1|    10|   10  |
|  1|    11|  110  |
|  1|    11|  110  |
|  2|    20|   20  |
|  2|    20|   20  |
|  2|    22|  440  |
|  2|    22|  440  |
|  2|     4| 1760  |
|  2|     4| 1760  |
|  3|    30|   30  |
|  3|    30|   30  |
|  3|     3|   90  |
|  3|     3|   90  |
|  3|     6|  540  |
|  3|     6|  540  |
|  4|     4|    4  |
|  4|     4|    4  |
|  4|     2|    8  |
|  4|     2|    8  |
|  4|     5|   40  |
|  5|    12|   12  |
|  5|    12|   12  |
|  5|     4|   48  |
|  5|   0.5|   24  |
|  5|   0.5|   24  |
+---+------+-------+

# Note: mixing ints and floats in one column (10 and 0.5 for id 5) can make
# schema inference fail with a "Can not merge type" error; the answer below
# sidesteps this with an explicit schema.
df = spark.createDataFrame([
    (1, 10, 10), (1, 10, 10), (1, 11, 110), (1, 11, 110),
    (2, 20, 20), (2, 20, 20), (2, 22, 440), (2, 22, 440), (2, 4, 1760), (2, 4, 1760),
    (3, 30, 30), (3, 30, 30), (3, 3, 90), (3, 3, 90), (3, 6, 540), (3, 6, 540),
    (4, 4, 4), (4, 4, 4), (4, 2, 8), (4, 2, 8), (4, 5, 40),
    (5, 12, 12), (5, 12, 12), (5, 4, 48), (5, 0.5, 24), (5, 0.5, 24)
], ["id", "values", "product"])
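For reference, the expected product column can be stated procedurally: within each id, whenever the value changes from the previous row, the running product is multiplied by the new value; repeated rows share the same product. A plain-Python sketch of that rule (not PySpark, purely to pin down the semantics):

```python
from itertools import groupby
from operator import itemgetter

def expected_products(rows):
    """rows: ordered list of (id, value) pairs.
    Returns the running product for each row; a value is folded into
    the product only when it differs from the previous row's value
    within the same id."""
    out = []
    for _, group in groupby(rows, key=itemgetter(0)):
        running = 1.0
        prev = None
        for _, value in group:
            if value != prev:        # new subgroup: multiply the value in
                running *= value
                prev = value
            out.append(running)
    return out

rows = [(5, 12.0), (5, 12.0), (5, 4.0), (5, 0.5), (5, 0.5)]
print(expected_products(rows))  # [12.0, 12.0, 48.0, 24.0, 24.0]
```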

1 Answer

I think you probably need a column that tracks the row number within each id, so that the order of the values being multiplied is preserved. A window partitioned by id and ordered by F.lit(1) (a dummy ordering) does this. Note that ordering by a constant does not guarantee a deterministic row order; if your data has a real ordering column, use that instead.

from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType

schema = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("values", FloatType(), True),
        StructField("product", FloatType(), True)
    ]
)

df = spark.createDataFrame([
    (1, 10., 10.), (1, 10., 10.), (1, 11., 110.), (1, 11., 110.),
    (2, 20., 20.), (2, 20., 20.), (2, 22., 440.), (2, 22., 440.), (2, 4., 1760.), (2, 4., 1760.),
    (3, 30., 30.), (3, 30., 30.), (3, 3., 90.), (3, 3., 90.), (3, 6., 540.), (3, 6., 540.),
    (4, 4., 4.), (4, 4., 4.), (4, 2., 8.), (4, 2., 8.), (4, 5., 40.),
    (5, 12., 12.), (5, 12., 12.), (5, 4., 48.), (5, 0.5, 24.), (5, 0.5, 24.) ], 
    schema=schema
)

window_row = Window.partitionBy('id').orderBy(F.lit(1))

df_input = df.select(
    "id","values"
).withColumn(
    'row', F.row_number().over(window_row)
)

We have df_input which looks like the following:

+---+------+---+
| id|values|row|
+---+------+---+
|  1|  10.0|  1|
|  1|  10.0|  2|
|  1|  11.0|  3|
|  1|  11.0|  4|
|  2|  20.0|  1|
|  2|  20.0|  2|
|  2|  22.0|  3|
|  2|  22.0|  4|
|  2|   4.0|  5|
|  2|   4.0|  6|
|  3|  30.0|  1|
|  3|  30.0|  2|
|  3|   3.0|  3|
|  3|   3.0|  4|
|  3|   6.0|  5|
|  3|   6.0|  6|
|  4|   4.0|  1|
|  4|   4.0|  2|
|  4|   2.0|  3|
|  4|   2.0|  4|
|  4|   5.0|  5|
|  5|  12.0|  1|
|  5|  12.0|  2|
|  5|   4.0|  3|
|  5|   0.5|  4|
|  5|   0.5|  5|
+---+------+---+

After that, we can drop duplicates so that we are only taking the cumulative product of the unique values for each id. Then we create another window partitioned by id and ordered by row, and take F.product (available in Spark 3.1+) of values over this expanding window to obtain the cumulative product:

w = (Window.partitionBy('id').orderBy('row').rowsBetween(Window.unboundedPreceding, 0))

df_products = df_input.drop_duplicates(['id','values']).withColumn(
    'product', F.product(F.col('values')).over(w)
).select('id','values','product')
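F.product over the expanding window (unbounded preceding to current row) is just a prefix product of the deduplicated values. In plain Python, the same step for id 2 could be sketched with itertools.accumulate (shown here only to illustrate what the window computes):

```python
from itertools import accumulate
from operator import mul

# Unique values for id 2, in row order, after drop_duplicates
unique_values = [20.0, 22.0, 4.0]

# Expanding-window product: each element is the product of all
# values up to and including that position
cumulative = list(accumulate(unique_values, mul))
print(cumulative)  # [20.0, 440.0, 1760.0]
```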

This gives us our cumulative product in df_products:

+---+------+-------+
| id|values|product|
+---+------+-------+
|  1|  10.0|   10.0|
|  1|  11.0|  110.0|
|  2|  20.0|   20.0|
|  2|  22.0|  440.0|
|  2|   4.0| 1760.0|
|  3|  30.0|   30.0|
|  3|   3.0|   90.0|
|  3|   6.0|  540.0|
|  4|   4.0|    4.0|
|  4|   2.0|    8.0|
|  4|   5.0|   40.0|
|  5|  12.0|   12.0|
|  5|   4.0|   48.0|
|  5|   0.5|   24.0|
+---+------+-------+

Now we can join df_products back onto df_input:

df_input.join(
    df_products, on=['id','values'], how='inner'
).orderBy(
    'id','row'
).select(
    'id','values','product','row'
)

Result:

+---+------+-------+---+
| id|values|product|row|
+---+------+-------+---+
|  1|  10.0|   10.0|  1|
|  1|  10.0|   10.0|  2|
|  1|  11.0|  110.0|  3|
|  1|  11.0|  110.0|  4|
|  2|  20.0|   20.0|  1|
|  2|  20.0|   20.0|  2|
|  2|  22.0|  440.0|  3|
|  2|  22.0|  440.0|  4|
|  2|   4.0| 1760.0|  5|
|  2|   4.0| 1760.0|  6|
|  3|  30.0|   30.0|  1|
|  3|  30.0|   30.0|  2|
|  3|   3.0|   90.0|  3|
|  3|   3.0|   90.0|  4|
|  3|   6.0|  540.0|  5|
|  3|   6.0|  540.0|  6|
|  4|   4.0|    4.0|  1|
|  4|   4.0|    4.0|  2|
|  4|   2.0|    8.0|  3|
|  4|   2.0|    8.0|  4|
|  4|   5.0|   40.0|  5|
|  5|  12.0|   12.0|  1|
|  5|  12.0|   12.0|  2|
|  5|   4.0|   48.0|  3|
|  5|   0.5|   24.0|  4|
|  5|   0.5|   24.0|  5|
+---+------+-------+---+
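One caveat worth noting: drop_duplicates plus an equi-join on ['id','values'] assumes a value never reappears later within the same id. If it does (say values 10, 11, 10 for one id), the later occurrence is deduplicated away, and the join maps both occurrences of 10 back to the first cumulative product. A plain-Python check of this failure mode, on hypothetical data not from the question, assuming the intended rule is to multiply on every value change:

```python
from itertools import accumulate
from operator import mul

values = [10.0, 11.0, 10.0]          # one id where 10 reappears later

# What the change-based rule expects: multiply on every value change
expected = []
running, prev = 1.0, None
for v in values:
    if v != prev:
        running *= v
        prev = v
    expected.append(running)
print(expected)                      # [10.0, 110.0, 1100.0]

# What dedupe + join produces: 10.0 survives only once, so both
# occurrences map back to the first cumulative product
unique = list(dict.fromkeys(values))             # [10.0, 11.0]
products = dict(zip(unique, accumulate(unique, mul)))
joined = [products[v] for v in values]
print(joined)                        # [10.0, 110.0, 10.0]
```

If repeats are always consecutive, as in the question's data, the two agree and the join-based answer is fine.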