1

I am used to working with Argo, which can execute process steps in parallel based on inputs from previous steps (e.g. see https://dzone.com/articles/parallel-workflows-on-kubernetes).

Let's use a simple example: a first step queries an API and receives a list of matching entries. The second step downloads and processes those entries. It can be parallelized because the entries are independent of each other. A third step stores the results somewhere and depends on all executions of step 2 (e.g. "item X successfully processed"). Classic divide & conquer; see the illustration below.

[Illustration of the workflow described above]

I could not find documentation on how to do this in Airflow (2.0+). Is it possible, maybe with XComs?

Edit: One possible use case would be satellite data processing. Consider e.g. the Open Access Hub of Copernicus, which provides an API to search for satellite scenes. I want to process e.g. Sentinel-1 scenes (radar data) in parallel for a given query. All scenes that match the query are relevant but do not depend on each other. The computation is CPU-heavy and possibly GPU-intensive, so spreading it across workers makes sense in terms of overall performance.

1
  • You can't create tasks based on output from previous steps. Tasks can share metadata via XCom. This means that n needs to be known in advance. The reason is that Airflow expects workflows to be as static as possible / slowly changing. In Airflow, DAGs can be backfilled, so if the DAG has a different structure on every run because n keeps changing, you cannot backfill correctly. Commented Jun 21, 2021 at 11:31

2 Answers

3

For Airflow >= 2.3.0:

A feature has been added to support Dynamic Task Mapping. Dynamic Task Mapping allows a way for a workflow to create a number of tasks at runtime based upon current data, rather than the DAG author having to know in advance how many tasks would be needed.

There are several mapping options: simple, repeated, and multiple parameters.
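
As a minimal sketch of the simple mapping case, the three steps from the question might look like this on Airflow 2.3+. The step functions, the scene IDs and the DAG name are hypothetical placeholders, not part of any real API. The Airflow import sits inside `build_dag` only so that the plain step functions can be run and tested without Airflow installed.

```python
from datetime import datetime
from typing import Iterable, List


def search_scenes() -> List[str]:
    """Step 1: stand-in for the API query; returns made-up scene IDs."""
    return ["scene-a", "scene-b", "scene-c"]


def process_scene(scene_id: str) -> str:
    """Step 2: stand-in for the heavy per-scene processing."""
    return f"{scene_id} successfully processed"


def store_results(messages: Iterable[str]) -> int:
    """Step 3: receives the results of all mapped step-2 tasks."""
    stored = list(messages)
    for message in stored:
        print(message)
    return len(stored)


def build_dag():
    """Wire the steps together with Dynamic Task Mapping (Airflow >= 2.3)."""
    from airflow.decorators import dag, task

    # Airflow 2.4+ prefers `schedule=None` over `schedule_interval=None`.
    @dag(schedule_interval=None, start_date=datetime(2022, 1, 1), catchup=False)
    def process_scenes():
        scene_ids = task(search_scenes)()
        # expand() creates one task instance per element of scene_ids at run time.
        messages = task(process_scene).expand(scene_id=scene_ids)
        # Passing the mapped output downstream collects all step-2 results.
        task(store_results)(messages)

    return process_scenes()


# In a DAG file, assign the result at module level so Airflow discovers it:
# scenes_dag = build_dag()
```

The number of `process_scene` task instances is decided when `search_scenes` finishes, so the DAG file itself stays static and quick to parse.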

For Airflow < 2.3.0:

Airflow does not support creating tasks dynamically based on the output of previous steps (at run time). DAGs in Airflow are dynamic only with respect to values that are known when the DAG file is parsed. The docs state:

Workflows are expected to be mostly static or slowly changing. You can think of the structure of the tasks in your workflow as slightly more dynamic than a database structure would be. Airflow workflows are expected to look similar from a run to the next, this allows for clarity around unit of work and continuity.

That said, tasks can share metadata (file paths, file names, etc.).

You didn't explain much about your use case, so it's hard to give recommendations. I can suggest the following: in ETLs it's easier to divide data by dates, so each branch can be a month or a week. For example, think of n in terms of weeks in a year: you then have 52 branches (n=52) and assign each file to a branch based on its creation_date (converted into week of year). If a branch has no files to process in a specific run, it is simply skipped. There are many possible solutions in that area.
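
The week-based split could be sketched as follows. The file names and dates are invented for illustration. The sketch uses ISO week numbers, which run from 1 to 53 rather than exactly 52; that is an assumption of this sketch, not something Airflow prescribes.

```python
from collections import defaultdict
from datetime import date
from typing import Dict, List, Tuple


def week_of_year(creation_date: date) -> int:
    """Map a date to its ISO week number (1..53)."""
    return creation_date.isocalendar()[1]


def group_by_week(files: List[Tuple[str, date]]) -> Dict[int, List[str]]:
    """Assign each (name, creation_date) pair to its weekly branch."""
    branches: Dict[int, List[str]] = defaultdict(list)
    for name, creation_date in files:
        branches[week_of_year(creation_date)].append(name)
    return dict(branches)


# Hypothetical input: (file name, creation date) pairs.
files = [
    ("a.csv", date(2021, 6, 21)),
    ("b.csv", date(2021, 6, 22)),
    ("c.csv", date(2021, 1, 5)),
]
branches = group_by_week(files)
# A week number that is missing from `branches` has no files in this run.
```

In the DAG file, one task per week number would be defined in a static loop, and a task whose week has no files could raise `AirflowSkipException` so that it shows up as skipped.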

5 Comments

Thanks Elad for these insights. I have added a use case to the question. Is it possible to spawn a fixed (pre-defined) number of workers for the same operator task? The logic for dividing the input data could be based on a counter (the "n" in the illustration) inside the task.
It's related to the number of workers. If you are talking about auto scaling of workers, that's a whole different domain. In Airflow you can parallelize as you want, just not based on previous task output.
Great, thanks. One last question: is it possible to tell a worker that it is #2 of 5 total workers? They would all get the same input and could then decide, based on their position, which part of the input array to work on.
I think you are confusing this a bit with Argo (where I assume you can specify to run on a specific worker). In Airflow you have the queue parameter of BaseOperator, where you can tell a task on which worker to run, but again this is a bit out of context. In the task itself you code the logic, so many tasks can look at the same input (for example XCom, file, table, etc.) but each handles only a specific portion of it according to what you set.
Thanks, I will do some experiments. My naming of concepts ("workers", ..) is most likely not matching Airflow, sorry for that :)
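
The position-based split discussed in these comments can be sketched in plain Python. The function name and the `index`/`total` parameters are hypothetical; in a DAG they would be fixed when the tasks are defined, for example by creating `total` tasks in a loop.

```python
from typing import List, Sequence


def my_share(items: Sequence[str], index: int, total: int) -> List[str]:
    """Return the part of `items` that task number `index` (0-based) of `total` handles."""
    if not 0 <= index < total:
        raise ValueError("index must satisfy 0 <= index < total")
    # Striding keeps the shares balanced even when len(items) is not a multiple of total.
    return list(items[index::total])


# Hypothetical input that every task reads in full (e.g. from XCom or a file).
scenes = ["s1", "s2", "s3", "s4", "s5", "s6", "s7"]
shares = [my_share(scenes, i, 3) for i in range(3)]
```

Every task sees the same input, and the shares never overlap, so no coordination between the tasks is needed.
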
1

As mentioned in the official Airflow tutorial, the DAG definition "needs to evaluate quickly (seconds, not minutes) since the scheduler will execute it periodically to reflect the changes if any". So it's not recommended to query an API to create a DAG "on the fly".

That being said, you can use Variables to dynamically define a DAG, or even a file local to the workers (which is more complicated if you have more than one worker). Neither is "good practice", but you can still use them. For the Variables option, you could have two DAGs: one that queries the API and sets the configuration variable, and another that reads it, loops through it and creates the tasks. That's also not really recommended, since you create a connection to the metadata DB every time you call Variable.get("variable_name").

As @Elad mentioned, it's not possible via XComs, not even in Airflow 2.1 (I tried some solutions, but they didn't work).

1 Comment

Using Variables to dynamically define a DAG is bad practice because it means having Variable.get() as top-level code. This results in opening a connection to the database every 30 seconds (the default of min_file_process_interval).
