0

I have a dataframe my_df that contains 4 columns:

+----------------+---------------+--------+---------+
|         user_id|         domain|isp_flag|frequency|
+----------------+---------------+--------+---------+
|            josh|     wanadoo.fr|       1|       15|
|            josh|      random.it|       0|       12|
|        samantha|     wanadoo.fr|       1|       16|
|             bob|    eidsiva.net|       1|        5|
|             bob|      media.net|       0|        1|
|           dylan|    vodafone.it|       1|      448|
|           dylan|   somesite.net|       0|       20|
|           dylan|   yolosite.net|       0|       49|
|           dylan|      random.it|       0|        3|
|             don|    vodafone.it|       1|       39|
|             don|   popsugar.com|       0|       10|
|             don|      fabio.com|       1|       49|
+----------------+---------------+--------+---------+

This is what I'm planning to do-

Find all the user_id where the maximum frequency domain with isp_flag=0 has a frequency that is less than 25% of the maximum frequency domain with isp_flag=1.

So, in the example that I have above, my output_df would look like-

+----------------+---------------+--------+---------+
|         user_id|         domain|isp_flag|frequency|
+----------------+---------------+--------+---------+
|             bob|    eidsiva.net|       1|        5|
|             bob|      media.net|       0|        1|
|           dylan|    vodafone.it|       1|      448|
|           dylan|   yolosite.net|       0|       49|
|             don|      fabio.com|       1|       49|
|             don|   popsugar.com|       0|       10|
+----------------+---------------+--------+---------+

I believe I need window functions to do this, and so I tried the following to first find the maximum frequency domains for isp_flag=0 and isp_flag=1 respectively, for each of the user_id-

>>> win_1 = Window().partitionBy("user_id", "domain", "isp_flag").orderBy((col("frequency").desc()))
>>> final_df = my_df.select("*", rank().over(win_1).alias("rank")).filter(col("rank")==1)
>>> final_df.show(5)   # this just gives me the original dataframe back

What am I doing wrong here? How do I get to the final output_df I printed above?

0

1 Answer 1

1

IIUC, you can try the following: calculate the max_frequencies (max_0, max_1) for each user having isp_flag == 0 or 1 respectively. and then filter by condition max_0 < 0.25*max_1 and plus frequency in (max_1, max_0) to select only the records with maximum frequency.

from pyspark.sql import Window, functions as F

# set up the Window to calculate max_0 and max_1 for each user
# having isp_flag = 0 and 1 respectively
w1 = Window.partitionBy('user_id').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.withColumn('max_1', F.max(F.expr("IF(isp_flag==1, frequency, NULL)")).over(w1))\ 
  .withColumn('max_0', F.max(F.expr("IF(isp_flag==0, frequency, NULL)")).over(w1))\ 
  .where('max_0 < 0.25*max_1 AND frequency in (max_1, max_0)') \ 
  .show() 
+-------+------------+--------+---------+-----+-----+                           
|user_id|      domain|isp_flag|frequency|max_1|max_0|
+-------+------------+--------+---------+-----+-----+
|    don|popsugar.com|       0|       10|   49|   10|
|    don|   fabio.com|       1|       49|   49|   10|
|  dylan| vodafone.it|       1|      448|  448|   49|
|  dylan|yolosite.net|       0|       49|  448|   49|
|    bob| eidsiva.net|       1|        5|    5|    1|
|    bob|   media.net|       0|        1|    5|    1|
+-------+------------+--------+---------+-----+-----+

Some Explanations per request:

  • the WindowSpec w1 is set to examine all records for the same user(partitionBy), so that the F.max() function will compare all rows based on the same user.

  • we use IF(isp_flag==1, frequency, NULL) to find frequency for rows having isp_flag==1, it returns NULL when isp_flag is not 1 and thus is skipped in F.max() function. this is an SQL expression and thus we need F.expr() function to run it.

  • F.max(...).over(w1) will take the max value of the result from executing the above SQL expression. this calculation is based on the Window w1.

Sign up to request clarification or add additional context in comments.

5 Comments

this worked great, thanks! could you explain the window function though? also, what does F.max(F.expr("IF(isp_flag==1, frequency, NULL)")) do here? and why is there a NULL in there?
@kev, added some explanations in the post.
thanks for the explanation! Also, why did we need AND frequency in (max_1, max_0) in the WHERE clause?
Hi, @kev, I saw you have rank()==1 in your filter, and frequency in (max_1, max_0) is basically to filter by the max values of frequency which does the same as rank() == 1. Just a reminder, you will get more than 2 rows per user if there are ties with max_1 or max_0.
BTW. you can remove .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) from w1 since it's default (unbounded) when there is no orderBy() clause in the WindowSpec.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.