
Below is a minimal implementation of a branch operator using the TaskFlow API.

The DAG executes either odd_task or even_task, depending on the task ID string returned by branch_on_condition. Whichever task runs also receives the return_int value on which the branching decision was made. final_task is then executed. All straightforward.

from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Simple_Branch')
def simple_dag():
    
    @task(task_id='return_int')
    def return_int():
        return 3


    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:
            return 'odd_task'
        else:
            return 'even_task'
        
        
    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val
    
    
    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val        


    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return
    
    
    returned_int = return_int()
    branch_value = branch_on_condition(upstream_value=returned_int)
    even_task_return = even_task(input_val=returned_int)
    odd_task_return = odd_task(input_val=returned_int)
    final_return = final_task()
    
    branch_value >> [even_task_return, odd_task_return] >> final_return
    
simple_dag()

This logs INFO - 3 is an odd number, or INFO - 2 is an even number if return_int returns 2 instead.
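The branching decision itself does not depend on Airflow and can be checked in isolation. The following is a minimal sketch; `choose_branch` is a hypothetical helper name that simply mirrors the body of branch_on_condition above.

```python
def choose_branch(upstream_value: int) -> str:
    """Mirror of the body of branch_on_condition: pick a task_id by parity."""
    # The lowest bit is 1 for odd integers and 0 for even ones.
    if upstream_value & 1:
        return "odd_task"
    return "even_task"


if __name__ == "__main__":
    for value in (2, 3):
        print(value, "->", choose_branch(value))
```

The returned strings have to match the task_id values of the downstream tasks exactly, since that is what the branch task uses to select what to run.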

Why am I unable to implement a similar pattern using dynamically mapped tasks?

If the return_int task is replaced by return_list, which returns a list of n integers, I can map the branch task over this list without problems, but BOTH branches are executed for every element!

from airflow.decorators import dag, task


@dag(dag_id='Example_Dag_Dynamic_Branch')
def simple_dag():
    
    @task(task_id='return_list')
    def return_list():
        return [1,2,3,4,5,6,7]


    @task.branch(task_id='branch_on_condition')
    def branch_on_condition(upstream_value):
        if upstream_value & 1:
            return 'odd_task'
        else:
            return 'even_task'
        
        
    @task(task_id='odd_task')
    def odd_task(input_val):
        print(f"{input_val} is an odd number")
        return input_val
    
    
    @task(task_id='even_task')
    def even_task(input_val):
        print(f"{input_val} is an even number")
        return input_val        


    @task(task_id='final_task', trigger_rule='one_success')
    def final_task():
        print('final task executed')
        return
    
    
    returned_list = return_list()
    branch_value = branch_on_condition.expand(upstream_value=returned_list)
    even_task_return = even_task.expand(input_val=returned_list)
    odd_task_return = odd_task.expand(input_val=returned_list)
    final_return = final_task()
    
    branch_value >> [even_task_return, odd_task_return] >> final_return
    
simple_dag()

As a result, odd_task produces, for example, 7 mapped task instances that each log INFO - n is an odd number for n from 1 through 7, which is of course not true.
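For comparison, the outcome I am after can be written down in plain Python. This is only a model of the intended result per mapped index, not a description of how Airflow's scheduler computes task states; `expected_states` and the "success"/"skipped" labels are illustrative names chosen for this sketch.

```python
from typing import Dict, List


def expected_states(values: List[int]) -> Dict[str, List[str]]:
    """Model the intended result: one state per mapped index for each branch."""
    states: Dict[str, List[str]] = {"odd_task": [], "even_task": []}
    for value in values:
        # Same parity test as branch_on_condition.
        chosen = "odd_task" if value & 1 else "even_task"
        for task_id in states:
            # Only the chosen branch should run for this index.
            states[task_id].append("success" if task_id == chosen else "skipped")
    return states


if __name__ == "__main__":
    for task_id, per_index in expected_states([1, 2, 3, 4, 5, 6, 7]).items():
        print(task_id, per_index)
```

In other words, for each list element exactly one of the two mapped instances should run and the other should be skipped.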

[Image: output of DAG execution]

I have tried

  • Packing the branch and downstream tasks into a task group.
  • Messing around with .expand() and .partial()
  • Reading related questions: 1. 2. 3.

None of these really answers the question!

Finally, I am aware that this behavior could simply be a standalone if/else block within a single operator. In reality, though, the if/else logic could become quite complex, and it would be nice to separate it into multiple tasks.
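For reference, the standalone if/else alternative would look roughly like the sketch below. `classify` is a hypothetical function name; in a DAG it would presumably be the body of a single mapped task rather than two separate branch targets.

```python
def classify(input_val: int) -> str:
    """Handle the odd/even cases inline instead of via a branch operator."""
    if input_val & 1:
        message = f"{input_val} is an odd number"
    else:
        message = f"{input_val} is an even number"
    print(message)
    return message


if __name__ == "__main__":
    for value in [1, 2, 3, 4, 5, 6, 7]:
        classify(value)
```

This works for the toy example, but it folds both code paths into one task, which is exactly what I would like to avoid once each path grows.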

  • Hm... I just tested exactly your code, and indeed both branches are executed; however, each branch only runs the corresponding odd/even tasks and the rest are skipped -> [i.imgur.com/M3qAb3e.png]. That's with Airflow 2.10.5. Commented Apr 4 at 23:55
  • Thanks for checking it out, we're currently on 2.10.3. Your image link shows exactly what I want to happen with task execution. I have edited the question to show our current output for the exact code written in the question. I'll update Airflow to 2.10.5 and see what happens. Commented Apr 8 at 9:17

1 Answer

Yes, this was an Airflow version issue.

Thank you @Kombajn zbożowy for testing the code in Airflow 2.10.5.

I have since upgraded from 2.10.3 to 2.10.5 and now get the desired task outputs. This issue is probably the one highlighted on GitHub here.

Setting trigger_rule='all_success' explicitly in the @task decorator arguments made no difference on 2.10.3; the bug was only fixed by upgrading.
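To check whether an environment is at or above the version where I saw the correct behavior, something like the sketch below may help. The 2.10.5 threshold is only what was observed in this thread, not a confirmed fix version, and `parse_version` and `has_observed_fix` are hypothetical helper names.

```python
from importlib import metadata
from typing import Tuple


def parse_version(text: str) -> Tuple[int, ...]:
    """Turn a plain 'X.Y.Z' release string into a tuple of ints."""
    # Assumption: only simple numeric releases such as '2.10.5'; suffixes
    # like 'rc1' are not handled by this sketch.
    return tuple(int(part) for part in text.split("."))


def has_observed_fix(installed: str, observed_good: str = "2.10.5") -> bool:
    """True if installed is at least the version observed to behave correctly."""
    # Compare as integer tuples; comparing the raw strings would wrongly
    # rank '2.9.0' above '2.10.3'.
    return parse_version(installed) >= parse_version(observed_good)


if __name__ == "__main__":
    try:
        installed = metadata.version("apache-airflow")
    except metadata.PackageNotFoundError:
        print("apache-airflow is not installed in this environment")
    else:
        print(installed, has_observed_fix(installed))
```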
