
I have been trying to make shards from a .parquet file of about 20 GB, and my code tends to crash the system by the time it reaches the last shard, although it sometimes succeeds. I am quite new to working with Polars/PyArrow and would love it if someone could help me figure out why it behaves this way and how I can optimize it.

I am attaching the code of the function I am using:

import traceback
from pathlib import Path

import polars as pl
import pyarrow.parquet as pq


def shard_parquet_to_arrow(file_path: str, shard_size: int, output_dir: str, shard_prefix: str):
    shard_size = int(shard_size)
    chunk_idx = 0
    # Read the total row count from the Parquet metadata, then scan the file lazily.
    total_rows = pq.ParquetFile(file_path).metadata.num_rows
    lf = pl.scan_parquet(file_path)
    for offset in range(0, total_rows, shard_size):
        # Slice the lazy frame and stream the slice straight to an Arrow IPC file.
        lf_chunk = lf.slice(offset, shard_size)
        try:
            lf_chunk.sink_ipc(Path(f"{output_dir}/{shard_prefix}_shard{chunk_idx}.arrow"))
            print(f"✅ Saved rows to shard {chunk_idx + 1}.")
        except Exception as e:
            print("Caught exception:", e)
            traceback.print_exc()
            raise
        chunk_idx += 1

Any and all help is welcome. Thank you!

I have tried to do the same job with other modules/classes from pyarrow / polars.Dataset, but each of them also tends to crash the system. I have also tried collecting the DataFrame before writing it locally, but that does the same and, if I understand correctly, is even more memory-heavy. I am keeping a shard size of 10,000 rows, as my system has handled that well with other files.

Keep in mind that text-type write functions also give an error, because one of the columns in the parquet file is a struct type.

Some info about my system for context: 13th-gen Intel i7 CPU, 32 GB RAM, Polars version 1.32.3.

1 Answer


From a memory perspective, I believe the best you can do is to chunk it by row group rather than by an arbitrary number of rows. Try something like this:

import pyarrow.parquet as pq
from pyarrow.feather import write_feather

parq = pq.ParquetFile(file_path)
row_groups = parq.num_row_groups
for rg in range(row_groups):
    # Each row group is read into memory on its own and written out as one shard.
    tabl = parq.read_row_group(rg)
    write_feather(tabl, f"{output_dir}/{shard_prefix}_shard{rg}.arrow")

If you need your destination files to be an exact size that differs from the row group size, you'd have to add some more logic using slice and a buffer that persists between loop iterations, along the lines of the sketch below.
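Here's a rough, untested sketch of that buffer-and-slice idea, reusing the file_path/shard_size/output_dir/shard_prefix names from your function (shard_by_fixed_rows is just a hypothetical name):

import pyarrow as pa
import pyarrow.parquet as pq
from pyarrow.feather import write_feather


def shard_by_fixed_rows(file_path, shard_size, output_dir, shard_prefix):
    parq = pq.ParquetFile(file_path)
    buffer = []          # row-group tables waiting to be written out
    buffered_rows = 0
    shard_idx = 0
    for rg in range(parq.num_row_groups):
        table = parq.read_row_group(rg)
        buffer.append(table)
        buffered_rows += table.num_rows
        # Flush full shards as soon as the buffer holds enough rows.
        while buffered_rows >= shard_size:
            combined = pa.concat_tables(buffer)
            write_feather(combined.slice(0, shard_size),
                          f"{output_dir}/{shard_prefix}_shard{shard_idx}.arrow")
            shard_idx += 1
            # Whatever is left over becomes the start of the next shard.
            remainder = combined.slice(shard_size)
            buffer = [remainder]
            buffered_rows = remainder.num_rows
    # Write any trailing rows as a final, smaller shard.
    if buffered_rows > 0:
        write_feather(pa.concat_tables(buffer),
                      f"{output_dir}/{shard_prefix}_shard{shard_idx}.arrow")

At any point this only holds the current shard's rows plus at most one extra row group in memory, which keeps the footprint close to the plain row-group loop above.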


2 Comments

Thank you for your response, but the read_row_group(rg) call returns the following error:
  File "pyarrow\_parquet.pyx", line 1642, in pyarrow._parquet.ParquetReader.read_row_group
  File "pyarrow\_parquet.pyx", line 1678, in pyarrow._parquet.ParquetReader.read_row_groups
  File "pyarrow\error.pxi", line 92, in pyarrow.lib.check_status
pyarrow.lib.ArrowNotImplementedError: Nested data conversions not implemented for chunked array outputs
Where did the file come from? Can you try with a different file?
