I'm dynamically generating some DAGs from JSON files.
I'm building a WHILE loop with TriggerDagRunOperator (with wait_for_completion=True): it triggers a DAG that re-triggers itself (also with TriggerDagRunOperator) until a condition is met.
However, when I create this "sub-DAG" (it is not technically a SubDagOperator, but you get the idea) and create tasks inside it, those tasks also pick up every implicit TaskGroup that was open above my WHILE loop. So the tasks inside the "independent" sub-DAG expect a group that doesn't exist in their own DAG, only in the main DAG.
Is there a way to tell Airflow to ignore every implicit TaskGroup when creating a task?
I have to migrate some workflow logic to Airflow, and that logic can contain WHILE blocks.
I tried passing task_group=None, yet I still have this issue.
For example, if I do:
if task_group:
    print(f"[parse_wrkflw] task_group={task_group}")
else:
    print(f"[parse_wrkflw] task_group is None")
print(f"[parse_wrkflw] task_id={task_id}")
task = PythonOperator(
    task_id=task_id,
    python_callable=execute_sql,
    op_args=[sql_file],
    dag=dag,
    task_group=task_group
)
print(f"[parse_wrkflw] task.task_id={task.task_id}")
I get:
INFO - [parse_wrkflw] task_group is None
INFO - [parse_wrkflw] task_id=_if_0_true_while_0_loop_content_call_sql_0_test_file
INFO - [parse_wrkflw] task.task_id=_if_0_true_tasks._if_0_true_while_0_loop_content_call_sql_0_test_file
…where you can see that _if_0_true_tasks, the TaskGroup id, has been prepended to the task_id.
That's because the source JSON looks like this:
{
  "wrkflw" : [ {
    "typ" : "IF",
    "el" : "cond1",
    "children" : [ {
      "typ" : "WHILE",
      "el" : "cond2",
      "children" : [ {
        "typ" : "CALL SQL",
        "el" : "test_file.sql"
      } ]
    } ]
  } ]
}
…and when processing the typ "IF", I'm creating a TaskGroup to hold the children of that block:
with TaskGroup(group_id=f"{if_prefix}_true_tasks", task_group=task_group, dag=dag) as tg_true:
    ...
(The DAG is built recursively: parse_wrkflw is called on the children and creates tasks and groups based on the typ of the current element.)
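A stripped-down sketch of that recursion, in plain Python with no Airflow objects, shows why the SQL task ends up under the IF group. The function `walk` and the `_if_{index}_true_tasks` naming are hypothetical stand-ins for my real parse_wrkflw; the point is only that the WHILE children are created while the IF group is still open.

```python
# Hypothetical sketch of the recursion; the real parse_wrkflw builds Airflow objects.
import json

SOURCE = """
{"wrkflw": [{"typ": "IF", "el": "cond1", "children": [
    {"typ": "WHILE", "el": "cond2", "children": [
        {"typ": "CALL SQL", "el": "test_file.sql"}]}]}]}
"""


def walk(elements, open_groups=(), found=None):
    """Collect (task element, groups open at creation time) pairs."""
    found = [] if found is None else found
    for index, element in enumerate(elements):
        typ = element["typ"]
        if typ == "IF":
            # The IF block opens a group around its children.
            group_id = f"_if_{index}_true_tasks"
            walk(element.get("children", []), open_groups + (group_id,), found)
        elif typ == "WHILE":
            # The children belong to the loop DAG, yet the IF group is still open here.
            walk(element.get("children", []), open_groups, found)
        else:
            found.append((element["el"], open_groups))
    return found


result = walk(json.loads(SOURCE)["wrkflw"])
print(result)  # [('test_file.sql', ('_if_0_true_tasks',))]
```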
The idea behind the WHILE logic is this:
{previous tasks} >> entry_task (EmptyOperator) >> condition_task (BranchPythonOperator, will check if the WHILE condition is met and return the next task_id to run between trigger_loop_dag and end_loop) >> [trigger_loop_dag (TriggerDagRunOperator with wait_for_completion=True) | end_loop (EmptyOperator with trigger_rule=TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS)] and trigger_loop_dag >> end_loop >> {next tasks}
trigger_loop_dag triggers a DAG dedicated to the loop:
{children tasks of the WHILE} >> loop_branch_task (BranchPythonOperator, will check if the WHILE condition is met and return the next task_id to run between trigger_self and end_loop) >> [trigger_self (TriggerDagRunOperator with wait_for_completion=True, will call itself) | end_loop (EmptyOperator)]
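The control flow of the two DAGs can be simulated in plain Python. This is only a sketch of the intended behaviour: the function names and log strings are made up, a nested function call stands in for TriggerDagRunOperator with wait_for_completion=True, and nothing here touches Airflow.

```python
# Plain-Python simulation of the two DAGs' control flow; no Airflow involved.

def run_loop_dag(condition, body, log):
    """Stand-in for the loop DAG: children, then branch to trigger_self or end_loop."""
    body()
    log.append("loop_dag:children")
    if condition():
        log.append("loop_dag:trigger_self")
        # The nested call returns only when the triggered run has finished.
        run_loop_dag(condition, body, log)
    else:
        log.append("loop_dag:end_loop")


def run_main_dag(condition, body):
    """Stand-in for entry_task >> condition_task >> [trigger_loop_dag | end_loop]."""
    log = ["main:entry_task"]
    if condition():
        log.append("main:trigger_loop_dag")
        run_loop_dag(condition, body, log)
    # end_loop runs on both branches (trigger_loop_dag >> end_loop).
    log.append("main:end_loop")
    return log


counter = {"n": 0}
log = run_main_dag(
    condition=lambda: counter["n"] < 2,
    body=lambda: counter.update(n=counter["n"] + 1),
)
print(counter["n"])  # 2
```

With a condition that holds twice, the children run twice: once from the main DAG's trigger and once from the loop DAG triggering itself.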
Basically, this works perfectly fine if I don't have an implicit TaskGroup above the WHILE.
This is why I just need to find a way to tell Airflow to ignore the implicit TaskGroup when creating the {children tasks of the WHILE}.