I can make PySpark "work" no problem, but I know very little about performance and find the documentation confusing. I have source data partitioned by date; I read it directory by directory (one parquet file of about 1-2 GB in each) for a specified number of days, append each day to my data frame, select the fields I need, and run some SQL against it to write new partitioned parquet files. I'm getting errors related to shuffle performance. I have pretty much any amount of memory, executors, and cores I could need, but I don't understand which combination would be best for this scenario. In this example I'm reading 365 days, which equates to 365 parquet files. I'd love some advice about shuffling, executors, cores, etc. My available setup is in the screenshot. Thanks for the help!
from datetime import datetime, timedelta

from pyspark.sql.functions import from_unixtime

# source files are partitioned by year, month, and day - each partition holds a single parquet file of about 1-2 GB
SOURCE = "abfss://...my_files/Year={}/Month={}/Day={}"
DESTINATION = "abfss://...another_place"

# get a simple list of every day in the last DAYS_TO_LOAD days to iterate over the source directories needed
DAYS_TO_LOAD = 365
today = datetime.today()
start_date = today - timedelta(days=DAYS_TO_LOAD)
range_tuple = date_range_tuple(start_date, today)
print(range_tuple)  # ((2024, 5, 19), (2024, 5, 20), (2024, 5, 21), (2024, 5, 22), ...)

# create a dataframe and append (via unionAll) each day to it
df = None
for year, month, day in range_tuple:
    if year >= 2023:
        # right() just gets the last two characters, for left-padding month/day numbers with zeroes
        path = SOURCE.format(str(year), right("0" + str(month), 2), right("0" + str(day), 2))
        print(path)
        if df is None:  # df hasn't been initialized yet
            df = spark.read.load(path)
        else:  # append data to the existing df
            df = df.unionAll(spark.read.load(path))

df.select(
    "about 20 fields here",
    ...
).createOrReplaceTempView("my_source")

# the partition field for the output year/month/day is different from the source date partition
df = df.withColumn("utc_date", from_unixtime(df.event_time_epoch / 1000, "yyyy-MM-dd"))
print(df.rdd.getNumPartitions())

# I've tried both coalesce and repartition without much difference
# df = df.coalesce(DAYS_TO_LOAD)
# print(df.rdd.getNumPartitions())
df = df.repartition("utc_date")
print(df.rdd.getNumPartitions())

dfResults = spark.sql(SOURCE_QUERY)  # references the "my_source" temp view above
dfResults \
    .write \
    .partitionBy("Year", "Month", "Day") \
    .parquet(DESTINATION, mode="overwrite")

Skip the loop with unionAll. Read the entire data set first, then filter: df = df.filter(df.year == 2024). This should reduce a lot of overhead, and then try repartitioning to a large number before writing, such as 1000.
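
A minimal sketch of that approach, assuming the notebook's existing spark session and the paths and column names from the question (the base path, the Year/Month/Day partition columns, event_time_epoch, the exact filter value, the 1000-partition target, and the utc_date output partitioning are all illustrative):

from pyspark.sql.functions import col, from_unixtime

BASE = "abfss://...my_files"              # root of the Year=/Month=/Day= tree
DESTINATION = "abfss://...another_place"

# read the root once; Spark discovers Year, Month, and Day as partition columns
df = spark.read.parquet(BASE)

# filter on the partition columns instead of looping + unionAll;
# Spark prunes the unneeded directories at scan time
df = df.filter(col("Year") >= 2024)

# derive the output date as in the original job
df = df.withColumn("utc_date", from_unixtime(col("event_time_epoch") / 1000, "yyyy-MM-dd"))

# spread the rows over many partitions before the wide write, per the suggestion above
df = df.repartition(1000)

df.write \
    .partitionBy("utc_date") \
    .mode("overwrite") \
    .parquet(DESTINATION)

Reading the root path gives one scan instead of a plan built from 365 unions, and filtering on the partition columns keeps the pruning in Spark's hands. The SQL step from the question can still run on a temp view created from this single dataframe; the output here is partitioned by utc_date for brevity, where the original derives Year/Month/Day from it.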