
I'm dynamically generating some DAGs from JSON files.

I'm building a WHILE loop mechanism with TriggerDagRunOperator (with wait_for_completion=True): it triggers a DAG that re-triggers itself (also with TriggerDagRunOperator) until a condition is met.

However, when I create this "sub-DAG" (it is not technically a SubDagOperator, but you get the idea) and create tasks inside it, those tasks also pick up every implicit TaskGroup that was open above my WHILE loop. So the tasks inside the "independent" sub-DAG expect a group that doesn't exist in their own DAG, only in the main DAG.

Is there a way to tell Airflow to ignore every implicit TaskGroup when creating a task?

I have to migrate some workflow logic to Airflow, and that logic can contain WHILE blocks.

I tried passing task_group=None, but I still have the issue. For example, if I do:

if task_group:
    print(f"[parse_wrkflw] task_group={task_group}")
else:
    print("[parse_wrkflw] task_group is None")
print(f"[parse_wrkflw] task_id={task_id}")

task = PythonOperator(
    task_id=task_id,
    python_callable=execute_sql,
    op_args=[sql_file],
    dag=dag,
    task_group=task_group
)

print(f"[parse_wrkflw] task.task_id={task.task_id}")

I get:

INFO - [parse_wrkflw] task_group is None
INFO - [parse_wrkflw] task_id=_if_0_true_while_0_loop_content_call_sql_0_test_file
INFO - [parse_wrkflw] task.task_id=_if_0_true_tasks._if_0_true_while_0_loop_content_call_sql_0_test_file

…where you can see that _if_0_true_tasks, the TaskGroup id, has been prepended to the task id. That's because the source JSON looks like this:

{
  "wrkflw" : [ {
    "typ" : "IF",
    "el" : "cond1",
    "children" : [ {
      "typ" : "WHILE",
      "el" : "cond2",
      "children" : [ {
        "typ" : "CALL SQL",
        "el" : "test_file.sql"
      } ]
    } ]
  } ]
}

…and when processing an element with typ "IF", I create a TaskGroup to hold the children of that block:

with TaskGroup(group_id=f"{if_prefix}_true_tasks", task_group=task_group, dag=dag) as tg_true:
    ...

(The DAG is built recursively: parse_wrkflw calls itself on the children and creates tasks and groups based on the typ of the current element.)
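To illustrate how the group ends up attached to the WHILE children, here is a minimal plain-Python sketch of the recursive walk. It does not use Airflow: `walk` is a hypothetical stand-in for parse_wrkflw, and the id scheme is my reconstruction from the log output above, so treat it as an assumption. It only tracks the innermost group id, not real TaskGroup objects.

```python
import os

# Same structure as the source JSON shown above.
WORKFLOW = {
    "wrkflw": [{
        "typ": "IF", "el": "cond1",
        "children": [{
            "typ": "WHILE", "el": "cond2",
            "children": [{"typ": "CALL SQL", "el": "test_file.sql"}],
        }],
    }]
}


def walk(nodes, prefix="", group=None, out=None):
    """Collect (task_id, innermost_group_id) for every CALL SQL leaf.

    Hypothetical stand-in for parse_wrkflw; the naming scheme is assumed.
    """
    out = [] if out is None else out
    for i, node in enumerate(nodes):
        typ = node["typ"]
        children = node.get("children", [])
        if typ == "IF":
            if_prefix = f"{prefix}_if_{i}"
            # The IF opens a group; everything below inherits it.
            walk(children, f"{if_prefix}_true", f"{if_prefix}_true_tasks", out)
        elif typ == "WHILE":
            # The WHILE children go to a separate loop DAG, yet the
            # group inherited from above is still passed down here.
            walk(children, f"{prefix}_while_{i}_loop_content", group, out)
        elif typ == "CALL SQL":
            stem = os.path.splitext(node["el"])[0]
            out.append((f"{prefix}_call_sql_{i}_{stem}", group))
    return out


if __name__ == "__main__":
    for task_id, group_id in walk(WORKFLOW["wrkflw"]):
        print(task_id, group_id)
```

The WHILE branch hands `group` down unchanged, which is exactly the inherited group I would like the loop DAG's tasks not to receive.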

The idea behind the WHILE logic is the following chain in the main DAG:

{previous tasks} >> entry_task (EmptyOperator) >> condition_task (BranchPythonOperator, will check if the WHILE condition is met and return the next task_id to run between trigger_loop_dag and end_loop) >> [trigger_loop_dag (TriggerDagRunOperator with wait_for_completion=True) | end_loop (EmptyOperator with trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)] and trigger_loop_dag >> end_loop >> {next tasks}

trigger_loop_dag triggers a DAG dedicated to the loop:

{children tasks of the WHILE} >> loop_branch_task (BranchPythonOperator, will check if the WHILE condition is met and return the next task_id to run between trigger_self and end_loop) >> [trigger_self (TriggerDagRunOperator with wait_for_completion=True, will call itself) | end_loop (EmptyOperator)]
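The control flow of the two chains above can be sketched in plain Python, without Airflow. The function names `main_branch`, `loop_branch` and `simulate` are hypothetical; only the returned task ids (trigger_loop_dag, trigger_self, end_loop) come from my DAGs. The `max_runs` guard is an assumption added so the simulation always terminates.

```python
def main_branch(condition_met: bool) -> str:
    """Decision made by condition_task in the main DAG."""
    return "trigger_loop_dag" if condition_met else "end_loop"


def loop_branch(condition_met: bool) -> str:
    """Decision made by loop_branch_task in the loop DAG."""
    return "trigger_self" if condition_met else "end_loop"


def simulate(condition, max_runs: int = 100) -> int:
    """Return how many times the loop DAG would run.

    condition(n) is called with the number of loop-DAG runs completed so far.
    """
    runs = 0
    # Main DAG: check the condition before entering the loop at all.
    if main_branch(condition(runs)) == "end_loop":
        return runs
    while runs < max_runs:
        runs += 1  # the children tasks of the WHILE run once per loop-DAG run
        # Loop DAG: check the condition again after the children ran.
        if loop_branch(condition(runs)) == "end_loop":
            break
    return runs


if __name__ == "__main__":
    print(simulate(lambda n: n < 3))
```

The condition is checked once in the main DAG before the first trigger and then once at the end of every loop-DAG run, which is why the children run zero times when the condition is false from the start.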

Basically, this works perfectly fine if I don't have an implicit TaskGroup above the WHILE.

This is why I just need to find a way to tell Airflow to ignore the implicit TaskGroup when creating the {children tasks of the WHILE}.
