
Let's consider the following input data

| incremental_id | session_start_id | session_end_id | items_bought |
|----------------|------------------|----------------|--------------|
| 1              | a                | b              | 1            |
| 2              | z                | t              | 7            |
| 3              | b                | c              | 0            |
| 4              | c                | d              | 3            |

Where:

  • Each row represents a user session
  • Each session records a start/end session id
  • We know that rows 1, 3 and 4 belong to the same user, because each session's session_end_id matches the next session's session_start_id (a → b → c → d). Row 2 instead belongs to a second user

I want to be able to aggregate the above data so that I can get:

  • The first customer has bought 4 items
  • The second customer has bought 7 items

How could this be done in PySpark (or possibly in pure SQL)? I would like to avoid using UDFs in PySpark, but it's fine if that's the only way.

Thanks for your help!

Edit: I have updated the example dataframe; the incremental_id alone cannot be used to order the rows as consecutive sessions.
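For reference, the grouping the question asks for can be sketched in plain Python (a sketch over the four example rows, not a Spark solution): index each session by its start id, find the chain roots, and walk each chain while summing items_bought.

```python
# Rows from the example table: (session_start_id, session_end_id, items_bought)
rows = [("a", "b", 1), ("z", "t", 7), ("b", "c", 0), ("c", "d", 3)]

# Index each session by its start id so we can follow the chain
# session_end_id -> next session_start_id.
by_start = {start: (end, items) for start, end, items in rows}

# Roots are sessions whose start id is not any other session's end id.
ends = {end for _, end, _ in rows}
roots = [start for start, _, _ in rows if start not in ends]

# Walk each chain from its root, summing items_bought per user.
totals = {}
for root in roots:
    total, cur = 0, root
    while cur in by_start:
        end, items = by_start[cur]
        total += items
        cur = end
    totals[root] = total

print(totals)  # {'a': 4, 'z': 7}
```

The chain starting at `a` covers rows 1, 3 and 4 (1 + 0 + 3 = 4 items); the chain starting at `z` is row 2 alone (7 items).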

  • As far as I can see, two records in sequence can be associated by pulling the value from the previous row, but I don't see how 3+ records can be associated in a query alone. I expect a UDF will be needed. Commented Dec 4, 2020 at 3:17
  • @jxc I think b.a. is not like that. It is rather 5,d,e,1 I assume. Commented Dec 4, 2020 at 13:56
  • Looks like a typical question for graphframe.connectedComponents; or, if the sample is just part of a group, use pandas_udf with the same method in networkx. Commented Dec 4, 2020 at 14:12

3 Answers


Common Table Expressions are part of SQL:1999.

Using a recursive CTE, we can write the following query:

WITH cte (session_start_id, session_end_id, items_bought) AS (
  SELECT session_start_id, session_end_id, items_bought
  FROM user_session
  WHERE session_start_id NOT IN (SELECT session_end_id FROM user_session)
  UNION ALL
  SELECT a.session_start_id, b.session_end_id, b.items_bought
  FROM cte a
  INNER JOIN user_session b ON a.session_end_id = b.session_start_id
)
SELECT session_start_id, SUM(items_bought)
FROM cte
GROUP BY session_start_id

Explanation:

  • In the anchor query, select all the records that do not have a parent (i.e., no other record ends with the current session_start_id).
  • Recursively, join the cte's session_end_id with session_start_id from the table.
  • Group the records and return the result.

SQL Fiddle link: http://sqlfiddle.com/#!4/ac98a/4/0
(Note: Used Oracle in fiddle. But any DB engine that supports CTE should work).
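For readers whose engine lacks recursive CTEs, the anchor/recursive structure above can be mimicked in plain Python (a sketch over the question's example rows, not part of the original answer):

```python
rows = [("a", "b", 1), ("z", "t", 7), ("b", "c", 0), ("c", "d", 3)]

# Anchor step: rows whose session_start_id never appears as a session_end_id.
ends = {end for _, end, _ in rows}
cte = [(s, e, i) for s, e, i in rows if s not in ends]

# Recursive step: join the frontier's session_end_id to the table's
# session_start_id, carrying the root's session_start_id forward.
frontier = cte
while frontier:
    frontier = [(a_start, b_end, b_items)
                for a_start, a_end, _ in frontier
                for b_start, b_end, b_items in rows
                if a_end == b_start]
    cte.extend(frontier)

# Final grouping: sum items_bought per root session_start_id.
totals = {}
for start, _, items in cte:
    totals[start] = totals.get(start, 0) + items
print(totals)  # {'a': 4, 'z': 7}
```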


1 Comment

Thanks, this seems to work, but unfortunately I'm using prestodb and the specific version doesn't support recursive WITH statements like the query you suggested, so I can't use the proposed approach.

Here is a PySpark version:

from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import *

# create a window over the full data so we can lag the session end id
win = Window().partitionBy().orderBy("incremental_id")  # full-data window (single partition) so we can lag the session end id

# This is logic to indicate a user change
df = df.withColumn('user_boundary', F.lag(F.col("session_end_id"), 1).over(win) != F.col("session_start_id"))
df = df.withColumn('user_boundary', F.when(F.col("user_boundary").isNull(), F.lit(False)).otherwise(F.col("user_boundary")))

# Now create an artificial user id
df = df.withColumn('user_id', F.sum(F.col("user_boundary").cast(IntegerType())).over(win))

# Aggregate
df.groupby('user_id').agg(F.sum(F.col("items_bought")).alias("total_bought")).show()

+-------+------------+
|user_id|total_bought|
+-------+------------+
|      0|           4|
|      1|           7|
+-------+------------+
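The window logic above (lag the previous end id, flag a boundary where it differs from the current start id, and turn the running sum of boundaries into a user id) can be sketched without Spark. Note this assumes the rows are ordered so that each user's sessions are consecutive, which is exactly the assumption the comment below objects to:

```python
# Rows ordered so that sessions of the same user are consecutive:
# (incremental_id, session_start_id, session_end_id, items_bought)
rows = [(1, "a", "b", 1), (2, "b", "c", 0), (3, "c", "d", 3), (4, "z", "t", 7)]

# lag(session_end_id) != session_start_id marks a user boundary;
# a running count of boundaries becomes the artificial user_id.
totals = {}
user_id, prev_end = 0, None
for _, start, end, items in rows:
    if prev_end is not None and prev_end != start:
        user_id += 1  # boundary: previous session's end != this session's start
    totals[user_id] = totals.get(user_id, 0) + items
    prev_end = end

print(totals)  # {0: 4, 1: 7}
```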

1 Comment

Thanks, but unfortunately this solution with the lag(1) uses the assumption that incremental_id is indicative of subsequent sessions from the same user. I added that just for context but it should work regardless of that. I'm updating the example dataframe to make this more explicit
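An order-independent alternative, in the spirit of the graphframe connectedComponents suggestion in the question's comments, is to treat session ids as graph nodes and group rows by connected component. A pure-Python union-find sketch over the updated example data (an illustration of the idea, not a Spark implementation):

```python
rows = [(1, "a", "b", 1), (2, "z", "t", 7), (3, "b", "c", 0), (4, "c", "d", 3)]

# Union-find over session ids: each row links its start and end ids,
# so all sessions of the same user end up in one component,
# regardless of row order.
parent = {}

def find(x):
    parent.setdefault(x, x)
    while parent[x] != x:
        parent[x] = parent[parent[x]]  # path halving
        x = parent[x]
    return x

def union(a, b):
    parent[find(a)] = find(b)

for _, start, end, _ in rows:
    union(start, end)

# Sum items_bought per component (i.e., per user).
totals = {}
for _, start, _, items in rows:
    root = find(start)
    totals[root] = totals.get(root, 0) + items

print(sorted(totals.values()))  # [4, 7]
```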

If you are able to create temp tables and read affected-row-count metadata, then you can port this (SQL Server syntax):

insert into #CTESubs
select
    session_start_id,
    session_end_id,
    items_bought
from #user_session
WHERE
    session_start_id not in (select session_end_id from #user_session)

while(@@ROWCOUNT <> 0)
begin
    insert into #CTESubs
    select distinct
        p.session_start_id,
        c.session_end_id,
        c.items_bought
    from #user_session c
        inner join #CTESubs p on c.session_start_id = p.session_end_id
    WHERE
        p.session_start_id not in (select session_end_id from #user_session) 
        and c.session_end_id not in (select session_end_id from #CTESubs)
end

select
    session_start_id,
    sum(items_bought) items_bought
from #CTESubs
group by 
    session_start_id;

