Как связать задачу сельдерея, которая возвращает список в группу?

Я хочу создать группу из списка, возвращаемого задачей Celery, так что для каждого элемента в наборе результатов задачи одна задача будет добавлена ​​в группу.

Вот простой пример кода для объяснения варианта использования. ??? должен быть результатом предыдущей задачи.

 @celery.task def get_list(amount): # In reality, fetch a list of items from a db return [i for i in range(amount)] @celery.task def process_item(item): #do stuff pass process_list = (get_list.s(10) | group(process_item.s(i) for i in ???)) 

Я, вероятно, не подходит к этому правильно, но я уверен, что небезопасно вызывать задачи из задач:

 @celery.task def process_list(): for i in get_list.delay().get(): process_item.delay(i) 

Мне не нужен результат из задачи секунд.

One Solution collect form web for “Как связать задачу сельдерея, которая возвращает список в группу?”

Вы можете получить такое поведение, используя промежуточную задачу. Вот демонстрация создания такого «картографического» метода, который работает так, как вы предлагали.

 from celery import task, subtask, group @task def get_list(amount): return [i for i in range(amount)] @task def process_item(item): # do stuff pass @task def dmap(it, callback): # Map a callback over an iterator and return as a group callback = subtask(callback) return group(callback.clone([arg,]) for arg in it)() # runs process_item for each item in the return of get_list process_list = (get_list.s(10) | dmap.s(process_item.s())) 

Позвольте спросить Solem за то, что он дал мне это предложение, когда я попросил его о помощи по аналогичной проблеме.

  • Как отправить периодические задания в определенную очередь в Сельдерей
  • Как я могу настроить Celery для вызова пользовательской функции инициализации перед выполнением моих задач?
  • Могу ли я использовать Tornado + Celery + RabbitMQ + Redis?
  • Динамически добавлять / удалять потоки в рабочий пул в сельдерее
  • Ошибки кадрирования в сельдерее 3.0.1
  • Должны ли экземпляры объекта модели django передавать сельдерею?
  • Сельдерей: как проигнорировать задачу в аккорде или цепочке?
  • График задач сельдерея (выполнение задачи выполняется только по одному)
  • Python - лучший язык программирования в мире.