I’m running Celery with Django and Celery Beat. Celery Beat triggers an outer task every 30 minutes, and inside that task I enqueue another task per item. Both tasks are decorated to use the same custom queue, but the inner task still lands in the default queue.
from celery import shared_task
from django.db import transaction

# Item and do_some_processing live elsewhere in myapp; shown here so the
# snippet is self-contained (actual import paths may differ)
from myapp.models import Item

@shared_task(queue="outer_queue")
def sync_all_items():
    """
    This outer task is triggered by Celery Beat every 30 minutes.
    It scans the DB for outdated items and enqueues a per-item task.
    """
    items = Item.objects.find_outdated_items()
    for item in items:
        # I expect this to enqueue on outer_queue as well
        process_item.apply_async_on_commit(args=(item.pk,))

@shared_task(queue="outer_queue")
def process_item(item_id):
    do_some_processing(item_id=item_id)
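To make my expectation concrete, here is a broker-free toy model (my mental model of the behaviour, not Celery's actual implementation; FakeTask and apply_async_on_commit_model are my own names) showing that a deferred apply_async should carry the queue option through the commit callback:

```python
# Toy model of what I assume apply_async_on_commit does: defer apply_async
# until commit while forwarding options such as queue. No Django or broker.
from functools import partial

class FakeTask:
    def __init__(self):
        self.calls = []

    def apply_async(self, args=None, kwargs=None, **options):
        self.calls.append((args, kwargs, options))

def apply_async_on_commit_model(task, on_commit, args=None, kwargs=None, **options):
    # on_commit plays the role of django.db.transaction.on_commit
    on_commit(partial(task.apply_async, args=args, kwargs=kwargs, **options))

pending = []
task = FakeTask()
apply_async_on_commit_model(task, pending.append, args=(42,), queue="outer_queue")
for callback in pending:  # simulate the transaction committing
    callback()
print(task.calls)  # → [((42,), None, {'queue': 'outer_queue'})]
```

In this model the queue option survives the deferral, which is what I expected from the real helper.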
Celery beat config:
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "sync_all_items": {
        "task": "myapp.tasks.sync_all_items",
        "schedule": crontab(minute="*/30"),
        # Beat is explicitly sending the outer task to outer_queue
        "options": {"queue": "outer_queue"},
    }
}
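One alternative I'm considering (a sketch, not something I've confirmed fixes this): routing both tasks by name via CELERY_TASK_ROUTES in settings.py, so the queue assignment no longer depends on how or where the task is enqueued:

```python
# settings.py (sketch): name-based routing instead of the decorator-level
# queue= option; applies to every call site, including deferred enqueues.
CELERY_TASK_ROUTES = {
    "myapp.tasks.sync_all_items": {"queue": "outer_queue"},
    "myapp.tasks.process_item": {"queue": "outer_queue"},
}
```

I'm not sure how this interacts precedence-wise with the decorator's queue= option, so I'd treat it as an experiment rather than a fix.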
However, when I trigger process_item manually, e.g. from a Django view, it respects the decorator and lands in the expected queue.
I’ve tried:

- Adding queue='outer_queue' to apply_async_on_commit
- Calling process_item.delay(item.pk) instead
- Using process_item.apply_async(args=[item.pk], queue='outer_queue') inside transaction.on_commit
But no matter what, the inner tasks still show up in the default queue.
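To rule out the on-commit helpers entirely, I factored the deferral out into my own helper so the queue is baked into the callback before transaction.on_commit ever sees it (enqueue_on_commit and make_enqueue_callback are my own names, not Celery API; this is a sketch, not a confirmed fix):

```python
# Call-time workaround sketch: build the deferred callable myself with the
# queue option pre-bound, then hand it to transaction.on_commit.
from functools import partial

def make_enqueue_callback(task, *, queue, args=(), kwargs=None):
    """Return a zero-arg callable that enqueues task on the given queue."""
    return partial(task.apply_async, args=args, kwargs=kwargs or {}, queue=queue)

def enqueue_on_commit(task, *, queue, args=(), kwargs=None):
    # Deferred import so this module stays importable without Django settings
    from django.db import transaction
    transaction.on_commit(
        make_enqueue_callback(task, queue=queue, args=args, kwargs=kwargs)
    )
```

Inside the loop I would then call enqueue_on_commit(process_item, queue="outer_queue", args=(item.pk,)). If even this lands in the default queue, the problem presumably sits in routing or worker configuration rather than in apply_async_on_commit.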