1

Let's say that I have the following (simplified) dag: I have a task that returns a series of query parameters values, and I want to spawn a dynamic task instance of httpoperator to do a query like http://example.com?key=value1, http://example.com?key=value2, etc. But I'm not able to map the data field in the operator

@task
def get_values():
  return ["value1","value2","value3"]

@dag
def mydag() -> None:
    values = get_values()
    gets = HttpOperator.partial(task_id='gets', method='GET').expand(
        data={ 'key': values }, # expanding in the ui, but "ValueError: too many values to unpack (expected 2)" at runtime
        data=[{'key': value} for value in values], # dag error: "TypeError: 'XComArg' object is not iterable"
    )

I'm not sure how to set up the parameter to actually use the values object properly.

worse, in the real dag data normally would refer also to additional qp and some of them comes from other tasks (with a static mapping)

3 Answers 3

0

I think the problem is that it is trying to expand on 'key' and on the array values, so on two parameters, instead of just one array. I don't have much experience with the HttpOperator, but perhaps the key can be passed in the partial part, and then you can simply do data=values. Another alternative is to use requests.get, like this:

@task
def send_get_request(data):
    response = requests.get(f"https://www.something.com?{data}")
    return response.json()

and then you can simply do:

.expand(data=values)

Hope that helps.

Sign up to request clarification or add additional context in comments.

Comments

0

FWIW I got a way to expand the query parameter to multiple values. To do it I relies on Operator.expand_kwargs and XComArg.map:

@task
def get_values():
  return ["value1","value2","value3"]

@dag
def mydag() -> None:
    values = get_values()
    gets = HttpOperator.partial(task_id='gets', method='GET').expand_kwargs(
        values.map(lambda value: { 'data': {'key': value} }
    )

Comments

0

this is the correct way to do it --> credit to : Vito De Tullio (thank bro you saved me)

@task
def get_values():
    return ["value1","value2","value3"]

@dag
def mydag() -> None:
    values = get_values()
    gets = HttpOperator.partial(task_id='gets', method='GET').expand_kwargs(
        values.map(lambda value: { 'data': {'key': value} }

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.