I have a Spark DataFrame with the following structure:
| shock_rule_id | DATE | value |
|---|---|---|
| A | 2024-01-01 | 100 |
| A | 2024-01-02 | null |
| A | 2024-01-03 | 130 |
| B | 2024-01-01 | 50 |
| B | 2024-01-02 | null |
| B | 2024-01-03 | null |
| B | 2024-01-04 | 80 |
I want to perform linear interpolation of the value column within each shock_rule_id group.
I don’t want to use Pandas UDF – I’d like to do this using only Spark API / SQL functions.
My DataFrame contains only business-day dates (no weekends/holidays).
I already wrote one approach using Spark SQL window functions (first, last, row numbers, etc.) like this (simplified):
# Row numbers to simulate index positions
df_pos = (
result_df
.withColumn("row_num", row_number().over(w))
.withColumn("prev_value", last("value", ignorenulls=True).over(w))
.withColumn("prev_row", last("row_num", ignorenulls=True).over(w))
.withColumn("next_value", first("value", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
.withColumn("next_row", first("row_num", ignorenulls=True).over(w.rowsBetween(0, Window.unboundedFollowing)))
)
df_interp = (
df_pos.withColumn(
"interpolated_value",
when(
col("value").isNotNull(), col("value")
).otherwise(
col("prev_value")
+ (col("next_value") - col("prev_value"))
* ((col("row_num") - col("prev_row"))
/ when((col("next_row") - col("prev_row")) == 0, 1)
.otherwise(col("next_row") - col("prev_row")))
)
)
)
# Final result
result = df_interp.select("shock_rule_id", "DATE", "interpolated_value")
But the output is not matching my expectation i mean not maching with pandas udf output
def interpolate(pdf):
pdf = pdf.sort_values('DATE')
# if pdf['InterpolationType'].iloc[0] == 'linear':
pdf['value'] = pdf['value'].interpolate(method='linear')
pdf['shock_rule_id'] = pdf['shock_rule_id'].astype(int)
pdf['DATE'] = pd.to_datetime(pdf['DATE']) # Ensure DATE is datetime
return pdf[['shock_rule_id', 'DATE', 'value']] # Only necessary columns
# Interpolate only where needed
result_interpolated = df_to_interpolate.groupby('shock_rule_id').applyInPandas(
interpolate, schema="shock_rule_id int, DATE date, value double"
)
# Union with groups that had no missing values
result = result_interpolated.unionByName(df_no_missing.select('shock_rule_id', 'DATE', 'value'))