I have a DAG that runs several SQL scripts on certain tables. There are two options for running this DAG:

  1. On the production tables
  2. On frozen archived tables

I want to be able to select which tables to use based on a dag_run param value. The general structure is something like this:


from datetime import datetime, timedelta, date
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import DagRun
from airflow.decorators import task

data_dict = {
    'prod':{'tab_1': 'table_name_1','tab_2': 'table_name_2'},
    'arch':{'tab_1': 'table_name_1_arch', 'tab_2': 'table_name_2_arch'}
}


with DAG(
        'sgk_test_2',
        description='sgk_test_2',
        tags=["sgk_test"],
        schedule_interval=None,
        start_date=datetime(2025, 7, 1),
        default_args={
            'retries': 0,
            'retry_delay': timedelta(minutes=1),
            'conn_id': 'sgk_gp_tau_pvr'
        },
        params={
            'tab_type':'',
        }
) as dag:

    @task(task_id='task_0')
    def get_type(**context):
        params = context.get('params', {})
        tab_type = params.get('tab_type')
        return tab_type

    tab_type = get_type()

    task_1 = SQLExecuteQueryOperator(
        task_id='task_1',
        sql=f"select * from {data_dict[tab_type]['tab_1']}"
    )
    
    task_2 = SQLExecuteQueryOperator(
        task_id='task_2',
        sql=f"select * from {data_dict[tab_type]['tab_2']}"
    )

    task_0 >> task_1 >> task_2

The code above gives the following error:

    sql=f"select * from {data_dict[tab_type]['tab_1']}"
                         ~~~~~~~~~^^^^^^^^^^
TypeError: unhashable type: 'PlainXComArg'

After googling for a couple of hours, I was unable to solve the problem. Please help.


2 Answers

Instead of a global variable, it is more convenient (and recommended) to use an Airflow Variable.

Keep your data dict in an Airflow Variable with the key data_dict and the following JSON value:


{
    "prod": {"tab_1": "table_name_1", "tab_2": "table_name_2"},
    "arch": {"tab_1": "table_name_1_arch", "tab_2": "table_name_2_arch"}
}
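
You can create it from the UI (Admin -> Variables) or programmatically. A minimal sketch, assuming it runs somewhere with access to the Airflow metadata DB:

from airflow.models import Variable

# serialize_json=True stores the dict as a JSON string, so it can later be
# read back with Variable.get('data_dict', deserialize_json=True)
Variable.set(
    'data_dict',
    {
        'prod': {'tab_1': 'table_name_1', 'tab_2': 'table_name_2'},
        'arch': {'tab_1': 'table_name_1_arch', 'tab_2': 'table_name_2_arch'},
    },
    serialize_json=True,
)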



from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.decorators import task
from airflow.models import Variable

with DAG(
        'sgk_test_2',
        description='sgk_test_2',
        tags=["sgk_test"],
        schedule_interval=None,
        start_date=datetime(2025, 7, 1),
        default_args={
            'retries': 0,
            'retry_delay': timedelta(minutes=1),
            'conn_id': 'sgk_gp_tau_pvr'
        },
        params={
            'tab_type': '',
        }
) as dag:

    @task(task_id='task_0', multiple_outputs=True)
    def get_type(**context):
        # deserialize_json=True turns the stored JSON string back into a dict
        data_dict = Variable.get('data_dict', deserialize_json=True)
        table_info = data_dict.get(context["params"].get("tab_type"))
        # multiple_outputs=True pushes each key of the returned dict as its
        # own XCom, so table_info['tab_1'] can be resolved downstream
        return table_info

    table_info = get_type()

    task_1 = SQLExecuteQueryOperator(
        task_id='task_1',
        # sql is a templated field: the f-string turns the XComArg into a
        # Jinja xcom_pull expression that is rendered at runtime (see below)
        sql=f"select * from {table_info['tab_1']}"
    )

    task_2 = SQLExecuteQueryOperator(
        task_id='task_2',
        sql=f"select * from {table_info['tab_2']}"
    )

    table_info >> task_1 >> task_2
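
Because sql is templated, the f-string does not need the real table name at parse time: formatting an XComArg produces a Jinja xcom_pull expression that Airflow only renders when the task runs. Roughly (illustrative; the exact rendering may differ across Airflow versions):

# What f"select * from {table_info['tab_1']}" expands to at parse time:
sql = "select * from {{ task_instance.xcom_pull(task_ids='task_0', dag_id='sgk_test_2', key='tab_1') }}"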


The issue with the current code is that you try to access the tab_type return value while the DAG is being parsed, which is not possible because the value has not been evaluated yet: at parse time, get_type() returns a lazy XComArg reference rather than a string, so data_dict[tab_type] fails (XComArg objects are not hashable, hence the TypeError).
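
To make this concrete, here is roughly what happens at parse time (illustrative sketch; the exact class name may vary by Airflow version):

# Calling a TaskFlow-decorated task at parse time does not execute it;
# it returns a lazy placeholder for the future XCom value:
tab_type = get_type()
print(type(tab_type))   # <class 'airflow.models.xcom_arg.PlainXComArg'>

# Using that placeholder as a dict key raises the error from the question:
data_dict[tab_type]     # TypeError: unhashable type: 'PlainXComArg'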

While the option of using Airflow Variables might be suitable for this case, as Abhishek's answer suggested, you could also fix the current code by accessing tab_type inside get_type and returning the resolved table mapping, as follows:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.decorators import task

data_dict = {
    'prod':{'tab_1': 'table_name_1','tab_2': 'table_name_2'},
    'arch':{'tab_1': 'table_name_1_arch', 'tab_2': 'table_name_2_arch'}
}


with DAG(
        'sgk_test_2',
        description='sgk_test_2',
        tags=["sgk_test"],
        schedule_interval=None,
        start_date=datetime(2025, 7, 1),
        default_args={
            'retries': 0,
            'retry_delay': timedelta(minutes=1),
            'conn_id': 'sgk_gp_tau_pvr'
        },
        params={
            'tab_type':'',
        }
) as dag:

    @task(task_id='task_0', multiple_outputs=True)
    def get_type(**context):
        params = context.get('params', {})
        tab_type = params.get('tab_type')
        # tab_type is a plain string inside the task, so it can be used as a
        # key; multiple_outputs=True pushes each entry of the returned dict
        # as its own XCom for the downstream lookups below
        return data_dict[tab_type]

    tab_type = get_type()

    task_1 = SQLExecuteQueryOperator(
        task_id='task_1',
        sql=f"select * from {tab_type['tab_1']}" # <- Changed from data_dict[tab_type]
    )
    
    task_2 = SQLExecuteQueryOperator(
        task_id='task_2',
        sql=f"select * from {tab_type['tab_2']}" # <- Same
    )

    tab_type >> task_1 >> task_2
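
Either way, you pick the table set per run by passing the param when triggering the DAG (e.g. with a run config of {"tab_type": "arch"}). For a quick local check, a sketch assuming Airflow 2.5+, where DAG.test() accepts run_conf:

if __name__ == '__main__':
    # Runs the whole DAG in-process with a concrete param value,
    # without needing a scheduler or a triggered DagRun:
    dag.test(run_conf={'tab_type': 'prod'})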
