
Let's say I have an Airflow (2.3) DAG that looks like this:

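from airflow.decorators import dag, task
from airflow.models import Variable
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

@task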
def retrieve_ingest_setup_queries():
    settings = Variable.get("athena_settings", deserialize_json=True)
    # settings = {'drops': ["DROP TABLE my_table", "DROP TABLE my_table2"],
    #             'creates': ["CREATE TABLE ...", ...]}
    return settings

@dag(
    dag_id='athena_something',
    default_args=default_args,
    schedule_interval=None,
    render_template_as_native_obj=True,
)
def somedag():
    ingest_setup = retrieve_ingest_setup_queries()

    ingest_db_setup_drops = AthenaOperator.partial(
        task_id='db_drops',
        database="{{ var.json.athena.database }}",
        output_location="{{ var.json.athena.output_location }}",
        aws_conn_id='aws_athena'
    ).expand(query=ingest_setup??????)

    ingest_db_setup_creates = AthenaOperator.partial(
        task_id='db_creates',
        database="{{ var.json.athena.database }}",
        output_location="{{ var.json.athena.output_location }}",
        aws_conn_id='aws_athena'
    ).expand(query=ingest_setup??????)

I am looking for a way to pass ingest_setup['drops'] as "query" in the expand call of my first operator, and ingest_setup['creates'] in the second.

I could use two different retrieval functions, but I'd like to use only one, and I want to use TaskFlow if at all possible. expand doesn't support templating, so I don't see how (or whether) this can be done (see ?????? in the code).

  • I think you're missing the function definition that your @dag should be decorating, i.e. @dag(...) def example_dag(): Commented Sep 29, 2022 at 20:09
  • Fixed it. Technically I should call it at the end too, but that's not really relevant to what I'm trying to do. Commented Sep 30, 2022 at 0:14

1 Answer


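I needed to pass multiple_outputs=True to the task decorator, i.e. @task(multiple_outputs=True) on retrieve_ingest_setup_queries.
Then ingest_setup['drops'] and ingest_setup['creates'] work as intended when passed to expand(query=...).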
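To illustrate why the flag matters, here is a toy model in plain Python. It is not Airflow code: `push_result` and the dictionary stores are hypothetical stand-ins for XCom storage. It assumes that, with multiple_outputs=True, each key of the returned dictionary is stored as its own XCom entry alongside the full return value, and that indexing a task's result looks up an entry by that key.

```python
from typing import Any, Dict


def push_result(store: Dict[str, Any], result: Any, multiple_outputs: bool) -> None:
    """Hypothetical stand-in for how a task's return value is stored."""
    # The whole return value is always stored under "return_value".
    store["return_value"] = result
    # With multiple_outputs, each dict key also becomes its own entry.
    if multiple_outputs and isinstance(result, dict):
        for key, value in result.items():
            store[key] = value


settings = {
    "drops": ["DROP TABLE my_table", "DROP TABLE my_table2"],
    "creates": ["CREATE TABLE ..."],
}

single_store: Dict[str, Any] = {}
push_result(single_store, settings, multiple_outputs=False)

split_store: Dict[str, Any] = {}
push_result(split_store, settings, multiple_outputs=True)

# Looking up by key only finds something when the dict was split.
print(single_store.get("drops"))
print(split_store.get("drops"))
```

In this model, the lookup by "drops" finds nothing unless the result was split, which matches the behaviour I saw before adding the flag.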

This only works with task decorators, though. Accessing a key of a dictionary that is an operator's result (an XComArg) is far from intuitive. It is discussed here.

Based on that discussion, I created the following class for operator results:

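from airflow.models import BaseOperator


class XcomDict:
    def __init__(self, operator: BaseOperator):
        # Keep only the inner expression of the "{{ ... }}" template string.
        self.operator_output = str(operator.output).strip("{ }")

    def __getitem__(self, item: str) -> str:
        return f"{{{{ {self.operator_output}['{item}'] }}}}"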

Of course, it assumes the operator's return value is a dictionary. I use it this way:

    job = CreateJobOperator(
        task_id='create_job', ...)

    wait = WaitForJobOperator(
        task_id='wait_for_job_to_complete',
        job_id=XcomDict(job)['JobId'], ...)
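To see what string the class actually produces, here is a small sketch that runs without Airflow. `FakeOperator` and `FakeOutput` are hypothetical stand-ins; the sketch assumes that `str(operator.output)` renders as a `{{ task_instance.xcom_pull(...) }}` template, and the `example_dag` id is made up for illustration.

```python
class FakeOutput:
    """Hypothetical stand-in for an operator's .output (an XComArg)."""

    def __init__(self, task_id: str, dag_id: str):
        self.task_id = task_id
        self.dag_id = dag_id

    def __str__(self) -> str:
        # Assumed string form of an XComArg; not taken from a real run.
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', dag_id='{self.dag_id}', "
            "key='return_value') }}"
        )


class FakeOperator:
    """Hypothetical stand-in for an operator; only exposes .output."""

    def __init__(self, task_id: str, dag_id: str = "example_dag"):
        self.output = FakeOutput(task_id, dag_id)


class XcomDict:
    def __init__(self, operator):
        # Drop the surrounding braces and spaces, keep the inner expression.
        self.operator_output = str(operator.output).strip("{ }")

    def __getitem__(self, item: str) -> str:
        # Re-wrap the expression with the dictionary lookup appended.
        return f"{{{{ {self.operator_output}['{item}'] }}}}"


template = XcomDict(FakeOperator("create_job"))["JobId"]
print(template)
```

The result is an ordinary Jinja template string with the key lookup appended inside the braces, so it only helps for operator arguments that are templated fields.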