4

I am trying to do group tasks and wait until all the group subtasks finished then run the last task. But when I call task it calls group and last tasks but the last task finished before group finish. Is it possible to wait until all the tasks inside group finish?

@shared_task(name="print")
def print_order():
    print("PRINT #1")
    mylist = [(1, 2), (4, 6), (1, 4)]
    group([(add.s(*i) | order_id_print.s()) for i in mylist]).delay()


@shared_task(name="print.add")
def add(x,y):
    print("ADD #2")
    chain(add_task1.s(x, y, 'task id') | add_task2.si(x, y, "task_id")).delay()
    return x+y

@shared_task(name="add_task_1")
def add_task1(order_id, ftype, task_id):
    print("ADD task #2-1")
    print("add tasks task1 order_id {} {} {}".format(order_id, ftype, task_id))

@shared_task(name="add_task_2")
def add_task2(order_id, ftype, task_id):
    print("ADD task #2-2")
    print("add tasks task2 order_id {} {} {}".format(order_id, ftype, task_id))


@shared_task(name="print.order_id_print")
def order_id_print(id):
    print("ORDER #3")
    print("order id is {}".format(id))
1
  • 1
    Not so long ago I was asking the same question on the Celery IRC channel at FreeNode. They explained to me that the best thing to do is to make a chain out of your group and add a task that collects the data (or processes the results of a group). Commented Apr 11, 2019 at 7:59

2 Answers 2

3

What you probably want, is a chord instead of a group. A chord is a task that only executes after all of the tasks in a group have finished executing.

Have a look at the docs:

https://docs.celeryproject.org/en/latest/userguide/canvas.html#chords

Sign up to request clarification or add additional context in comments.

7 Comments

So in my case, if i use chord, after all the add task finished then order_id_print will invoke?
correct. something like chord([add.s(*i) for i in mylist], order_id_print.s()).delay() will asynchronously run all the add tasks. When all add tasks are finished, it will asynchronously invoke your order_id_print task. Beware that Celery will pass all individual add results to order_id_print as a list so you have to add some tasks arguments.
I did exactly as it is, but in celery, it shows ADD task #2-2 right after order_id_print. Got the celery results like this [2019-04-11 17:14:45,475: WARNING/ForkPoolWorker-3] ORDER #3 [2019-04-11 17:14:45,476: WARNING/ForkPoolWorker-3] order id is [5, 3, 10] [2019-04-11 17:14:45,476: INFO/ForkPoolWorker-3] Task print.order_id_print[77b50397-a2bb-4141-8380-e86aa1c04642] succeeded in 0.0006398810000973754s: None [2019-04-11 17:14:45,476: INFO/MainProcess] Received task: add_task_2[30fbbf70-cae8-482c-81fd-fdc17a816f48]
that is because you asynchronously trigger another task (chain) within your add task. If this is not what you want, you need to rearrange your workflows.
Oh, i get it. So instead of calling add i have to call add_task1 and add_task2 in chord right? so that way there wont be any third async process ??? but it needed to be inside for loop so inside chord there should be group ? like chord([(add_task1(*i), add_task2(*i)) for i in my_list], order_id_print.s()).delay()
|
0

For this there is 2 options: ''group'' and "chord"(Chord does not work with rpc) . Both are celery constructs that allow you to run multiple tasks in parallel and then wait until all tasks have completed.

with chord it is possible to define a ''callback'' task and do the whole process asynchronously. If your server needs to wait for tasks before return to the client then group may be the best option

Group:

tasks = [get_foods.s(x,y), get_drinks.s(z,w)]
results = group(tasks)().get() #lock until everything done
for result in results:
    print(result)

Chord:

chord([get_foods.s(x,y), get_drinks.s(x,y)], finish_order.s()).delay()

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.