After @Vaebhav's answer I realized the question was not set up correctly, so I have edited it to use his code snippet.
I have the following table:
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, TimestampType, DoubleType
input_str = """
4219,2018-01-01 08:10:00,3.0,50.78,
4216,2018-01-02 08:01:00,5.0,100.84,
4217,2018-01-02 20:00:00,4.0,800.49,
4139,2018-01-03 11:05:00,1.0,400.0,
4170,2018-01-03 09:10:00,2.0,100.0,
4029,2018-01-06 09:06:00,6.0,300.55,
4029,2018-01-06 09:16:00,2.0,310.55,
4217,2018-01-06 09:36:00,5.0,307.55,
1139,2018-01-21 11:05:00,1.0,400.0,
2170,2018-01-21 09:10:00,2.0,100.0,
4218,2018-02-06 09:36:00,5.0,307.55,
4218,2018-02-06 09:36:00,5.0,307.55
""".split(",")
input_values = list(map(lambda x: x.strip() if x.strip() != '' else None, input_str))
cols = list(map(lambda x: x.strip() if x.strip() != 'null' else None, "customer_id,timestamp,quantity,price".split(',')))
n = len(input_values)
n_cols = 4
input_list = [tuple(input_values[i:i+n_cols]) for i in range(0,n,n_cols)]
sparkDF = sqlContext.createDataFrame(input_list,cols)
sparkDF = sparkDF.withColumn('customer_id',F.col('customer_id').cast(IntegerType()))\
.withColumn('timestamp',F.col('timestamp').cast(TimestampType()))\
.withColumn('quantity',F.col('quantity').cast(IntegerType()))\
.withColumn('price',F.col('price').cast(DoubleType()))
I want to calculate the aggregate as follows:
| trxn_date | unique_cust_visits | next_7_day_visits | next_30_day_visits |
|---|---|---|---|
| 2018-01-01 | 1 | 7 | 9 |
| 2018-01-02 | 2 | 6 | 8 |
| 2018-01-03 | 2 | 4 | 6 |
| 2018-01-06 | 2 | 2 | 4 |
| 2018-01-21 | 2 | 2 | 3 |
| 2018-02-06 | 1 | 1 | 1 |
where
- trxn_date is the date part of the timestamp column,
- unique_cust_visits is the count of distinct customers on that date,
- next_7_day_visits is the count of customer visits over a forward-looking 7-day rolling window, and
- next_30_day_visits is the count of customer visits over a forward-looking 30-day rolling window.
I want to write the code as a single SQL query.