
I am a newbie and want to create a pipeline in which the function given to a PythonOperator pushes variables via XCom, and a BashOperator pulls them. The BashOperator then runs a whole Python script that needs these variables. Could someone help me create this kind of pipeline?

I am attaching the broken code below. In this code, app.py is the Python script that takes the count_check and recon_check values. Also, can a single bash command pull multiple variables?

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator

from datetime import datetime



args= {'owner': 'airflow', 'start_date': datetime(2022, 1, 1) }


def take_args(ti):
    count_check = 'Y'
    recon_check = 'Y'
    ti.xcom_push(key='count', value=count_check)
    ti.xcom_push(key='recon', value=recon_check)



with DAG(dag_id='data-validation-dag-python',
         default_args=args,
         schedule_interval='@daily',
         max_active_runs=1,
         catchup=False) as dag:

    task_1 = PythonOperator(
        task_id='Storing_Args',
        python_callable= take_args
    )

    task_2 = BashOperator(
            task_id='task_validation_checks',
            bash_command='cd /root/airflow/dags && python3 app.py {{ ti.xcom_pull(task_ids[\"Storing_Args\"]) }}',
            do_xcom_push=False

    )
    task_1 >> task_2

1 Answer

task_ids is a parameter of the xcom_pull method. The correct syntax is:

task_2 = BashOperator(
    task_id='task_validation_checks',
    bash_command='cd /root/airflow/dags && python3 app.py {{ ti.xcom_pull(task_ids="Storing_Args") }}',
    do_xcom_push=False
)

Writing task_ids[\"Storing_Args\"] tells Jinja to look up a key named "Storing_Args" in a dictionary called task_ids, which doesn't exist in the template context and will raise jinja2.exceptions.UndefinedError: 'task_ids' is undefined.

However, this will still not work in your case, since you pushed two XComs with the keys count and recon. When no key is given, Airflow stores an XCom under the default key "return_value", and ti.xcom_pull() without a key looks up that default key, which doesn't exist here. Therefore, ti.xcom_pull() must be called twice, once for each key:

task_2 = BashOperator(
    task_id="task_validation_checks",
    bash_command="cd /root/airflow/dags && python3 app.py {{ ti.xcom_pull(task_ids='Storing_Args', key='count') }} {{ ti.xcom_pull(task_ids='Storing_Args', key='recon') }}",
    do_xcom_push=False,
)

8 Comments

@Bas Harenslak thanks for the correction, but the Python script app.py is still not able to pick up the values of count_check and recon_check. Could you please tell me where I am going wrong?
You'll need to give more details to answer that question. For example: what is app.py, what did you try, and what is the error you're getting? Check the Airflow task logs to figure out what is being executed.
In app.py, I run a loop that reads the input (CSV) file, takes the value in each column (count_check and recon_check are columns), and performs the checks corresponding to those values (such as Y/N) on a table in a database. This works fine when I run app.py in Airflow with only a BashOperator. But now I don't want to take the count_check and recon_check values from the CSV file; I want to supply them before running the DAG, as I'm trying here, and have app.py pick them up. (1/2)
When I check the log file, it says there is no variable named count_check or recon_check, since the script isn't receiving the values. Is pushing and pulling values via XCom the right approach? (2/2)
I feel there's some missing code. In the code you've shared, count_check and recon_check are defined. Please post the relevant code and a stack trace; this helps others reproduce and understand the problem. If you want to set those values externally (not in code, but when triggering a DAG), you might be interested in dag_run.conf: airflow.apache.org/docs/apache-airflow/stable/….
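Following the dag_run.conf suggestion in the comment above, one way app.py could accept the checks externally while keeping the CSV as a fallback is optional flags. This is a hedged sketch; the flag names and Y/N choices are assumptions based on the question:

```python
import argparse

def build_parser():
    # Hypothetical flags: when omitted, app.py can keep reading the
    # checks from the input CSV as it does today.
    parser = argparse.ArgumentParser(description="Run data validation checks")
    parser.add_argument("--count-check", choices=["Y", "N"], default=None)
    parser.add_argument("--recon-check", choices=["Y", "N"], default=None)
    return parser

# Values supplied on the command line override the CSV defaults:
args = build_parser().parse_args(["--count-check", "Y"])
print(args.count_check, args.recon_check)
```

The BashOperator command could then render the flags from the trigger config, e.g. `python3 app.py --count-check {{ dag_run.conf["count_check"] }}` (the conf key name here is also an assumption).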
