I've worked through this Polars plugins tutorial, which covers both cumulative iteration through a column using Rust's scan and working with structs as input and output. I got stuck trying to combine the two.

Minimal example

A custom function custom_scan should take a struct with two integers a, b as the input and emit a struct composed of the cumulative sum of a+b and the cumulative sum of a-b. Of course there are easier ways to achieve the same outcome; the question is about the inner workings of the API here.

The call should look like this (run.py):

import polars as pl
import myplugin as mp

df = pl.DataFrame({"a": [14, 15, 20], "b": [3, 1, 5]})

df_eval = df.with_columns(pl.struct("a", "b").alias("struct_in"))\
.with_columns(mp.custom_scan("struct_in").alias("struct_out"))

print(df_eval)
┌───────┬─────────┬────────────┬────────────┐
│ a     ┆ b       ┆ struct_in  ┆ struct_out │
│ ---   ┆ ---     ┆ ---        ┆ ---        │
│ i64   ┆ i64     ┆ struct[2]  ┆ struct[2]  │
╞═══════╪═════════╪════════════╪════════════╡
│ 14    ┆ 3       ┆ {14,3}     ┆ {17,11}    │
│ 15    ┆ 1       ┆ {15,1}     ┆ {33,25}    │
│ 20    ┆ 5       ┆ {20,5}     ┆ {58,40}    │
└───────┴─────────┴────────────┴────────────┘

This is how far I got:

myplugin/__init__.py

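from pathlib import Path
import polars as pl
from polars._typing import IntoExprColumn
from polars.plugins import register_plugin_function

LIB = Path(__file__).parent  # directory holding the compiled plugin

def custom_scan(struct_in: IntoExprColumn) -> pl.Expr: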
    return register_plugin_function(
        args=[struct_in],
        plugin_path=LIB,
        function_name="custom_scan",
        is_elementwise=False,
    )

src/expressions.rs

use polars::prelude::*;
use pyo3_polars::derive::polars_expr;

fn custom_scan_output(_input_fields: &[Field]) -> PolarsResult<Field> {
    Ok(Field::new(
        "struct_out".into(),
        DataType::Struct(vec![
            Field::new("cum_sum".into(), DataType::Int64),
            Field::new("cum_diff".into(), DataType::Int64),
        ]),
    ))
}

#[polars_expr(output_type_func=custom_scan_output)]
fn custom_scan(inputs: &[Series]) -> PolarsResult<Series> { 
    struct ScanState {
        cum_sum: i64,
        cum_diff: i64,
    }
    let struct_in_chunked: &StructChunked = inputs[0].struct_()?;
    
    let out = struct_in_chunked
        .iter()
        .scan(
            ScanState { cum_sum: 0_i64, cum_diff: 0_i64 },
            |state: &mut ScanState, struct_in: Option<StructType>| match struct_in {
                Some(struct_in) => {
                    state.cum_sum += struct_in.a + struct_in.b;
                    state.cum_diff += struct_in.a - struct_in.b;
                    Some(Some(*state))
                },
                None => Some(None),
            },
        )
        .collect_trusted();
    
    Ok(out.into_series())
}

This doesn't work, which makes sense, because Option<StructType> likely isn't the correct type, and there is no mapping to the actual output struct that's specified in custom_scan_output. How is it possible to achieve this functionality?

1 Answer

Here's something that at least passes the linter's checks and is geared towards an arbitrary number of columns in the struct.

fn cust_scan(inputs: &[Series]) -> PolarsResult<Series> {
    use polars_arrow::array::MutablePrimitiveArray;
    let struct_in_chunked = inputs[0].struct_()?;
    let fields = struct_in_chunked.fields_as_series();
    let fields_len = fields.len();
    let rows = fields[0].len();
    let mut arrs: Vec<MutablePrimitiveArray<i64>> = (0..fields_len)
        .map(|_| MutablePrimitiveArray::with_capacity(rows))
        .collect();

    let fields_ca: Vec<ChunkedArray<Int64Type>> = fields
        .into_iter()
        .map(|s| s.cast(&DataType::Int64).unwrap().i64().unwrap().clone())
        .collect();

    let mut cum_sum = 0i64;
    let mut cum_diff = 0i64;

    for r in 0..rows {
        for c in 0..fields_len {
            let val = fields_ca[c].get(r).unwrap_or(0i64);
            cum_sum += val;
            if c == 0 {
                cum_diff += val;
            } else {
                cum_diff -= val;
                arrs[c].push_value(cum_diff);
            }
        }
        arrs[0].push_value(cum_sum);
    }

    let out_cols: Vec<Column> = arrs
        .into_iter()
        .enumerate()
        .map(|(i, mut arr)| {
            let name = if i == 0 {
                String::from("cum_sum")
            } else {
                format!("cum_diff_{}", i)
            };
            Series::from_arrow(name.into(), arr.as_box())
                .unwrap()
                .into_column()
        })
        .collect();

    let out = StructChunked::from_columns("out".into(), rows, &out_cols)?
        .into_series();
    Ok(out)
}

The crux is that you need to unpack the struct into its individual field columns, do your math on those, and then pack the results into a new struct.
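To make the unpack, compute, repack idea concrete without depending on any Polars types, here is a minimal plain-Rust sketch. It assumes the two struct fields have already been pulled out as slices of `Option<i64>` (standing in for nullable Int64 columns), and the names `ScanState` and `scan_columns` are illustrative only. Treating a row with a null in either field as a null output row is also an assumption; the code above treats nulls as 0 instead.

```rust
/// Running totals carried from row to row; `Copy` so each row can emit a snapshot.
#[derive(Clone, Copy, Debug, Default, PartialEq)]
struct ScanState {
    cum_sum: i64,
    cum_diff: i64,
}

/// Takes the two unpacked field columns and returns the two output columns.
/// A row where either input is null yields a null in both outputs and
/// leaves the running totals untouched (an assumption of this sketch).
fn scan_columns(a: &[Option<i64>], b: &[Option<i64>]) -> (Vec<Option<i64>>, Vec<Option<i64>>) {
    a.iter()
        .zip(b.iter())
        .scan(ScanState::default(), |state, (a, b)| {
            let row = match (a, b) {
                (Some(a), Some(b)) => {
                    state.cum_sum += a + b;
                    state.cum_diff += a - b;
                    Some(*state)
                }
                _ => None,
            };
            // The outer `Some` keeps the scan going; the inner Option marks nulls.
            Some(row)
        })
        // Split each row snapshot back into one value per output column.
        .map(|row| (row.map(|s| s.cum_sum), row.map(|s| s.cum_diff)))
        .unzip()
}
```

Calling `scan_columns` with a = 14, 15, 20 and b = 3, 1, 5 gives 17, 33, 58 for the first column and 11, 25, 40 for the second, matching the table in the question. In a real plugin the two returned vectors would then be turned into Series and packed into the output struct.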

Another approach you could try is to turn the struct into a DataFrame and then iterate over it row by row. A quick snippet of that might start with:

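    // Unnest the struct so that each field becomes its own DataFrame column.
    let df = inputs[0].struct_()?.clone().unnest();
    let rows = df.height();
    for i in 0..rows {
        // `get_row` returns a `Row`, a wrapper around a Vec<AnyValue>.
        let row = df.get_row(i).unwrap().0;
        let val = match &row[0] {
            AnyValue::Int64(x) => *x,
            AnyValue::Int32(x) => *x as i64,
            _ => panic!("do all the datatypes"),
        };
        // and so on
    }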
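
The row-wise idea can be sketched in plain Rust as well. The `Value` enum below is a hypothetical stand-in for the few `AnyValue` variants a row might hold, not the Polars type itself, and `as_i64` and `scan_rows` are illustrative names. Instead of panicking on an unhandled datatype, this sketch assumes that nulls and non-integer values should produce a null output row.

```rust
/// Hypothetical stand-in for the handful of value variants a row might hold.
#[derive(Clone, Debug, PartialEq)]
enum Value {
    Null,
    Int32(i32),
    Int64(i64),
    Str(String),
}

/// Widens any supported integer variant to `i64`; nulls and non-integers give `None`.
fn as_i64(v: &Value) -> Option<i64> {
    match v {
        Value::Int64(x) => Some(*x),
        Value::Int32(x) => Some(i64::from(*x)),
        Value::Null | Value::Str(_) => None,
    }
}

/// Walks the rows once and returns the running (a + b, a - b) totals per row.
/// Rows that are too short or hold an unsupported value yield `None`
/// and do not change the running totals.
fn scan_rows(rows: &[Vec<Value>]) -> Vec<Option<(i64, i64)>> {
    let (mut cum_sum, mut cum_diff) = (0i64, 0i64);
    rows.iter()
        .map(|row| -> Option<(i64, i64)> {
            let a = as_i64(row.first()?)?;
            let b = as_i64(row.get(1)?)?;
            cum_sum += a + b;
            cum_diff += a - b;
            Some((cum_sum, cum_diff))
        })
        .collect()
}
```

Row-wise iteration is usually the slower option for columnar data, so this mainly makes sense when the per-row logic is awkward to express column by column.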