I use the polars, urllib and tldextract packages in Python to parse two columns of URL strings in zstd-compressed parquet files (averaging 8 GB and 40 million rows each). The parsed output includes the scheme, netloc, subdomain, domain, suffix, path, query, and fragment. On my PC (64 GB RAM, 32 logical cores), processing a single file takes about 16 minutes. Input data is read from one SSD (x:/) and output is written to a separate SSD (y:/).
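For a single URL, the components come from urllib.parse.urlsplit plus a tldextract lookup against the public suffix list; one illustrative example (the URL is made up):

```python
from urllib.parse import urlsplit
import tldextract

# Illustrative only: one URL split into the components listed above
url = "https://shop.example.co.uk/cart?item=42#top"
parts = urlsplit(url)            # scheme, netloc, path, query, fragment
ext = tldextract.extract(url)    # subdomain, domain, suffix (PSL-based)

print(parts.scheme, parts.netloc, parts.path, parts.query, parts.fragment)
# https shop.example.co.uk /cart item=42 top
print(ext.subdomain, ext.domain, ext.suffix)
# shop example co.uk
```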
My code doesn't use multiprocessing; it relies on polars' streaming and vectorization features for efficiency. RAM usage is near the limit while processing. I also don't get the full benefit of polars' vectorization, since urllib and tldextract are not native to polars' Rust engine and have to be called per row via map_elements.
Are there alternative approaches or modifications that would speed up processing? Is a 16-minute processing time reasonable? I've considered writing a Rust extension for polars with the same functionality as urllib and tldextract, but that seems like reinventing the wheel.
The method below does the heavy lifting (Python 3.12, polars 1.31):
```python
from pathlib import Path

import polars as pl


def build_parsed_url_for_date(
    event_date: str,
    silver_root: Path,
    gold_root: Path,
    compression: str = "zstd",
) -> None:
    """
    For a given event_date (YYYY-MM-DD):
    - Read SILVER parquet from: silver_root / f"event_date={event_date}" / *.parquet
    - Parse `url` and `referrer` columns into components (PSL-based).
    - Write GOLD/parsed_url parquet to:
      gold_root / "parsed_url" / f"event_date={event_date}" / "part.parquet"
    """
    # Input paths
    silver_partition = silver_root / f"event_date={event_date}"
    if not silver_partition.exists():
        raise FileNotFoundError(f"SILVER partition not found: {silver_partition}")
    silver_files = sorted(silver_partition.glob("*.parquet"))
    if not silver_files:
        raise FileNotFoundError(f"No parquet files in {silver_partition}")

    # Lazy frame over all files in the partition
    lf = pl.scan_parquet([str(f) for f in silver_files])
    # Per-row wrappers around the urllib and tldextract parsers
    parser_url = make_polars_parser("url")  # returns a dict of parsed components
    parser_ref = make_polars_parser("ref")

    # Select only the needed columns (projection pushdown), then parse row by row
    lf_parsed = (
        lf.select(["id", "referrer", "url"])
        .with_columns(
            pl.col("url").map_elements(parser_url, return_dtype=url_struct_dtype("url")).alias("url_parsed"),
            pl.col("referrer").map_elements(parser_ref, return_dtype=url_struct_dtype("ref")).alias("ref_parsed"),
        )
        .unnest("url_parsed")
        .unnest("ref_parsed")
    )
    # Output path
    gold_parsed_root = gold_root / "parsed_url"
    out_dir = gold_parsed_root / f"event_date={event_date}"
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / "part.parquet"

    # Stream the result to parquet
    lf_parsed.sink_parquet(str(out_path), compression=compression)
```
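make_polars_parser and url_struct_dtype are small helpers that aren't shown above. A minimal sketch of what they might look like, assuming struct fields prefixed with the column key ("url"/"ref") and null passthrough; the real helpers may differ:

```python
from urllib.parse import urlsplit

import polars as pl
import tldextract

_FIELDS = ("scheme", "netloc", "subdomain", "domain", "suffix", "path", "query", "fragment")


def url_struct_dtype(prefix: str) -> pl.Struct:
    # One Utf8 field per URL component, e.g. url_scheme, ref_scheme, ...
    return pl.Struct({f"{prefix}_{name}": pl.Utf8 for name in _FIELDS})


def make_polars_parser(prefix: str):
    extractor = tldextract.TLDExtract()  # reuse one instance (cached public suffix list)

    def parse(value: str | None) -> dict[str, str | None]:
        if value is None:
            return {f"{prefix}_{name}": None for name in _FIELDS}
        parts = urlsplit(value)
        ext = extractor(value)
        return {
            f"{prefix}_scheme": parts.scheme,
            f"{prefix}_netloc": parts.netloc,
            f"{prefix}_subdomain": ext.subdomain,
            f"{prefix}_domain": ext.domain,
            f"{prefix}_suffix": ext.suffix,
            f"{prefix}_path": parts.path,
            f"{prefix}_query": parts.query,
            f"{prefix}_fragment": parts.fragment,
        }

    return parse
```

With the fields prefixed this way, the two unnest calls in the method above produce distinct column names and don't collide.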