49

I want the tasks in the DAG to all finish before the 1st task of the next run gets executed.

I have max_active_runs = 1, but this still happens.

from datetime import datetime, timedelta

default_args = {
    'depends_on_past': True,
    'wait_for_downstream': True,
    'max_active_runs': 1,
    'start_date': datetime(2018, 3, 4),
    'owner': 't.n',
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=4)
}

dag = DAG('example', default_args=default_args, schedule_interval=schedule_interval)

(All of my tasks depend on the previous task. Airflow version is 1.8.0.)

Thank you

4 Answers

74

You've put 'max_active_runs': 1 into the default_args dictionary, which is not the correct spot.

max_active_runs is a constructor argument for a DAG and should not be put into the default_args dictionary.

Here is an example DAG that shows where you need to move it to:

from datetime import datetime, timedelta
from airflow import DAG

dag_args = {
    'owner': 'Owner',
    # 'max_active_runs': 1, # <--- Here is where you had it.
    'depends_on_past': False,                      # |
    'start_date': datetime(2018, 1, 1, 12, 0),     # |
    'email_on_failure': False                      # |
}                                                  # |
                                                   # |
sched = timedelta(hours=1)                         # |
dag = DAG(                                         # |
    job_id,                                        # |
    default_args=dag_args,                         # |
    schedule_interval=sched,                       # V
    max_active_runs=1  # <---- Here is where it is supposed to be
)

If the tasks your DAG is running are actually SubDAGs, then you may need to pass max_active_runs into the SubDAGs too, but I'm not 100% sure on this.


2 Comments

I passed max_active_runs as a default argument instead. Wonder if it's the problem. Will try passing it in the DAG function and see if it works.
If you want to set this for all your DAGs, you can set max_active_runs_per_dag in your Airflow config.
63

I changed my code to pass max_active_runs as an argument of DAG() instead of putting it in default_args, and it worked.

Thanks SimonD for giving me the idea, even though your answer didn't point to it directly.

3 Comments

This working in which version? I'm using 1.10.14. It is still running all the dag runs simultaneously.
This was answered for 1.8.0
What happens to the next execution date if max_active_runs=1 and the run takes longer than the schedule interval?
-3

Actually you should use DAG_CONCURRENCY=1 as an environment variable. It worked for me.
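For reference, Airflow reads configuration overrides from environment variables named with the AIRFLOW__{SECTION}__{KEY} convention, so the bare name DAG_CONCURRENCY won't be picked up on its own. A hedged sketch (note that dag_concurrency limits concurrent task instances per DAG, while max_active_runs_per_dag is the option that limits concurrent DagRuns):

```shell
# Assumption: overriding airflow.cfg [core] options via environment variables,
# using Airflow's AIRFLOW__{SECTION}__{KEY} naming convention.
export AIRFLOW__CORE__DAG_CONCURRENCY=1          # max concurrent task instances per DAG
export AIRFLOW__CORE__MAX_ACTIVE_RUNS_PER_DAG=1  # max concurrent DagRuns per DAG
```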

Comments

-5

You can use XComs to do it. First add two PythonOperators, 'start' and 'end', to the DAG. Set the flow as:

start ---> ALL TASKS ----> end

'end' will always push a variable

last_success = context['execution_date']

to XCom (xcom_push). (This requires provide_context=True in the PythonOperators.)

And 'start' will always check XCom (xcom_pull) to see whether there exists a last_success variable with a value equal to the previous DagRun's execution_date, or to the DAG's start_date (to let the process start).

Followed this answer
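A minimal sketch of the gating logic described above, using plain functions (the in-memory xcom_store dict stands in for Airflow's XCom table, and the prev_execution_date is None check stands in for the start_date comparison; in a real DAG these would be the python_callables of the 'start' and 'end' PythonOperators with provide_context=True):

```python
from datetime import datetime

# Illustrative in-memory stand-in for Airflow's XCom table.
xcom_store = {}

def end_callable(**context):
    # 'end' runs last, so it records this run's execution_date
    # to mark the whole DagRun as finished.
    xcom_store['last_success'] = context['execution_date']

def start_callable(**context):
    # 'start' only proceeds if the previous DagRun finished
    # (its 'end' task pushed last_success), or if this is the
    # very first run (no previous execution_date).
    last_success = xcom_store.get('last_success')
    prev_date = context['prev_execution_date']
    if prev_date is None or last_success == prev_date:
        return True  # previous DagRun completed; safe to start
    raise RuntimeError('Previous DagRun has not finished; refusing to start')
```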

1 Comment

Seems like a promising workaround. Will be trying this.
