Использование многопроцессорного пула из задачи celery вызывает исключение

ДЛЯ ТОГО ЧИТАЮТ ЭТО: Я решил использовать RQ вместо этого, который не сбой при запуске кода, который использует модуль многопроцессорности. Я предлагаю вам использовать это.

Я пытаюсь использовать многопроцессорный пул из задачи celery, используя Python 3, и redis как брокер (работает на Mac). Однако, похоже, я даже не могу создать объект многопроцессорного пула из задачи Сельделия! Вместо этого я получаю странное исключение, которое я действительно не знаю, что делать.

Может ли кто-нибудь сказать мне, как это сделать?

Задание:

from celery import Celery from multiprocessing.pool import Pool app = Celery('tasks', backend='redis', broker='redis://localhost:6379/0') @app.task def test_pool(): with Pool() as pool: # perform some task using the pool pool.close() return 'Done!' 

который я добавляю к сельдерейу:

 celery -A tasks worker --loglevel=info 

а затем запустить его через следующий скрипт python:

 import tasks tasks.test_pool.delay() 

который возвращает следующий выход сельдерея:

 [2015-01-12 15:08:57,571: INFO/MainProcess] Connected to redis://localhost:6379/0 [2015-01-12 15:08:57,583: INFO/MainProcess] mingle: searching for neighbors [2015-01-12 15:08:58,588: INFO/MainProcess] mingle: all alone [2015-01-12 15:08:58,598: WARNING/MainProcess] celery@Simons-MacBook-Pro.local ready. [2015-01-12 15:09:02,425: INFO/MainProcess] Received task: tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369] [2015-01-12 15:09:02,436: ERROR/MainProcess] Task tasks.test_pool[38cab553-3a01-4512-8f94-174743b05369] raised unexpected: AttributeError("'Worker' object has no attribute '_config'",) Traceback (most recent call last): File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 240, in trace_task R = retval = fun(*args, **kwargs) File "/usr/local/lib/python3.4/site-packages/celery/app/trace.py", line 438, in __protected_call__ return self.run(*args, **kwargs) File "/Users/simongray/Code/etilbudsavis/offer-sniffer/tasks.py", line 17, in test_pool with Pool() as pool: File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 150, in __init__ self._setup_queues() File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/pool.py", line 243, in _setup_queues self._inqueue = self._ctx.SimpleQueue() File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 111, in SimpleQueue return SimpleQueue(ctx=self.get_context()) File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/queues.py", line 336, in __init__ self._rlock = ctx.Lock() File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/context.py", line 66, in Lock return Lock(ctx=self.get_context()) File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 163, in __init__ SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx) File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 59, in __init__ kind, value, maxvalue, self._make_name(), File "/usr/local/Cellar/python3/3.4.2_1/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/synchronize.py", line 117, in _make_name return '%s-%s' % (process.current_process()._config['semprefix'], AttributeError: 'Worker' object has no attribute '_config' 

2 Solutions collect form web for “Использование многопроцессорного пула из задачи celery вызывает исключение”

Это известная проблема с сельдереем. Это связано с проблемой, введенной в зависимость от бильярда. _config – это вручную установить атрибут _config для текущего процесса. Благодаря пользователю @martinth для работы ниже.

 from celery.signals import worker_process_init from multiprocessing import current_process @worker_process_init.connect def fix_multiprocessing(**kwargs): try: current_process()._config except AttributeError: current_process()._config = {'semprefix': '/mp'} 

Крючок worker_process_init будет выполнять код при инициализации рабочего процесса. Мы просто проверяем, существует ли _config и устанавливаем его, если это не так.

Быстрое решение заключается в использовании поточной multiprocessing реализации на основе потоков . + Изменить

 from multiprocessing import Pool # or whatever you're using 

в

 from multiprocessing.dummy import Pool 

Однако, поскольку этот параллелизм основан на потоках, применяются обычные оговорки (GIL).

  • Как использовать sadd с несколькими элементами в Redis с использованием API Python?
  • Как я могу управлять статусом / цветка сельдерея без опции -A?
  • как я могу передать бесконечность redis из python?
  • Неудовлетворительная производительность работы с Python RQ
  • Asyncio + aiohttp - redis Паб / Sub и websocket чтение / запись в одном обработчике
  • Получить все ключи в базе данных Redis с помощью python
  • Получение значений с правильным типом в Redis
  • Как / где хранить временные файлы и журналы для облачного приложения?
  • Получение всех идентификаторов задач из вложенных цепочек и аккордов
  • Как использовать redis с Django?
  • Поиск бенчмаркинга из redis vs memory в python (с использованием timeit)
  • Python - лучший язык программирования в мире.