
I have a Spark SQL DataFrame with a date column, and what I'm trying to get is all the rows preceding the current row within a given date range. For example, I want all the rows from 7 days back preceding the given row. I figured out that I need to use a Window function like:

from pyspark.sql.window import Window

Window \
    .partitionBy('id') \
    .orderBy('start')

I want to have a rangeBetween of 7 days, but I couldn't find anything on this in the Spark docs. Does Spark even provide such an option? For now I'm just getting all the preceding rows with:

.rowsBetween(-sys.maxsize, 0)

but would like to achieve something like:

.rangeBetween("7 days", 0)

3 Answers


Spark >= 2.3

Since Spark 2.3 it is possible to use interval objects through the SQL API, but DataFrame API support is still a work in progress.

df.createOrReplaceTempView("df")

spark.sql(
    """SELECT *, mean(some_value) OVER (
        PARTITION BY id 
        ORDER BY CAST(start AS timestamp) 
        RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
     ) AS mean FROM df""").show()

## +---+----------+----------+------------------+       
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+
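If you would rather stay in the DataFrame API, the same windowed expression can be passed through expr. This is a minimal sketch, not part of the original answer; it assumes the df built in the Spark < 2.3 section below and a Spark version where interval frames work (2.4+ per the SPARK-25845 comment):

from pyspark.sql import functions as F

# Same window as the SQL query above, parsed from a string by expr();
# df is assumed to have id, start and some_value columns.
df.withColumn(
    "mean",
    F.expr("""
        mean(some_value) OVER (
            PARTITION BY id
            ORDER BY CAST(start AS timestamp)
            RANGE BETWEEN INTERVAL 7 DAYS PRECEDING AND CURRENT ROW
        )
    """)
).show()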

Spark < 2.3

As far as I know it is not possible directly in either Spark or Hive. Both require the ORDER BY clause used with RANGE to be numeric. The closest thing I found is converting to a timestamp and operating on seconds. Assuming the start column contains a date type:

from pyspark.sql import Row
from pyspark.sql.functions import col

row = Row("id", "start", "some_value")
df = sc.parallelize([
    row(1, "2015-01-01", 20.0),
    row(1, "2015-01-06", 10.0),
    row(1, "2015-01-07", 25.0),
    row(1, "2015-01-12", 30.0),
    row(2, "2015-01-01", 5.0),
    row(2, "2015-01-03", 30.0),
    row(2, "2015-02-01", 20.0)
]).toDF().withColumn("start", col("start").cast("date"))

A small helper and window definition:

from pyspark.sql.window import Window
from pyspark.sql.functions import mean, col


# Hive timestamp is interpreted as UNIX timestamp in seconds*
days = lambda i: i * 86400 

Finally, the query:

w = (Window()
   .partitionBy(col("id"))
   .orderBy(col("start").cast("timestamp").cast("long"))
   .rangeBetween(-days(7), 0))

df.select(col("*"), mean("some_value").over(w).alias("mean")).show()

## +---+----------+----------+------------------+
## | id|     start|some_value|              mean|
## +---+----------+----------+------------------+
## |  1|2015-01-01|      20.0|              20.0|
## |  1|2015-01-06|      10.0|              15.0|
## |  1|2015-01-07|      25.0|18.333333333333332|
## |  1|2015-01-12|      30.0|21.666666666666668|
## |  2|2015-01-01|       5.0|               5.0|
## |  2|2015-01-03|      30.0|              17.5|
## |  2|2015-02-01|      20.0|              20.0|
## +---+----------+----------+------------------+

Far from pretty, but it works.


* Hive Language Manual, Types


5 Comments

I use Spark 2.3, but the first option doesn't work for me and throws the exception scala.MatchError: CalendarIntervalType (of class org.apache.spark.sql.types.CalendarIntervalType$). There is a JIRA issue that will be fixed in 2.4.0: issues.apache.org/jira/browse/SPARK-25845
Hi, for the last query, can I ask you how to include the 'days'? I got "name 'days' is not defined".
@Spacez the "days" helper function is declared above as a lambda function that multiplies the argument by 86400 (one day in seconds).
Window.partitionBy(col("id"), pyspark.sql.functions.window("start", "1 day"))
@zero323, would you like to explain why, in the window function, you add cast('timestamp').cast('long')? Is cast('long') a must? Thank you.

Spark 3.5 is released, but...

The answer may be as old as Spark 1.5.0: datediff.

datediff(col_name, '1000') will return an integer difference of days from 1000-01-01 to col_name.

  • As the first argument, it accepts dates, timestamps and even strings.
  • As the second, it even accepts 1000.
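
For illustration, a minimal sketch of what that ordering value looks like (assumes an active SparkSession named spark; the column name is hypothetical):

from pyspark.sql import functions as F

# datediff(col_name, '1000') yields plain integers, one unit per day,
# which is exactly what rangeBetween needs as a numeric ORDER BY key.
demo = spark.createDataFrame([("2015-01-01",), ("2015-01-08",)], ["col_name"])
demo.select("col_name", F.expr("datediff(col_name, '1000')").alias("day_number")).show()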


The answer

Date difference in days, depending on the data type of the order column (a complete worked example follows the list):

date

  • Spark 3.5+

    .orderBy(F.unix_date("col_name")).rangeBetween(-7, 0)
    
  • Spark 3.1+

    .orderBy(F.expr("unix_date(col_name)")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

timestamp

  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

long - UNIX time in microseconds (e.g. 1672534861000000)

  • Spark 2.1+

    .orderBy(F.col("col_name") / 86400_000000).rangeBetween(-7, 0)
    

long - UNIX time in milliseconds (e.g. 1672534861000)

  • Spark 2.1+

    .orderBy(F.col("col_name") / 86400_000).rangeBetween(-7, 0)
    

long - UNIX time in seconds (e.g. 1672534861)

  • Spark 2.1+

    .orderBy(F.col("col_name") / 86400).rangeBetween(-7, 0)
    

long in format yyyyMMdd

  • Spark 3.5+

    .orderBy(F.unix_date(F.to_date("col_name", 'yyyyMMdd'))).rangeBetween(-7, 0)
    
  • Spark 3.3+

    .orderBy(F.expr("unix_date(to_date(col_name, 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • Spark 3.1+

    .orderBy(F.expr("unix_date(to_date(cast(col_name as string), 'yyyyMMdd'))")).rangeBetween(-7, 0)
    
  • Spark 2.2+

    .orderBy(F.expr("datediff(to_date(cast(col_name as string), 'yyyyMMdd'), '1000')")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.unix_timestamp(F.col("col_name").cast('string'), 'yyyyMMdd') / 86400).rangeBetween(-7, 0)
    

string in date format of 'yyyy-MM-dd'

  • Spark 3.5+

    .orderBy(F.unix_date(F.to_date("col_name"))).rangeBetween(-7, 0)
    
  • Spark 3.1+

    .orderBy(F.expr("unix_date(to_date(col_name))")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

string in other date format (e.g. 'MM-dd-yyyy')

  • Spark 3.5+

    .orderBy(F.unix_date(F.to_date("col_name", 'MM-dd-yyyy'))).rangeBetween(-7, 0)
    
  • Spark 3.1+

    .orderBy(F.expr("unix_date(to_date(col_name, 'MM-dd-yyyy'))")).rangeBetween(-7, 0)
    
  • Spark 2.2+

    .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy'), '1000')")).rangeBetween(-7, 0)
    
  • Spark 2.1+

    .orderBy(F.unix_timestamp("col_name", 'MM-dd-yyyy') / 86400).rangeBetween(-7, 0)
    

string in timestamp format of 'yyyy-MM-dd HH:mm:ss'

  • Spark 2.1+

    .orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
    

string in other timestamp format (e.g. 'MM-dd-yyyy HH:mm:ss')

  • Spark 2.2+

    .orderBy(F.expr("datediff(to_date(col_name, 'MM-dd-yyyy HH:mm:ss'), '1000')")).rangeBetween(-7, 0)
    
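Putting one of the variants above into a complete query, here is a minimal sketch of a rolling 7-day mean over a date column, reusing the example data from the accepted answer and the Spark 3.1+ unix_date ordering (assumes an active SparkSession named spark):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.createDataFrame(
    [(1, "2015-01-01", 20.0), (1, "2015-01-06", 10.0), (1, "2015-01-07", 25.0),
     (1, "2015-01-12", 30.0), (2, "2015-01-01", 5.0), (2, "2015-01-03", 30.0),
     (2, "2015-02-01", 20.0)],
    ["id", "start", "some_value"]
).withColumn("start", F.col("start").cast("date"))

# unix_date returns whole days since 1970-01-01, so a 7-day frame is just -7..0.
w = (Window
     .partitionBy("id")
     .orderBy(F.expr("unix_date(start)"))
     .rangeBetween(-7, 0))

df.withColumn("mean", F.mean("some_value").over(w)).show()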

Different test cases in Spark 3.4+ can be created with this:

from pyspark.sql import functions as F

ints = F.expr("sequence(1, 10)").alias('ints')
dates = (
    date := F.expr("sequence(to_date('2000-01-01'), to_date('2000-01-10'))")
    # timestamp := F.expr("sequence(to_timestamp('2000-01-01'), to_timestamp('2000-01-10'))")
    # long_micro := F.expr("sequence(946684800000000, 947462400000000, 86400000000)")
    # long_milli := F.expr("sequence(946684800000, 947462400000, 86400000)")
    # long_secs := F.expr("sequence(946684800, 947462400, 86400)")
    # long_yyyyMMdd := F.expr("sequence(20000101, 20000110)")
    # str_unformatted_date := F.expr("transform(sequence(to_date('2000-01-01'), to_date('2000-01-10')), x -> string(x))")
    # str_formatted_date := F.expr("transform(sequence(to_date('2000-01-01'), to_date('2000-01-10')), x -> date_format(x, 'MM-dd-yyyy'))")
    # str_unformatted_ts := F.expr("transform(sequence(to_timestamp('2000-01-01'), to_timestamp('2000-01-10')), x -> string(x))")
    # str_formatted_ts := F.expr("transform(sequence(to_date('2000-01-01'), to_date('2000-01-10')), x -> date_format(x, 'MM-dd-yyyy HH:mm:ss'))")
).alias('col_name')
df = spark.range(1).select(F.inline(F.arrays_zip(ints, dates)))
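
As a quick sanity check against the generated frame, the date case can be combined with the Spark 2.1+ datediff ordering like this (a sketch; there is no partition key in the test data, so Spark will warn about a single-partition window):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Rolling sum of the previous 7 days over the generated (ints, col_name) test frame.
w = Window.orderBy(F.expr("datediff(col_name, '1000')")).rangeBetween(-7, 0)
df.withColumn("rolling_sum", F.sum("ints").over(w)).show()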

1 Comment

How can we make this dynamic? I need the range to start on the first day of the current month and end on the current day of the month.

Fantastic solution, @zero323. If you want to operate with minutes instead of days, as I had to, and you don't need to partition by id, you only have to modify a small part of the code, as shown below:

df.createOrReplaceTempView("df")
spark.sql(
    """SELECT *, sum(total) OVER (
        ORDER BY CAST(reading_date AS timestamp) 
        RANGE BETWEEN INTERVAL 45 minutes PRECEDING AND CURRENT ROW
     ) AS sum_total FROM df""").show()
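
The same 45-minute frame can also be written with the DataFrame API, reusing the seconds trick from the accepted answer (a sketch; it assumes df has reading_date and total columns):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Order by epoch seconds and look back 45 minutes (45 * 60 seconds).
w = (Window
     .orderBy(F.col("reading_date").cast("timestamp").cast("long"))
     .rangeBetween(-45 * 60, 0))

df.withColumn("sum_total", F.sum("total").over(w)).show()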

