
I have a Spark DataFrame with the following structure:

shock_rule_id  DATE        value
A              2024-01-01  100
A              2024-01-02  null
A              2024-01-03  130
B              2024-01-01  50
B              2024-01-02  null
B              2024-01-03  null
B              2024-01-04  80

I want to perform linear interpolation of the value column within each shock_rule_id group.

I don't want to use a Pandas UDF; I'd like to do this using only Spark API / SQL functions.

My DataFrame contains only business-day dates (no weekends/holidays).

I already wrote one approach using Spark SQL window functions (first, last, row numbers, etc.), simplified below:

from pyspark.sql.functions import row_number, last, first, when, col
from pyspark.sql.window import Window

w = Window.partitionBy("shock_rule_id").orderBy("DATE")

# Row numbers to simulate index positions
df_pos = (
    result_df
    .withColumn("row_num", row_number().over(w))
    .withColumn("prev_value", last("value", ignorenulls=True).over(w))
    .withColumn("prev_row", last("row_num", ignorenulls=True).over(w))
    .withColumn("next_value", first("value", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
    .withColumn("next_row", first("row_num", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
)

df_interp = (
    df_pos.withColumn(
        "interpolated_value",
        when(
            col("value").isNotNull(), col("value")
        ).otherwise(
            col("prev_value")
            + (col("next_value") - col("prev_value"))
            * ((col("row_num") - col("prev_row"))
               / when((col("next_row") - col("prev_row")) == 0, 1)
               .otherwise(col("next_row") - col("prev_row")))
        )
    )
)

# Final result
result = df_interp.select("shock_rule_id", "DATE", "interpolated_value")


But the output does not match my expectation, i.e. it does not match the output of my Pandas UDF version:

import pandas as pd

def interpolate(pdf):
    pdf = pdf.sort_values('DATE')
    # if pdf['InterpolationType'].iloc[0] == 'linear':
    pdf['value'] = pdf['value'].interpolate(method='linear')
    pdf['shock_rule_id'] = pdf['shock_rule_id'].astype(int)
    pdf['DATE'] = pd.to_datetime(pdf['DATE'])  # Ensure DATE is datetime
    return pdf[['shock_rule_id', 'DATE', 'value']]  # Only necessary columns

# Interpolate only where needed
result_interpolated = df_to_interpolate.groupby('shock_rule_id').applyInPandas(
    interpolate, schema="shock_rule_id int, DATE date, value double"
)

# Union with groups that had no missing values
result = result_interpolated.unionByName(df_no_missing.select('shock_rule_id', 'DATE', 'value'))

1 Answer

Here is a solution:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = df.withColumn("date_ts", F.unix_timestamp("DATE", "yyyy-MM-dd").cast("long"))

windowSpec = Window.partitionBy("shock_rule_id").orderBy("date_ts")

lastKnownValue = F.last("value", ignorenulls=True).over(windowSpec.rowsBetween(Window.unboundedPreceding, -1))
lastKnownDate = F.last(F.when(F.col("value").isNotNull(), F.col("date_ts")), ignorenulls=True).over(windowSpec.rowsBetween(Window.unboundedPreceding, -1))
nextKnownValue = F.first("value", ignorenulls=True).over(windowSpec.rowsBetween(1, Window.unboundedFollowing))
nextKnownDate = F.first(F.when(F.col("value").isNotNull(), F.col("date_ts")), ignorenulls=True).over(windowSpec.rowsBetween(1, Window.unboundedFollowing))

df = df.withColumn("lastKnownValue", lastKnownValue) \
    .withColumn("lastKnownDate", lastKnownDate) \
    .withColumn("nextKnownValue", nextKnownValue) \
    .withColumn("nextKnownDate", nextKnownDate)
  
df = df.withColumn(
    "value",
    F.when(F.col("value").isNotNull(), F.col("value")).otherwise(
            F.col("lastKnownValue") + (F.col("nextKnownValue") - F.col("lastKnownValue")) * 
             (F.col("date_ts") - F.col("lastKnownDate")) / 
              (F.col("nextKnownDate") - F.col("lastKnownDate"))
        )
    ).drop("date_ts", "lastKnownValue", "lastKnownDate", "nextKnownValue", "nextKnownDate")

df.show()

Output:

+-------------+----------+-----+
|shock_rule_id|      DATE|value|
+-------------+----------+-----+
|            A|2024-01-01|100.0|
|            A|2024-01-02|115.0|
|            A|2024-01-03|130.0|
|            B|2024-01-01| 50.0|
|            B|2024-01-02| 60.0|
|            B|2024-01-03| 70.0|
|            B|2024-01-04| 80.0|
+-------------+----------+-----+

Here is a short explanation:

The formula for linear interpolation is:

y = y1 + ((y2 − y1) / (x2 − x1)) * (x − x1)

x1, y1 = last known date and last known value

x2, y2 = next known date and next known value

x = the date at which the interpolation is performed.
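As a quick sanity check of the formula in plain Python, using group B from the sample (50 on 2024-01-01, 80 on 2024-01-04, interpolating 2024-01-02; day ordinals stand in for the unix timestamps, which only differ by a constant scale factor):

```python
from datetime import date

def lerp(x, x1, y1, x2, y2):
    # y = y1 + ((y2 - y1) / (x2 - x1)) * (x - x1)
    return y1 + (y2 - y1) * (x - x1) / (x2 - x1)

x1, y1 = date(2024, 1, 1).toordinal(), 50.0   # last known point
x2, y2 = date(2024, 1, 4).toordinal(), 80.0   # next known point
x = date(2024, 1, 2).toordinal()              # date to interpolate

print(lerp(x, x1, y1, x2, y2))  # 60.0, matching the output table
```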

To get this in code, we need to first get x1, x2, y1, and y2.

lastKnownValue = F.last("value", ignorenulls=True).over(windowSpec.rowsBetween(Window.unboundedPreceding, -1))
lastKnownDate = F.last(F.when(F.col("value").isNotNull(), F.col("date_ts")), ignorenulls=True).over(windowSpec.rowsBetween(Window.unboundedPreceding, -1))
nextKnownValue = F.first("value", ignorenulls=True).over(windowSpec.rowsBetween(1, Window.unboundedFollowing))
nextKnownDate = F.first(F.when(F.col("value").isNotNull(), F.col("date_ts")), ignorenulls=True).over(windowSpec.rowsBetween(1, Window.unboundedFollowing))

df = df.withColumn("lastKnownValue", lastKnownValue) \
    .withColumn("lastKnownDate", lastKnownDate) \
    .withColumn("nextKnownValue", nextKnownValue) \
    .withColumn("nextKnownDate", nextKnownDate)
  

These columns map onto the formula's symbols: "lastKnownDate" -> x1, "lastKnownValue" -> y1, "nextKnownDate" -> x2, "nextKnownValue" -> y2.

Then here we do the calculation:

df = df.withColumn(
    "value",
    F.when(F.col("value").isNotNull(), F.col("value")).otherwise(
            F.col("lastKnownValue") + (F.col("nextKnownValue") - F.col("lastKnownValue")) * 
             (F.col("date_ts") - F.col("lastKnownDate")) / 
              (F.col("nextKnownDate") - F.col("lastKnownDate"))
        )
    ).drop("date_ts", "lastKnownValue", "lastKnownDate", "nextKnownValue", "nextKnownDate")

which, on closer inspection, is exactly the formula shown above.

I've kept my solution short and to the point, so feel free to ask any questions!
