Сломанная труба при использовании менеджеров многопроцессорности Python (BaseManager / SyncManager) для обмена очередью с удаленными машинами

В прошлом месяце у нас была постоянная проблема с пакетом многопроцессорности Python 2.6.x, когда мы пытались использовать его для совместного использования очереди между несколькими различными (Linux) компьютерами. Я задал этот вопрос непосредственно Джесси Ноллеру, так как мы еще не нашли ничего, что разъясняет проблему в документах StackOverflow, Python, исходном коде или в другом месте в Интернете.

Наша команда инженеров не смогла решить эту проблему, и мы поставили вопрос довольно многим людям в группах пользователей python безрезультатно. Я надеялся, что кто-то может пролить некоторое понимание, так как я чувствую, что мы делаем что-то неправильное, но слишком близко к проблеме, чтобы увидеть, что это такое.

Вот признак:

Traceback (most recent call last): File "/var/django_root/dev/com/brightscope/data/processes/daemons/deferredupdates/servers/queue_server.py", line 65, in get_from_queue return queue, queue.get(block=False) File "<string>", line 2, in get File "/usr/local/lib/python2.6/multiprocessing/managers.py", line 725, in _callmethod conn.send((self._id, methodname, args, kwds)) IOError: [Errno 32] Broken pipe 

(Я показываю, где наш код вызывает queue.get () для объекта общей очереди, размещенного менеджером, который расширяет SyncManger).

Что особенного в этой проблеме, так это то, что если мы подключаемся к этой общей очереди на одной машине (давайте назовем этот machine A ), даже из-за множества параллельных процессов, мы никогда не сталкиваемся с проблемой. Только когда мы подключаемся к очереди (опять же, используя класс, который расширяет многопроцессорность SyncManager и в настоящее время не добавляет дополнительных функций) от других машин (давайте назовем эти machines B and C ) и запустим большой объем элементов в очередь и из очереди в то же время мы сталкиваемся с проблемой.

Это похоже на то, что пакет многопроцессорности python обрабатывает локальные соединения (хотя они все еще используют один и тот же метод соединения manager.connect ()) таким образом, который работает с machine A но когда удаленные соединения выполняются одновременно, по крайней мере, из одной из machines B or C мы получаем ошибку Broken pipe.

Во всех чтениях моей команды мы решили, что проблема связана с блокировкой. Мы думали, возможно, нам не следует использовать Queue.Queue , но вместо этого multiprocessing.Queue Queue.Queue , но мы переключились и проблема осталась (мы также заметили, что собственная общая очередь SyncManager – это экземпляр Queue.Queue).

Мы вытягиваем наши волосы о том, как даже отлаживать проблему, так как ее трудно воспроизвести, но происходит довольно часто (много раз в день, если мы вставляем и .get () загружаем множество предметов из очереди).

Метод, который мы создали get_from_queue пытается повторить попытку получения элемента из очереди ~ 10 раз со случайными интервалами сна, но кажется, что если он терпит неудачу один раз, он провалится все десять раз (что заставило меня поверить, что .register () и. connect () с менеджером, возможно, не дает другого подключения сокета к серверу, но я не мог подтвердить это, читая документы или глядя на внутренний исходный код Python).

Может ли кто-нибудь дать представление о том, где мы можем смотреть, или как мы можем отслеживать, что на самом деле происходит?

Как мы можем начать новое соединение в случае сломанного трубопровода с использованием multiprocessing.BaseManager или multiprocessing.SyncManager ?

Как мы можем предотвратить сломанную трубу в первую очередь?

FYI. Если кто-то другой работает по этой же ошибке, после обширного консультирования с Ask Solem и Jesse Noller из основной команды разработчиков Python, похоже, что это на самом деле ошибка в текущем python 2.6.x (и, возможно, 2.7+ и, возможно, 3.x ). Они рассматривают возможные решения, и исправление, вероятно, будет включено в будущую версию Python.

Я страдал от одной и той же проблемы, даже если подключение на localhost в python 2.7.1. После дня отладки я нашел причину и обходной путь:

Причина. Класс BaseProxy имеет локальное хранилище потоков, которое кэширует соединение, которое повторно используется для будущих подключений, что приводит к ошибкам «сломанной трубы» даже при создании нового диспетчера

Обход проблемы: удалите кешированное соединение перед повторным подключением

 if address in BaseProxy._address_to_local: del BaseProxy._address_to_local[address][0].connection 

Также вы можете попытаться поймать исключение в дочерних процессах, чтобы он не пытался закрыть соединение UN-ожидаемо. То же самое происходило со мной, и, наконец, я должен был подавить ошибки, чтобы труба не закрывалась внезапно.