
I have two dataframes: a (~600M rows) and b (~2M rows). What is the best approach for joining b onto a using one equality condition and two inequality conditions on the respective columns?

  • a_1 = b_1
  • a_2 >= b_2
  • a_3 >= b_3

I have explored the following paths so far:

  • Polars:
    • join_asof(): only allows one inequality condition
    • join_where() with filter(): even with a small tolerance window, the standard Polars installation exceeds the 4.3B row limit during the join, and the polars-u64-idx installation runs out of memory (512 GB); see the sketch after this list
  • DuckDB: ASOF LEFT JOIN: also only allows one inequality condition
  • Numba: As the above didn't work, I tried to write my own join_asof() function (see the code below). It works fine, but with increasing lengths of a it becomes prohibitively slow. I tried various configurations of for/while loops and filtering, all with similar results.
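
For reference, here is a minimal sketch of the join_where() attempt mentioned above, assuming the two frames are called df_a and df_b with the column names from the conditions (the tolerance-window filter() is left out); the intermediate join result is what blows past the limits:

import polars as pl

# Sketch of the join_where() attempt; the tolerance-window filter() is omitted.
# The intermediate result of the join is what exceeds the row/memory limits.
df_joined = df_a.join_where(
    df_b,
    pl.col("a_1") == pl.col("b_1"),
    pl.col("a_2") >= pl.col("b_2"),
    pl.col("a_3") >= pl.col("b_3"),
)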

Now I'm running out of ideas... What would be a more efficient way to implement this?

Thank you

import numba as nb
import numpy as np
import polars as pl
import time


@nb.njit(nb.int32[:](nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:]), parallel=True)
def join_multi_ineq(a_1, a_2, a_3, b_1, b_2, b_3, b_4):
    # For every row of a, scan b backwards and take b_4 of the last row of b
    # that satisfies all three join conditions.
    output = np.zeros(len(a_1), dtype=np.int32)

    for i in nb.prange(len(a_1)):

        # walk b from the end so the match with the highest index wins
        for j in range(len(b_1) - 1, -1, -1):

            if a_1[i] == b_1[j]:

                if a_2[i] >= b_2[j]:

                    if a_3[i] >= b_3[j]:
                        output[i] = b_4[j]
                        break

    return output


length_a = 5_000_000
length_b = 2_000_000

start_time = time.time()
output = join_multi_ineq(a_1=np.random.randint(1, 1_000, length_a, dtype=np.int32),
                         a_2=np.random.randint(1, 1_000, length_a, dtype=np.int32),
                         a_3=np.random.randint(1, 1_000, length_a, dtype=np.int32),
                         b_1=np.random.randint(1, 1_000, length_b, dtype=np.int32),
                         b_2=np.random.randint(1, 1_000, length_b, dtype=np.int32),
                         b_3=np.random.randint(1, 1_000, length_b, dtype=np.int32),
                         b_4=np.random.randint(1, 1_000, length_b, dtype=np.int32))
print(f"Duration: {(time.time() - start_time):.2f} seconds")
  • The best place for improvement is likely the equi join. Are the equi-join columns' values duplicated? Heavily duplicated? A binary search within the equi join and subsequent binary searches within the non-equi joins may provide some performance improvement (see the sketch after these comments). If you can, provide sample input dataframes with the expected output dataframe. Commented Dec 28, 2024 at 6:39
  • Did you check .join_where()? Commented Dec 28, 2024 at 8:07
  • @rehaqds He mentioned it in no. 2 of his attempts. Commented Dec 28, 2024 at 10:10
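
One way to read the binary-search suggestion above (a rough, untested sketch, close in spirit to the indexed approach in the accepted answer below; note that sorting b changes which of several valid matches is returned compared with the original backwards scan):

import numpy as np

# Sort b once by the equi-join key, then use one binary search per row of a
# to restrict the scan to the block of b that shares that key.
order = np.argsort(b_1, kind="stable")
b1_s, b2_s, b3_s, b4_s = b_1[order], b_2[order], b_3[order], b_4[order]

def lookup_one(a1_i, a2_i, a3_i):
    lo = np.searchsorted(b1_s, a1_i, side="left")   # start of the key's block
    hi = np.searchsorted(b1_s, a1_i, side="right")  # end of the key's block
    for j in range(hi - 1, lo - 1, -1):             # scan the block backwards
        if a2_i >= b2_s[j] and a3_i >= b3_s[j]:
            return b4_s[j]
    return 0                                        # no match found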

2 Answers


Using Numba here is a good idea since the operation is particularly expensive. That being said, the complexity of the algorithm is O(len(a) × len(b)), and it is not easy to do much better without making the code much more complex. Moreover, the array b_1, which might not fit in the L3 cache, is fully read 5,000,000 times, making the code rather memory-bound.

We can speed the code up considerably by building an index, so that we do not traverse the whole array b_1 but only the positions where a_1[i] == b_1[j]. This alone does not improve the worst-case complexity, since many j values can fulfil this condition. We could improve the average complexity by building a kind of tree for each entry of the index, but in practice this makes the code much more complex, and the time to build the trees would be large enough that it is not worth doing. Indeed, a basic index is enough to strongly reduce the execution time on the provided random dataset (with uniformly distributed numbers). Here is the resulting code:

import numba as nb
import numpy as np
import time

length_a = 5_000_000
length_b = 2_000_000

a_1 = np.random.randint(1, 1_000, length_a, dtype=np.int32)
a_2 = np.random.randint(1, 1_000, length_a, dtype=np.int32)
a_3 = np.random.randint(1, 1_000, length_a, dtype=np.int32)
b_1 = np.random.randint(1, 1_000, length_b, dtype=np.int32)
b_2 = np.random.randint(1, 1_000, length_b, dtype=np.int32)
b_3 = np.random.randint(1, 1_000, length_b, dtype=np.int32)
b_4 = np.random.randint(1, 1_000, length_b, dtype=np.int32)

IntList = nb.types.ListType(nb.types.int32)

@nb.njit(nb.int32[:](nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:], nb.int32[:]), parallel=True)
def join_multi_ineq_fast(a_1, a_2, a_3, b_1, b_2, b_3, b_4):
    output = np.zeros(len(a_1), dtype=np.int32)

    # Build the index: for each distinct value of b_1, the list of positions
    # in b holding that value (in increasing order).
    b1_indices = nb.typed.Dict.empty(key_type=nb.types.int32, value_type=IntList)
    for j in range(len(b_1)):
        val = b_1[j]
        if val in b1_indices:
            b1_indices[val].append(j)
        else:
            lst = nb.typed.List.empty_list(item_type=np.int32)
            lst.append(j)
            b1_indices[val] = lst

    # For each row of a, scan only the positions of b with a matching b_1,
    # backwards, so the match with the highest index wins (as in the naive code).
    for i in nb.prange(len(a_1)):
        if a_1[i] in b1_indices:
            indices = b1_indices[a_1[i]]
            v2 = a_2[i]
            v3 = a_3[i]
            for k in range(len(indices) - 1, -1, -1):
                j = indices[np.uint32(k)]
                # assert a_1[i] == b_1[j]
                if v2 >= b_2[j] and v3 >= b_3[j]:
                    output[i] = b_4[j]
                    break
    return output

%time join_multi_ineq_fast(a_1, a_2, a_3, b_1, b_2, b_3, b_4)

Note that, on average, only about 32 k values are tested per row of a (small enough that a more efficient but more complicated data structure is not worth building). Also note that the result is strictly identical to the one produced by the naive implementation.
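
This can be checked directly (a quick sanity check, assuming the naive join_multi_ineq from the question and the arrays above are defined):

# The indexed version returns exactly the same result as the naive version.
ref = join_multi_ineq(a_1, a_2, a_3, b_1, b_2, b_3, b_4)
out = join_multi_ineq_fast(a_1, a_2, a_3, b_1, b_2, b_3, b_4)
assert np.array_equal(ref, out)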


Benchmark

Here are results on my i5-9600KF CPU (6 cores):

Roman's code:        >120.00 sec     (requires a HUGE amount of RAM: >16 GiB)
Naive Numba code:      24.85 sec
This implementation:    0.83 sec     <-----

Thus, this implementation is about 30 times faster than the initial code.



You can use DuckDB's (PostgreSQL-style) DISTINCT ON clause:

import duckdb

df_res = duckdb.sql("""
    select distinct on (a.a1, a.a2, a.a3)
        a.a1,
        a.a2,
        a.a3,
        b.b4
    from df_a as a
        inner join df_b as b on
            a.a1 = b.b1 and
            a.a2 >= b.b2 and
            a.a3 >= b.b3
    order by
        b.b2 desc,
        b.b3 desc
""").pl()

You could also try pl.DataFrame.join_where(), but in lazy mode. I'm assuming your 'a' dataframe has a unique key; in this example, (a1, a2, a3).

df_res = (
    df_a.lazy()
    .join_where(
        df_b.lazy(),
        pl.col.a1 == pl.col.b1,
        pl.col.a2 >= pl.col.b2,
        pl.col.a3 >= pl.col.b3
    )
    .sort(["a1","a2","a3","b2","b3"])
    .drop(["b2","b3"])
    .unique(["a1","a2","a3"], keep="first")
).collect()

If none of these work, you could try splitting one of the frames into N chunks with pl.DataFrame.partition_by(), processing the chunks separately, and then concatenating them back together with pl.concat().

N = 20

df_a_list = (
    df_a
    .with_columns(r = pl.int_range(pl.len()) * N // pl.len())
    .partition_by("r", include_key=False)
)

df_res = pl.concat([
    df_a_t.join_where(
        df_b,
        pl.col.a1 == pl.col.b1,
        pl.col.a2 >= pl.col.b2,
        pl.col.a3 >= pl.col.b3
    )
    .sort(["a1","a2","a3","b2","b3"])
    .drop(["b2","b3"])
    .unique(["a1","a2","a3"], keep="first")
    for df_a_t in df_a_list
])

4 Comments

I got an error on the first code (Argument 'name' has incorrect type). Codes 2 and 3 take >16 GiB of RAM (huge). Code 2 takes so much RAM that my machine froze (>22 GiB). Moreover, the Polars codes are slower than the OP's (which requires nearly no additional memory)...
Yep, lazy processing of large dataframes doesn't always work as expected. Have you tried the one with partitioning? Maybe you can increase the number of chunks?
Thank you Roman for your suggestions! The DISTINCT ON / ORDER BY query is quite elegant and ran in ~2 min on the actual data. However, Jérôme's suggestion ended up being quite a bit faster (~30 sec, incl. all the pre- and post-processing required for Numba), so I'll mark that one as the answer.
@usdn Interesting, thanks. I guess unless there's a proper way of doing an "asof" join on multiple columns, it's hard to beat Numba.
