
Under the SequentialExecutor, I have a DAG file in which I define three tasks that need to run sequentially (t1-->t2-->t3):

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 6, 14, 23, 20),
    'email_on_failure': False,
    'email_on_retry': False,
    }

dag = DAG('test_dag', default_args=default_args, schedule_interval="*/5 * * * *")

t1 = BashOperator(
    task_id='form_dataset',
    bash_command='python 1.py',
    dag=dag)

t2 = BashOperator(
    task_id='form_features',
    bash_command='python 2.py',
    depends_on_past=True,
    dag=dag)

t3 = BashOperator(
    task_id='train',
    bash_command='python 3.py',
    depends_on_past=True,
    dag=dag)

t2.set_upstream(t1)
t3.set_upstream(t2)

I assumed the sequential behavior t1-->t2-->t3 would be the default, though that's not the case in my situation (the order is pretty much random, e.g. t1-->t2-->t2-->t1-->t3). What argument am I missing that would correct the behavior?

1 Answer


You need to add the statement

t1 >> t2 >> t3

at the end of the file. More details are in the Airflow docs: https://airflow.incubator.apache.org/concepts.html#bitshift-composition

For completeness, you can also achieve the same thing with the set_upstream() and set_downstream() task methods, as in the sketch below.
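For illustration, a minimal sketch of the equivalent forms, using the three tasks from the question:

# Bitshift composition (Airflow 1.8+):
t1 >> t2 >> t3

# The same chain with explicit methods:
t2.set_upstream(t1)   # t1 must finish before t2 starts
t3.set_upstream(t2)

# Or, reading downstream:
t1.set_downstream(t2)
t2.set_downstream(t3)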


4 Comments

Thanks for your reply, I actually have it (sorry I didn't put it in the code). I guess the problem is that the total running time of all three tasks is greater than the schedule interval. I would like to prioritize finishing all tasks within the current DAG run over starting a new run, but I haven't found a related attribute so far.
Does the execution of every DAG run depend on the successful completion of the previous run? In that case, look into the behavior of the depends_on_past parameter. Another option is to assign each task to a pool and limit the size of that pool to 1 (see the first sketch after these comments). Or maybe I am misunderstanding your use case? In general, DAG tasks should be as independent of other runs as possible.
In airflow.cfg there is a setting max_active_runs_per_dag. Setting this to 1 should also prevent the same DAG from starting again before the previous run finishes (see the second sketch below).
Thank you very much - 'max_active_runs_per_dag' is just what I needed! Good luck with your study at ETH (graduated there a short time ago)
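A minimal sketch of the pool idea from the second comment, assuming a one-slot pool named single_slot (a hypothetical name) has already been created via the Airflow UI (Admin -> Pools) or CLI:

# Tasks sharing a one-slot pool can never run concurrently,
# which serializes them across DAG runs as well.
t1 = BashOperator(
    task_id='form_dataset',
    bash_command='python 1.py',
    pool='single_slot',  # hypothetical pool, created beforehand with 1 slot
    dag=dag)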
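And a sketch of the cap discussed in the last two comments; besides the global airflow.cfg setting, the same limit can be applied per DAG via the max_active_runs argument (present in Airflow 1.8-era releases):

# Global setting in airflow.cfg (assuming the [core] section in 2017-era Airflow):
#   max_active_runs_per_dag = 1
# Per-DAG equivalent: at most one active run of this DAG at a time.
dag = DAG(
    'test_dag',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
    max_active_runs=1)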
