Создание отдельного подключения к базе данных для каждого работника сельдерея

Я продолжаю сталкиваться с проблемами mysql, в то время как рабочие выполняют задачи сразу после создания.

Мы используем django 1.3, сельдерей 3.1.17, djorm-ext-pool 0.5

Мы начинаем процесс сельдерея с параллелизмом. До сих пор я наблюдал, что, когда начинается процесс работы, все они получают ту же связь mysql. Мы регистрируем идентификатор соединения db, как показано ниже.

from django.db import connection connection.cursor() logger.info("Task %s processing with db connection %s", str(task_id), str(connection.connection.thread_id())) 

Когда все рабочие получают задания, первый выполняется успешно, а остальные два дают странные ошибки Mysql. Это либо ошибки с «Mysql server ушли», либо с условием, когда Django выбрасывает ошибку «DoNotExist». очевидно, объекты, которые Django запрашивает, существуют.

После этой ошибки каждый рабочий начинает получать свое собственное соединение с базой данных, после чего мы не обнаруживаем никаких проблем.

Каково поведение сельдерея по умолчанию? Он предназначен для совместного использования одного и того же соединения с базой данных. Если да, то как обрабатывается связь между процессами? В идеале я бы предпочел разное подключение к базе данных для каждого рабочего.

Я пробовал код, упомянутый ниже, который не работает. Объединение соединений с базами данных сельдерей

Мы также установили код сельдерея, предложенный ниже. https://github.com/celery/celery/issues/2453

Для тех, кто подавляет вопрос, любезно сообщите мне причину падения.

One Solution collect form web for “Создание отдельного подключения к базе данных для каждого работника сельдерея”

Сельдерей начинается с команды ниже

 celery -A myproject worker --loglevel=debug --concurrency=3 -Q testqueue 

myproject.py как часть основного процесса делал некоторые запросы к базе данных mysql перед тем, как развернуть рабочие процессы.

В рамках потока запросов в основном процессе django ORM создает пул соединений sqlalchemy, если он еще не существует. Затем создаются рабочие процессы.

Сельдерей как часть фитингов django закрывает существующие соединения.

  def close_database(self, **kwargs): if self._close_old_connections: return self._close_old_connections() # Django 1.6 if not self.db_reuse_max: return self._close_database() if self._db_recycles >= self.db_reuse_max * 2: self._db_recycles = 0 self._close_database() self._db_recycles += 1 

Фактически, что может случиться, так это то, что объект пула sqlalchemy с одним неиспользуемым соединением db копируется в 3 рабочих процесса при раздвоении. Таким образом, 3 разных пула имеют 3 объекта соединения, указывающие на один и тот же файл описания файла.

Рабочие при выполнении задач при запросе на соединение db все рабочие получают одно и то же неиспользуемое соединение из пула sqlalchemy, потому что это в настоящее время не используется. Тот факт, что все соединения указывают на один и тот же дескриптор файла, вызвал ошибки в соединении MySQL.

Новые соединения, созданные после этого, являются новыми и не указывают на один и тот же дескриптор файла сокета.

Решение:

В основном процессе добавьте

 from django.db import connection connection.cursor() 

перед любым импортом. т.е. до добавления модуля djorm-ext-pool .

Таким образом, все запросы db будут использовать соединение, созданное django вне пула. Когда крепеж django для сельдерея закрывает соединение, соединение фактически закрывается, а не возвращается в алхимический пул, оставляя пул алхимии без каких-либо подключений в нем во время обращения ко всем рабочим при раздвоении. После того, как работники запросят соединение db, sqlalchemy возвращает одно из вновь созданных соединений.

  • Python прописывает в sqlalchemy
  • SQLAlchemy - можете ли вы добавить пользовательские методы к объекту запроса?
  • Как использовать SQLAlchemy отражение с Sybase?
  • Выполнение сопоставлений DateTime в фильтрах SQLAlchemy
  • Запрос, чтобы проверить, равен ли размер коллекции 0 или пустой в SQLAlchemy?
  • Идентичные базы данных в Flask-SQLAlchemy
  • SQLAlchemy: как группировать по двум полям и фильтровать по дате
  • Стоит ли использовать sqlalchemy-migrate?
  • Python - лучший язык программирования в мире.