Подключите новую периодическую задачу сельдерея в джанго

Это не вопрос, а помощь тем, кто найдет, что декларация периодических задач, описанных в документации на сельдерей 4.0.1, трудно интегрировать в django: http://docs.celeryproject.org/en/latest/userguide/periodic-tasks .html # записи

скопировать каталог main_app/celery.py файла main_app/celery.py :

 from celery import Celery from celery.schedules import crontab app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello'), name='add every 10') # Calls test('world') every 30 seconds sender.add_periodic_task(30.0, test.s('world'), expires=10) # Executes every Monday morning at 7:30 am sender.add_periodic_task( crontab(hour=7, minute=30, day_of_week=1), test.s('Happy Mondays!'), ) @app.task def test(arg): print(arg) 

Вопрос

Но что, если мы используем django, и наши задачи помещаются в другое приложение? С сельдереем 4.0.1 у нас больше нет декоратора @periodic_task . Итак, давайте посмотрим, что мы можем сделать.

Первый случай

Если вы предпочитаете сохранять задачи и их расписание близко друг к другу:

main_app/some_app/tasks.py

 from main_app.celery import app as celery_app @celery_app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello')) @celery_app.task def test(arg): print(arg) 

Мы можем запустить beat в режиме отладки:

 celery -A main_app beat -l debug 

и мы увидим, что такой периодической задачи нет.

Второй случай

Мы можем попытаться описать все периодические задачи в файле конфигурации следующим образом:

main_app/celery.py

 ... app = Celery() @app.on_after_configure.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. from main_app.some_app.tasks import test sender.add_periodic_task(10.0, test.s('hello')) ... 

Результат тот же. Но он будет вести себя по-другому, что вы можете увидеть с помощью ручной отладки через pdb. В первом примере setup_periodic_tasks вызов setup_periodic_tasks не будет запущен вообще. Но во втором примере мы получим django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet. (это исключение не будет распечатываться)

Для django нам нужно использовать другой сигнал: @celery_app.on_after_finalize.connect . Он может использоваться для обоих:

  • объявление графика задач близко к задаче в app/tasks.py потому что этот сигнал будет запущен после того, как все tasks.py импортированы и все возможные приемники уже подписаны (первый случай).
  • централизованное объявление графика, поскольку приложения django будут уже инициализированы и готовы к импорту (второй случай)

Я думаю, что я должен написать заключительную декларацию:

Первый случай

Задание расписания задач, близких к задаче:

main_app/some_app/tasks.py

 from main_app.celery import app as celery_app @celery_app.on_after_finalize.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. sender.add_periodic_task(10.0, test.s('hello')) @celery_app.task def test(arg): print(arg) 

Второй случай

Централизованное объявление расписания в файле конфигурации main_app/celery.py :

 ... app = Celery() @app.on_after_finalize.connect def setup_periodic_tasks(sender, **kwargs): # Calls test('hello') every 10 seconds. from main_app.some_app.tasks import test sender.add_periodic_task(10.0, test.s('hello')) ...