I’ve recently started working with Airflow and I’m using Airflow 2.9.1 with the Taskflow API. I’ve created a task group with dynamic task mapping and I’m using task decorators exclusively (no operators).
Here’s my issue: If one of the mapped tasks fails, I want all the subsequent tasks for that particular index in the task group to be skipped, but I still want the other indices to continue executing if their upstream task was successful. There is no data passed between the tasks.
To achieve this, I’ve set the downstream tasks’ trigger rule to all_done, so they can start even if there is a failure in one or many indexed tasks. However, since there is no input to the downstream task from the upstream task, it is running even when its respective upstream task has failed. The expected behavior is for these tasks to fail or skip based on the failure of their respective upstream task. I don’t want to skip the execution of all tasks; only the tasks at the respective indices in the dynamic task mapping should be skipped or failed.
Options Considered:
- Using XCom to send the task state to downstream tasks: This requires every task to know its upstream task’s name and send the state as input to the downstream task. This is feasible but adds complexity.
- Custom Trigger Rules: Airflow’s built-in trigger rules like all_done, one_failed, etc., don’t seem to cater to my specific requirement directly.
Question:
Is there an alternative approach to achieve the desired behavior without resorting to custom XCom handling? Or is there a way to leverage Airflow’s existing functionalities to handle this scenario more gracefully?
Any input or suggestions would be greatly appreciated.
Here is an example DAG
from airflow.decorators import task, dag, task_group
from airflow.utils.dates import days_ago
from airflow.operators.python import get_current_context
from airflow.exceptions import AirflowSkipException
@dag(schedule_interval=None, start_date=days_ago(1), catchup=False)
def skip_subsequent_tasks():
@task
def start_task():
return ['a', 'b', 'c'] # List of values for dynamic task mapping
@task_group
def my_task_group(value):
@task
def process_task(value):
if value == 'b':
raise ValueError("Intentional Failure")
if value == 'c':
raise AirflowSkipException("Intentional Skip")
return f"Processed {value}"
@task(trigger_rule='all_done')
def end_task():
context = get_current_context()
ti = context['ti']
current_task_index = ti.map_index
# TODO: This should fail for index of 1, with value of 'b'
# TODO: This should skip for index of 2, with value of 'c'
upstream_task_val = ti.xcom_pull(task_ids=f'my_task_group.process_task', map_indexes=current_task_index)
return f"Ended '{current_task_index}' with output '{upstream_task_val}'"
process_task(value) >> end_task()
start = start_task()
my_task_group.expand(value=start)
dag = skip_subsequent_tasks()
Below are some screenshots to reference:


