
I can make PySpark "work" without much trouble, but I know very little about performance tuning and find the documentation confusing. I have source data partitioned by date; I read it directory by directory (one Parquet file of about 1-2 GB in each) for a specified number of days, append each day to a dataframe, select the fields I need, and run some SQL against it to write new partitioned Parquet files. I'm getting errors related to shuffle performance. I have access to pretty much any amount of memory, executors, and cores I could need; I just don't understand which combination would be best for this scenario. In this example I'm reading 365 days, which equates to 365 Parquet files. I'd love some advice about shuffling, executors, cores, etc. My available setup is in the screenshot below. Thanks for the help!

from datetime import datetime, timedelta
from pyspark.sql.functions import from_unixtime

# source files partitioned by year, month, and day - each partition has a single file of about 1-2 GB
SOURCE = "abfss://...my_files/Year={}/Month={}/Day={}"
DESTINATION = "abfss://...another_place"

# get a simple list of every day in the last DAYS_TO_LOAD days to iterate over the source files needed
DAYS_TO_LOAD = 365
today = datetime.today()
start_date = today - timedelta(days=DAYS_TO_LOAD)
range_tuple = date_range_tuple(start_date, today)  # helper that returns a (year, month, day) tuple per day

print(range_tuple)  # ((2024, 5, 19), (2024, 5, 20), (2024, 5, 21), (2024, 5, 22), ...)

# create a dataframe and append (via unionAll) each day to the dataframe
df = None

for year, month, day in range_tuple:
    if year >= 2023:
        # right() is a helper that keeps the last two characters, left-padding month/day with a zero
        path = SOURCE.format(str(year), right("0" + str(month), 2), right("0" + str(day), 2))

        print(path)

        if df is None:  # df hasn't been initialized yet
            df = spark.read.load(path)
        else:  # append data to the existing df
            df = df.unionAll(spark.read.load(path))
        
df.select(
    "about 20 fields here",
    ...
).createOrReplaceTempView("my_source")

# the partition field for the output year/month/day is different from the source date partition
df = df.withColumn("utc_date", from_unixtime(df.event_time_epoch / 1000, "yyyy-MM-dd"))

print(df.rdd.getNumPartitions())

# I've tried both coalesce and repartition without much difference
# df = df.coalesce(DAYS_TO_LOAD)
# print(df.rdd.getNumPartitions())
df = df.repartition("utc_date")
print(df.rdd.getNumPartitions())

dfResults = spark.sql(SOURCE_QUERY)  # SOURCE_QUERY references the "my_source" temp view above

dfResults \
    .write \
    .partitionBy("Year", "Month", "Day") \
    .parquet(DESTINATION, mode="overwrite")

(screenshot of the available cluster setup)

  • Try removing the unionAll. Read the entire data set first, then df = df.filter(df.year == 2024). This should reduce a lot of overhead; then try repartitioning to a large number, such as 1000, before writing (see the sketch after these comments). Commented May 21 at 14:01
  • @Chris will do. Part of the problem I have is that I really don't want to load all of history, which goes back almost a decade. Would it help to read by month and union those, rather than each day? Commented May 22 at 12:55
  • I would take a few minutes to look up lazy evaluation and execution plans for Spark, as well as the benefits of using the Parquet file format with Spark. You'll see that even though you're telling it to load all the data and then filter, Spark will only load the required data based on the operations you run later in the code, and this should be far more efficient overall. Commented May 22 at 16:19
  • I second Chris. Also read about "predicate pushdown" in Spark/Parquet. Commented May 29 at 10:29
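
Here is a minimal sketch of the approach from the comments above, assuming the Year=/Month=/Day= directory layout shown in SOURCE (so Spark can discover Year, Month, and Day as partition columns) and reusing event_time_epoch, SOURCE_QUERY, and DESTINATION from the question; the base path, the Year >= 2024 cutoff, and the 1000-partition count are illustrative values, not tuned ones:

from pyspark.sql.functions import from_unixtime

# read the whole partitioned dataset once; Spark picks up Year/Month/Day as partition columns
df = spark.read.parquet("abfss://...my_files/")

# filter on a partition column; with lazy evaluation, partition pruning, and predicate
# pushdown, Spark only scans the matching Year=/Month=/Day= directories, never the full history
df = df.filter(df.Year >= 2024)  # illustrative cutoff, replace with the date window you need

# derive the output partition date from the event timestamp, as in the original code
df = df.withColumn("utc_date", from_unixtime(df.event_time_epoch / 1000, "yyyy-MM-dd"))

# repartition to a larger number of partitions before the wide operations and the write,
# as suggested in the first comment (1000 is the commenter's example value, not a tuned one)
df = df.repartition(1000, "utc_date")

# (the ~20-field select from the question can be applied here before creating the temp view)
df.createOrReplaceTempView("my_source")
dfResults = spark.sql(SOURCE_QUERY)  # same query as in the question

dfResults \
    .write \
    .partitionBy("Year", "Month", "Day") \
    .parquet(DESTINATION, mode="overwrite")

Because the filter is on partition columns, the pruning happens at planning time, so "reading the entire data set first" does not actually pull in a decade of files; only the matching directories are listed and scanned.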
