I have a DAG that runs several SQL scripts against certain tables. It can be run in one of two ways:
- On the production tables
- On frozen archived tables
I want to be able to select which tables to use based on a dag_run param value. The general structure is something like this:

from datetime import datetime, timedelta, date
from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.models import DagRun
from airflow.decorators import task

data_dict = {
    'prod': {'tab_1': 'table_name_1', 'tab_2': 'table_name_2'},
    'arch': {'tab_1': 'table_name_1_arch', 'tab_2': 'table_name_2_arch'}
}

with DAG(
    'sgk_test_2',
    description='sgk_test_2',
    tags=["sgk_test"],
    schedule_interval=None,
    start_date=datetime(2025, 7, 1),
    default_args={
        'retries': 0,
        'retry_delay': timedelta(minutes=1),
        'conn_id': 'sgk_gp_tau_pvr'
    },
    params={
        'tab_type': '',
    }
) as dag:

    @task(task_id='task_0')
    def get_type(**context):
        params = context.get('params', {})
        tab_type = params.get('tab_type')
        return tab_type

    tab_type = get_type()

    task_1 = SQLExecuteQueryOperator(
        task_id='task_1',
        sql=f"select * from {data_dict[tab_type]['tab_1']}"
    )

    task_2 = SQLExecuteQueryOperator(
        task_id='task_2',
        sql=f"select * from {data_dict[tab_type]['tab_2']}"
    )

    task_0 >> task_1 >> task_2

The code above gives the following error:
    sql=f"select * from {data_dict[tab_type]['tab_1']}"
                         ~~~~~~~~~^^^^^^^^^^
TypeError: unhashable type: 'PlainXComArg'
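
As far as I can tell, the same failure can be reproduced without Airflow, which suggests that get_type() does not return a string while the DAG file is being parsed but a placeholder object, and that the placeholder cannot be used as a dictionary key. The snippet below is only a minimal sketch of that reading: FakeXComArg and lookup are hypothetical names I made up for illustration, and FakeXComArg is a stand-in, not Airflow's real PlainXComArg.

```python
# Hypothetical stand-in for the placeholder object; NOT Airflow's PlainXComArg.
# A Python class that defines __eq__ without __hash__ is unhashable.
class FakeXComArg:
    def __init__(self, task_id):
        self.task_id = task_id

    def __eq__(self, other):
        return isinstance(other, FakeXComArg) and self.task_id == other.task_id


data_dict = {
    'prod': {'tab_1': 'table_name_1', 'tab_2': 'table_name_2'},
    'arch': {'tab_1': 'table_name_1_arch', 'tab_2': 'table_name_2_arch'},
}


def lookup(tab_type, key):
    """Return the table name, or the error text if the lookup fails."""
    try:
        return data_dict[tab_type][key]
    except TypeError as exc:
        return f"TypeError: {exc}"


# A plain string works as a dictionary key.
print(lookup('arch', 'tab_1'))

# An unhashable placeholder object fails in the same way as in my DAG.
print(lookup(FakeXComArg('task_0'), 'tab_1'))
```

If this reading is right, the lookup in data_dict would have to happen when the task actually runs, not inside an f-string that is evaluated when the file is parsed.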
After searching Google for a couple of hours, I was unable to solve the problem. Please help.