
I have a quite large Dask dataframe, mydataframe, and a NumPy array, mycodes. I want to filter the rows of mydataframe and keep only those where the column CODE is not in mycodes. I reset the index of the dataframe so that the partitions are known, since I read that this was important after I got an error. I tried the following code

is_new = ~mydataframe["CODE"].isin(mycodes).compute().values.flatten()
new_codes = mydataframe.loc[is_new, "CODE"].drop_duplicates().compute()

and variations of it. I get errors about the number of partitions, or about the length of the index I pass as the filter. I have tried other approaches and got other errors, sometimes assertion errors. I can't seem to do something as simple as filtering the rows of a dataframe.

Forgive the lack of concrete examples, but I don't think they are strictly necessary; the question is really general: can anyone please give me some indications on how to filter the rows of a large Dask dataframe, and what I need to take into account, or what the limitations are?

You can find the data I am working with for mydataframe here. I am testing with the data in the first zip file. It is a fixed-width file (fwf), and you have the column design in this gist. The only relevant variable is CODE, which I read as a string. For mycodes you can try any subset.
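For reference, this is roughly how I load the file. The path, column boundaries, and the second column name below are just placeholders (the real design is in the gist); the only part that matters is that CODE comes in as a string:

import dask.dataframe as dd

# Placeholder sketch of the load; the real colspecs and names come from the gist
mydataframe = dd.read_fwf(
    "first_file.txt",                # file extracted from the first zip
    colspecs=[(0, 8), (8, 16)],      # placeholder column boundaries
    names=["CODE", "OTHER"],         # placeholder column names
    dtype={"CODE": "string"},        # CODE is read as a string
)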

2 Comments
  • Can you provide a small sample of your dataframe and codes to keep? Commented Feb 6 at 16:32
  • @globglogabgalab I added a link to the data :) Commented Feb 7 at 8:41

1 Answer


Based on the information you've provided, I'd probably do something like this:

import dask.dataframe as dd
import numpy as np

# Example: Large Dask DataFrame
df = dd.read_parquet("mydata.parquet")  # Replace with your actual data source

# Example: NumPy array of codes to exclude
mycodes = np.array([1001, 1002, 1003])

# Convert NumPy array to a list for better compatibility
mycodes_list = mycodes.tolist()

# Filter rows where CODE is NOT in mycodes
filtered_df = df[~df["CODE"].isin(mycodes_list)]

# Trigger computation if needed
# This pulls the result into memory
result = filtered_df.compute()
print(result)

Regarding best practices, it's best to use compute sparingly (dask docs link).
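For example, if the filtered result is too big to pull into memory, you can keep the whole pipeline lazy and only materialise the small pieces you actually need; the parquet output path below is just an example:

# Keep everything lazy: no compute() until it is really needed
filtered_df = df[~df["CODE"].isin(mycodes_list)]

# Further lazy operations, e.g. deduplicating the codes that survive the filter
new_codes = filtered_df["CODE"].drop_duplicates()

# Write the filtered frame to disk without loading it all into memory
# ("filtered.parquet" is just an example output path)
filtered_df.to_parquet("filtered.parquet")

# Only the (small) deduplicated codes are pulled into memory
print(new_codes.compute())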


1 Comment

If I run this I get an AssertionError that traces back to the check if not self._broadcast_dep(arg): assert arg.divisions == dependencies[0].divisions that dask performs within the optimizer. It's triggered when running compute().
