В сельдерее как обновить состояние основной задачи, пока не будут выполнены все его подзадачи?

В Celery я выполняю основную задачу, которая запускает одну подзадачу для каждого элемента, который он получает из запроса. Подзадачи должны выполняться параллельно. В пользовательском интерфейсе у меня есть индикатор выполнения, который показывает, сколько подзадач сделано в общей сложности. Я обновляю основное состояние задачи, чтобы предоставить информацию в индикатор выполнения. Моя проблема заключается в том, что основная задача завершилась сразу после нажатия на все подзадачи брокеру, чтобы я больше не мог обновлять его состояние. Хотелось бы, чтобы основная задача могла подождать, пока все подзадачи не будут выполнены. Является ли это возможным? Любые другие решения? Вот мой псевдокод (реальный код не использует global ;-)).

total = 0 done = 0 @task(ignore_result=True) def copy_media(path): global total, done copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) documents = Document.objects.all() total = documents.count() copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) for document in documents: process_doc.delay(document, path, copy_media) @task(ignore_result=True) def process_doc(document, path, copy_media): global total, done # Do some stuff done += 1 copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) 

Я нашел способ использовать TaskSet . Но я не полностью удовлетворен, потому что не могу проигнорировать результат подзадач. Если я проигнорирую результат для задачи process_doc results.ready() всегда возвращает False , results.completed_count() всегда возвращает 0 и т. Д. Вот код:

 @task(ignore_result=True) def copy_media(path): copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) documents = Document.objects.all() total = documents.count() copy_media.update_state(state=STARTED, meta={'total': total, 'done': done}) job = TaskSet(tasks=[process_doc.subtask((document, path)) for document in documents]) results = job.apply_async() doc_name = '' while not results.ready(): done = results.completed_count() if done: last = done - 1 for idx in xrange(last, -1, -1): if results[idx].ready(): doc_name = results[idx].result break copy_media.update_state(state=STARTED, meta={'total': total, 'done': done, 'doc-name': doc_name}) time.sleep(0.25) @task() def process_doc(document, path): # Do some stuff return document 

Вы можете использовать кэширование с кэшем memcached для хранения количества полных задач. Существует даже cache.inrc в API кэша django для атомного приращения, чтобы убедиться, что одновременные обновления счетчика не cache.inrc .

Кроме того, выполнение основной задачи, выполняемой до завершения всех подзадач, является плохой идеей, потому что вы в течение длительного времени блокируете одного из работников сельдерея. Если сельдерей работает с одним рабочим процессом, это приведет к бесконечной блокировке.

Я не знаю, какую версию сельдерея вы используете, но вы можете посмотреть подзадачи групп (новый в версии 3.0).