I have an Airflow DAG, that branches to whether to send an email or not. will_send_email_task is a BranchPythonOperator, that if len(message) > 0, it should go to the branch task send_email_notification_task. Otherwise, that task is skipped and will go straight to the DummyOperator join_task. The DAG works OK when the result of the branch is True (yes, it should send an email). However, the rest of the DAG is skipped when the result is False, which is not what the expected. The expected outcome if will_send_email_task is False should be that only send_email_notification_task is skipped/bypassed, but the rest of the flow continue as normal.
Here is the Airflow DAG snippet:
# this function determines whether to send an email or not
def will_send_email(push_task, **context):
message = context["task_instance"].xcom_pull(task_ids=push_task)
if len(message) > 0:
logging.info(f"email body: {message}")
context["task_instance"].xcom_push(key="message", value=message)
return 'send_email_notification_task'
else:
return 'join_task'
def some_python_callable(table_name, **context):
...
will_send_email_task = BranchPythonOperator(
task_id='will_send_email_task',
provide_context=True,
python_callable=will_send_email,
op_kwargs={'push_task': 'some_previous_task'},
dag=dag
)
join_task = DummyOperator(
task_id='join_task',
dag=dag
)
send_email_notification_task = EmailOperator(
task_id='send_email_notification_task',
to=default_args['email'],
subject="some email subject",
html_content="{{ task_instance.xcom_pull(key='message') }}",
dag=dag
)
end_task = DummyOperator(
task_id='end_task',
dag=dag
)
...
for table, val in some_dict.items():
offload_task = PythonOperator(
task_id = f"offload_{table}_task",
dag=dag,
provide_context=True,
python_callable=some_python_callable,
op_kwargs={'table_name': table}
)
offload_task.set_upstream(join_task)
offload_task.set_downstream(end_task)
How should I configure my DAG so it would still run as expected?
