I created a BranchPythonOperator that should run one of two tasks depending on a condition:

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_create_table = PythonOperator(
    task_id='typicon_create_table',
    python_callable=CreateTable(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_load_data = PythonOperator(
    task_id='typicon_load_data',
    python_callable=LoadData(),
    provide_context=True,
    dag=typicon_task_dag)

typicon_check_table.set_downstream([typicon_load_data, typicon_create_table])
typicon_create_table.set_downstream(typicon_load_data)

This is the CheckTable callable class:

class CheckTable:
    """
    DAG task to check if table exists or not.
    """

    def __call__(self, **kwargs) -> str:
        pg_hook = PostgresHook(postgres_conn_id="postgres_docker")
        query = "SELECT EXISTS ( \
            SELECT 1 FROM information_schema.tables \
            WHERE table_schema = 'public' \
            AND table_name = 'users');"

        table_exists = pg_hook.get_records(query)[0][0]
        if table_exists:
            return "typicon_load_data"
        return "typicon_create_table"

The problem is that both tasks are skipped when the typicon_check_table task runs.

How can I fix this?

[Screenshot of the DAG run]

3 Answers

3

I worked through the same scenario, and the code below works fine for me:

slot_population_on_is_y_or_n = BranchPythonOperator(
    task_id='slot_population_on_is_y_or_n',
    python_callable=DAGConditionalValidation('Y'),
    trigger_rule='one_success')
slot_population_on_is_y = DummyOperator(task_id='slot_population_on_is_y')
slot_population_on_is_n = DummyOperator(task_id='slot_population_on_is_n')
slot_population_on_is_y_or_n >> [slot_population_on_is_y, slot_population_on_is_n]


class DAGConditionalValidation:

    def __init__(self, conditional_param_key):
        self.conditional_param_key = conditional_param_key

    def __call__(self, **kwargs):
        # A branch callable must return the task_id string of the task to follow.
        if self.conditional_param_key == 'Y':
            return 'slot_population_on_is_y'
        return 'slot_population_on_is_n'

Your code looks fine, but it is missing a trigger rule. Set trigger_rule='one_success', and this should work for you as well.

1 Comment

Yes, that's true. I'm a new contributor, so I still need to learn the posting conventions :)
1

The task typicon_load_data has typicon_create_table as a parent and the default trigger_rule is all_success, so I am not surprised by this behaviour.

Two possible cases here:

  1. CheckTable() returns typicon_load_data, so typicon_create_table is skipped; typicon_load_data, being downstream of the skipped task, is then skipped as well.
  2. CheckTable() returns typicon_create_table, which runs and then triggers typicon_load_data, but that task is skipped because it was the excluded branch.

I assume your screenshot is from case 1?
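
To make case 1 concrete, here is a minimal sketch in plain Python of how a trigger rule might resolve once a task's parents have finished. It is a toy model, not Airflow code: resolve_state and case_1_parents are hypothetical names introduced only for illustration, and the model handles only parents that ended in success or skipped. The rule names all_success, one_success and none_failed mirror Airflow trigger rules (none_failed only in versions that provide it).

```python
# Toy model of trigger-rule resolution once all parents have finished.
# This is NOT Airflow code; resolve_state is a hypothetical helper.

def resolve_state(trigger_rule, parent_states):
    """Return 'run' or 'skipped' for parents that ended in 'success' or 'skipped'."""
    states = list(parent_states.values())
    if any(s not in ("success", "skipped") for s in states):
        raise ValueError("this sketch only models 'success' and 'skipped' parents")
    if trigger_rule == "all_success":
        # Every parent must have succeeded; a skipped parent propagates the skip.
        return "run" if all(s == "success" for s in states) else "skipped"
    if trigger_rule == "one_success":
        # At least one successful parent is enough.
        return "run" if any(s == "success" for s in states) else "skipped"
    if trigger_rule == "none_failed":
        # Skipped parents are tolerated; only failures would block the task.
        return "run"
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# Case 1: the branch picked typicon_load_data, so typicon_create_table was skipped.
case_1_parents = {
    "typicon_check_table": "success",
    "typicon_create_table": "skipped",
}

for rule in ("all_success", "one_success", "none_failed"):
    print(rule, "->", resolve_state(rule, case_1_parents))
```

Under this model, only the default all_success rule skips typicon_load_data in case 1. In a real DAG the rule would be set through the trigger_rule argument of typicon_load_data, the task that has both parents; how case 2 behaves may depend on the Airflow version.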

0

Add trigger_rule="all_done" to typicon_check_table, as below:

typicon_check_table = BranchPythonOperator(
    task_id='typicon_check_table',
    python_callable=CheckTable(),
    provide_context=True,
    trigger_rule="all_done",
    dag=typicon_task_dag)
