138 questions
1
vote
0
answers
40
views
Airflow ExternalTaskSensor waits forever even though the task has finished
Airflow ExternalTaskSensor waits forever after TriggerDagRunOperator
I just started learning Airflow and I have a problem with ExternalTaskSensor.
I have 2 DAGs:
A trigger_dag that waits for a file, ...
0
votes
0
answers
125
views
Airflow DAG Import errors even after clearing import_error table
I have these DAG Import Errors on my Airflow UI. I am working on a test server so I have free hand. I have deleted all records from task_instance; dag_run; import_error; serialized_dag; and dag; ...
2
votes
1
answer
266
views
In Airflow 2.10 can I use dynamic task mapping with BranchPythonOperator?
Below is a minimal implementation of a branch operator using the taskflow api.
The dag will execute either odd_task or even_task based on the string given by branch_on_condition. odd_task or even_task ...
0
votes
0
answers
65
views
NoneType' object is not callable [Airflow Dag Error while creating dependencies]
In my use case I want to trigger another DAG from one DAG. My first DAG is as below :-
with DAG(
dag_id = "data_insertion",
start_date = datetime(2024, 10, 8),
schedule_interval = SCHEDULE,
...
0
votes
0
answers
94
views
Airflow DockerOperator, Dynamic Tasks and XCom
I'm in airflow 2.10.2. I'm using a DockerOperator to generate a list of configuration (a list of dict). My goal is to pass those configurations to a TriggerDagOperator task, expanding the list to ...
0
votes
1
answer
128
views
How to make a DAG fail when we are raising exception in our python code?
I am new to Airflow DAGs. I have a python script to fetch API calls. I have added a raise Exception in my code, if all API calls failed i.e. status_code!=200, then it should raise exception. However, ...
0
votes
0
answers
155
views
Airflow 2.10.3 how to extract information about task instances?
I use Airflow 2.10.3 and I'd like to retrieve informations about when a task starts, ends, its status (e.g. success, fail) and similar attributes.
I found a PostgreSQL database with all the list of ...
0
votes
1
answer
61
views
Pass data to jinja template in .sql file from a CustomOperator(BaseOperator)
How can we pass data by using a custom operator task ?
Here is my code. The sql file does not take the value of the key table_name. The rendered template suposed to be
SELECT COUNT(*) FROM product;
...
0
votes
1
answer
83
views
Airflow @task decorator not having the correct order
I'm trying the new TaskFlow API
I have 4 @task then I would love to have at the end t1 >> t2 >> t3 >>> t4.
I endup doing :
t1 = task1()
t2 = task2(t1)
t3 = task3(t2)
t4 = task4(t2)...
0
votes
0
answers
57
views
How do Airflow sftpsensor push xcom value
Airflow v. 2.8.2
I need check several files on sftp at some day_time. If all files exist-all good go next task. But if can't find some files, i want send email with text ('Files: file_name1, ...
0
votes
1
answer
193
views
[Airflow]: Dynamic Task Mapping on DockerOperator using Xcoms
I am creating a dag that should do the following:
fetch event ids
for each event id, fetch event details ( DockerOperator )
The code below is my attempt to do what I want:
from datetime import ...
1
vote
3
answers
394
views
Dynamic Task Mapping for http operator and query string
Let's say that I have the following (simplified) dag: I have a task that returns a series of query parameters values, and I want to spawn a dynamic task instance of httpoperator to do a query like ...
1
vote
0
answers
307
views
Airflow dbt workflow run failed in docker
I'm totally new to apache airflow and dbt. I'm running airflow on docker and when I tried to run the dbt demo workflow, I get this error. I've managed to run the dbt model on my local. Any idea what ...
0
votes
1
answer
278
views
Airflow TriggerDagRunOperator randomly failing with parallel run
I have a DAG that is running another DAGs with TriggerDagRunOperator
trigger_1 = TriggerDagRunOperator(
task_id='trigger_dag1',
trigger_dag_id='job1',
wait_for_completion=True,
...
2
votes
0
answers
32
views
Is there a way to control maximum dag concurrency from UI param?
In airflow is it possible to control the concurrency of the DAG with an UI param?
As far as I know DAG run parameters are only know as task runtime, so it does not look like a good approach.
...
0
votes
0
answers
301
views
How to Handle Skipping Subsequent Tasks for Specific Indices in Dynamic Task Mapping in Airflow Task Groups
I’ve recently started working with Airflow and I’m using Airflow 2.9.1 with the Taskflow API. I’ve created a task group with dynamic task mapping and I’m using task decorators exclusively (no ...
0
votes
1
answer
418
views
Airflow XCom not retrieving values between tasks in DAG
I'm experiencing an issue with Apache Airflow where values pushed to XCom in one task are not retrievable in a subsequent task within the same DAG. Here is a minimal example of my code:
from airflow ...
0
votes
1
answer
522
views
Airflow - Dynamic mapped Task Group - Removing mapped task dependencies for all the sub task, and access mapped_input in task group directly
I am working with Airflow and have created a DAG that uses dynamic task mapping inside a task group.
I have two questions:
How to remove the line from get_files to process_file_step2?
I want ...
0
votes
1
answer
89
views
In Airflow how to avoid a DAG to run a child DAG which is disabled in the Airflow UI?
I was told to disable the dbt DAG in Airflow, which I did, but the dbt DAG in my case is called by a parent "main" DAG, which calls an "extract" DAG before calling the dbt one.
I ...
-1
votes
1
answer
76
views
the dag is getting triggered immediately when toggle is ON
This is my dag:
dag_pr_api = DAG(
dag_id='API-PERSONAL-COMPUTER',
default_args=args,
schedule_interval='30 07 * * *',
start_date=pendulum.yesterday('Europe/Berlin'),
catchup=False,
...
0
votes
1
answer
196
views
Can I use airflow tags to determine if all DAGs with a certain tag has finished running?
I have a number of individual DAGs that are not connected to each other. The one thing that 'connects' them are that they are tagged with US_TAG.
I am trying to run a new task after all of these DAGs (...
0
votes
1
answer
344
views
How can I create DatabricksSubmitRunOperator tasks dynamically in Airflow?
I have following use case:
I am trying to create an Airflow DAG which will be used to automate historical data load. I am passing 5 arguments to this DAG - jar_path, main_class_name, start_param, ...
1
vote
0
answers
144
views
How do I pass xcom to 3rd party operator when using TaskFlow?
UPDATE. I realise I haven't explained the problem concisely enough so I've posted a revised question at How do I pass xcom to traditional operator from task created using @task decorator?
I am using ...
1
vote
1
answer
269
views
"ValueError: non-default argument follows default argument" when upgrading from Airflow 2.2.2 to 2.8.1
Have been doing an upgrade from Airflow 2.2.2 to 2.8.1. All of my DAGs parse after doing relatively minor modifications, except for one.
Using the TaskFlow API, I have a task with this structure:
@...
0
votes
1
answer
754
views
Airflow : Complete all tasks in a TaskGroup before running to the next one and avoid dependancies between TaskGroup
I would like to set up a DAG where all tasks in a single TaskGroup are done before running to the next one.
Meaning that in the example (cf screenshot), the Workflow_FRA has to be done with the tasks ...