
I have a use case like this: I have a list of many queries, and I run them with multi-threading in PySpark, with each thread submitting some SQL.

Some queries report success, but the final table ends up with no data. In the Spark UI, some tasks in a stage report OOM, yet the stage eventually finishes successfully. Have you encountered any similar cases, or do you have any comments?

def run_single_rule(self, log):
    try:
        # sql_filter is built elsewhere in the class from the rule in `log` (elided from this snippet)
        dataset = self.spark.sql(sql_filter)
        result_count = dataset.count()
        print(
            f"""
              Statement for building rule [{log.rule_id}] result:
              {sql_filter}
              dataset after processed contains {result_count} records
              """
        )
        write_to_uc_table(dataset, self.job_output)

    except Exception:
        # full_error_log is collected elsewhere (elided); the exception is caught and
        # only logged as a warning, so the function still returns normally
        logger.warning(
            f"""
                       rule_id = {log.rule_id} has failed with log:
                       {full_error_log}
                       """
        )
    # update_values is assembled elsewhere in the class (elided from this snippet)
    return update_values
from concurrent.futures import ThreadPoolExecutor, as_completed, wait

with ThreadPoolExecutor(max_workers=10) as executor:
    future_job_runs = {
        executor.submit(self.run_single_rule, query): query
        for query in not_processed_log
    }
    # wait() blocks until all futures finish; it is redundant with as_completed below, but harmless
    wait(future_job_runs)
    for future in as_completed(future_job_runs):
        log = future_job_runs[future]
        try:
            log_records.append(future.result())
        except Exception as exc:
            print(f"rule_id: {log.rule_id}  generated an exception: {exc}")
        else:
            print(
                f"Finished log_id: {log.rule_id} finished with result: {future.result()}"
            )

def write_to_uc_table(df, job_output: JobOutput):
    writer = df.write
    if job_output.partition_columns:
        writer = writer.partitionBy(job_output.partition_columns)
    if job_output.write_mode == WriteMode.OVERWRITE_PARTITION:
        # dynamic mode overwrites only the partitions present in df, not the whole table
        writer = writer.option("partitionOverwriteMode", "dynamic")
        writer = writer.mode("overwrite")
    else:
        writer = writer.mode(job_output.write_mode)
    if job_output.options:
        for k, v in job_output.options.items():
            writer = writer.option(k, v)
    writer.saveAsTable(
        f"{job_output.target_catalog}.{job_output.target_schema}.{job_output.target_table}"
    )

Here is an example of one of the queries that reports success but leaves the final table with no data:

Statement for building rule [10030006] result:
SELECT ... FROM ...
dataset after processed contains 650048 records
Finished log_id: 10030006 finished with result: {'job_start_time': datetime.datetime(2025, 10, 31, 1, 7, 2, 469565, tzinfo=datetime.timezone.utc), 'log_id': '5763b7d8-b5ee-11f0-a43c-00163eb2a776', 'result_record_count': None, 'state': 'FAILED', 'job_end_time': datetime.datetime(2025, 10, 31, 1, 7, 25, 763043, tzinfo=datetime.timezone.utc)}

1 Answer


Best approach: use Spark-level parallelism instead of Python threads where possible.

  • If you can reformulate your "many queries" problem into a single Spark job (for example, by unioning all the queries together, or by parameterizing one query), Spark will naturally parallelize the work across its executors; see the sketch below.

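A minimal sketch of that idea, assuming each rule can be expressed as a SQL statement that tags its rows with a rule id (the catalog, schema, table, and queries below are made-up placeholders, not from the question):

from functools import reduce
from pyspark.sql import DataFrame, SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical per-rule SQL statements (stand-ins for the real sql_filter strings)
rule_queries = {
    "10030006": "SELECT *, '10030006' AS rule_id FROM demo_catalog.demo_schema.events WHERE amount > 100",
    "10030007": "SELECT *, '10030007' AS rule_id FROM demo_catalog.demo_schema.events WHERE amount <= 100",
}

# Build one DataFrame per rule, then union them into a single plan: Spark
# schedules all the scans itself, and there is one write/commit instead of
# one write per Python thread.
per_rule = [spark.sql(sql) for sql in rule_queries.values()]
combined = reduce(DataFrame.unionByName, per_rule)

combined.write.mode("append").saveAsTable("demo_catalog.demo_schema.rule_results")
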
If you must run separate queries/jobs in parallel from Python, then use fair scheduling.

From the Spark docs (Scala example):

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.set("spark.scheduler.mode", "FAIR")
val sc = new SparkContext(conf)

Or pass the same property via --conf on spark-submit: --conf spark.scheduler.mode=FAIR

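Since the question runs PySpark, a rough equivalent when building the session might look like this (the app and pool names below are placeholders):

from pyspark.sql import SparkSession

# Enable the FAIR scheduler when the session is created
spark = (
    SparkSession.builder
    .appName("rule-runner")  # placeholder app name
    .config("spark.scheduler.mode", "FAIR")
    .getOrCreate()
)

# Inside each worker thread, jobs can additionally be pinned to a named
# fair-scheduler pool so one heavy rule does not starve the others
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "rules_pool")
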
It's a bad habit to drive Spark jobs from Python threads; the behaviour is hard to reason about (non-deterministic), and experienced Spark users generally won't recommend it.

Further reading: see my post here: https://stackoverflow.com/a/53236535/647053
