Использование многопроцессорного пула из задачи celery вызывает исключение

ДЛЯ ТОГО ЧИТАЮТ ЭТО: Я решил использовать RQ вместо этого, который не сбой при запуске кода, который использует модуль многопроцессорности. Я предлагаю вам использовать это.

Я пытаюсь использовать многопроцессорный пул из задачи celery, используя Python 3, и redis как брокер (работает на Mac). Однако, похоже, я даже не могу создать объект многопроцессорного пула из задачи Сельделия! Вместо этого я получаю странное исключение, которое я действительно не знаю, что делать.

Может ли кто-нибудь сказать мне, как это сделать?

Задание:

from celery import Celery from multiprocessing.pool import Pool app = Celery('tasks', backend='redis', broker='redis://localhost:6379/0') @app.task def test_pool(): with Pool() as pool: # perform some task using the pool pool.close() return 'Done!' 

который я добавляю к сельдерейу:

 celery -A tasks worker --loglevel=info 

а затем запустить его через следующий скрипт python:

 import tasks tasks.test_pool.delay() 

который возвращает следующий выход сельдерея:

 [2015-01-12 15:08:57,571: INFO/MainProcess] Connected to redis://localhost:6379/0 [2015-01-12 15:08:57,583: INFO/MainProcess] mingle: searching for neighbors [2015-01-12 15:08:58,588: INFO/MainProcess] mingle: all alone [2015-01-12 15:08:58,598: WARNING/MainProcess] celery@Simons-MacBook-Pro.local ready. [2015-01-12 15:09:02,425: INFO/MainProcess] Received task: tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369] [2015-01-12 15:09:02,436: ERROR/MainProcess] Task tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369] raised unexpected: AttributeError("'Worker' object has no attribute '_config'",) Traceback (most recent call last): File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 240, in trace_task R = retval = fun(*args, **kwargs) File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 438, in __protected_call__ return self.run(*args, **kwargs) File "/Users/simongray/Code/etilbudsavis/offer-sniffer/tasks.py", line 17, in test_pool with Pool() as pool: File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 150, in __init__ self._setup_queues() File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 243, in _setup_queues self._inqueue = self._ctx.SimpleQueue() File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue return SimpleQueue(ctx=self.get_context()) File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__ self._rlock = ctx.Lock() File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock return Lock(ctx=self.get_context()) File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 163, in __init__ SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 59, in __init__ kind, value, maxvalue, self._make_name(), File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 117, in _make_name return '%s-%s' % (process.current_process()._config['semprefix'], AttributeError: 'Worker' object has no attribute '_config' 

2 Solutions collect form web for “Использование многопроцессорного пула из задачи celery вызывает исключение”

Это известная проблема с сельдереем. Это связано с проблемой, введенной в зависимость от бильярда. _config – это вручную установить атрибут _config для текущего процесса. Благодаря пользователю @martinth для работы ниже.

 from celery.signals import worker_process_init from multiprocessing import current_process @worker_process_init.connect def fix_multiprocessing(**kwargs): try: current_process()._config except AttributeError: current_process()._config = {'semprefix': '/mp'} 

Крючок worker_process_init будет выполнять код при инициализации рабочего процесса. Мы просто проверяем, существует ли _config и устанавливаем его, если это не так.

Быстрое решение заключается в использовании поточной multiprocessing реализации на основе потоков . + Изменить

 from multiprocessing import Pool # or whatever you're using 

в

 from multiprocessing.dummy import Pool 

Однако, поскольку этот параллелизм основан на потоках, применяются обычные оговорки (GIL).

  • Пример Redis в действии
  • Как хранить и извлекать словарь с помощью redis
  • как сохранить изображение в redis с помощью python / PIL
  • Django - Как использовать асинхронную очередь задач с сельдереем и redis
  • Сельдерей пытается связаться с неправильным брокером
  • Как настроить сотрудника RQ на Heroku с помощью RedisCloud с помощью Flask
  • ImportError: нет модуля с именем redis
  • Запуск фоновых задач Heroku с помощью всего лишь 1 веб-дино и 0 рабочих динамиков
  • Python - лучший язык программирования в мире.