многопроцессорность и сбор мусора

В py2.6 + модуль multiprocessing предлагает класс Pool , поэтому можно сделать:

 class Volatile(object): def do_stuff(self, ...): pool = multiprocessing.Pool() return pool.imap(...) 

Однако при стандартной реализации Python на 2.7.2 этот подход вскоре приводит к «IOError: [Errno 24] Слишком много открытых файлов». По-видимому, объект pool никогда не собирает мусор, поэтому его процессы никогда не заканчиваются, накапливая все дескрипторы, открытые внутри. Я думаю, это потому, что следующие работы:

 class Volatile(object): def do_stuff(self, ...): pool = multiprocessing.Pool() result = pool.map(...) pool.terminate() return result 

Я хотел бы оставить «ленивый» итераторный подход imap ; как работает сборщик мусора в этом случае? Как исправить код?

3 Solutions collect form web for “многопроцессорность и сбор мусора”

В конце концов, я закончил передачу ссылки на pool и завершал ее вручную после pool.imap итератора pool.imap :

 class Volatile(object): def do_stuff(self, ...): pool = multiprocessing.Pool() return pool, pool.imap(...) def call_stuff(self): pool, results = self.do_stuff() for result in results: # lazy evaluation of the imap pool.terminate() 

В случае, если кто-то наткнется на это решение в будущем: параметр chunksize очень важен в Pool.imap (в отличие от простого Pool.map , где это не имело значения). Я вручную устанавливаю его так, чтобы каждый процесс получал задания 1 + len(input) / len(pool) . Оставляя его по умолчанию chunksize=1 дал мне такую ​​же производительность, как если бы я не использовал параллельную обработку вообще … плохо.

Я полагаю, что нет никакой реальной пользы от использования заказанной map imap или упорядоченной map , я просто лично люблю итераторы.

В python у вас практически нет гарантии того, когда все будет уничтожено, и в этом случае это не то, как многопроцессорные пулы предназначены для использования.

Правильное решение – предоставить единый пул между несколькими вызовами функции. Самый простой способ сделать это – сохранить пул в качестве переменной класса (или, может быть, экземпляра):

 class Dispatcher: pool = multiprocessing.Pool() def do_stuff(self, ...): result = self.pool.map(...) return result 

Действительно, даже когда все пользовательские ссылки на объект pool удаляются, и в коде очереди нет заданий, и вся сборка мусора завершена, то все же процессы остаются непригодными для зомби в операционной системе – плюс у нас есть 3 потока обслуживания зомби из висячего Pool (Python 2.7 и 3.4):

 >>> del pool >>> gc.collect() 0 >>> gc.garbage [] >>> threading.enumerate() [<_MainThread(MainThread, started 5632)>, <Thread(Thread-8, started daemon 5252)>, <Thread(Thread-9, started daemon 5260)>, <Thread(Thread-7, started daemon 7608)>] 

И далее Pool() добавит все больше и больше зомби процессов и потоков … которые остаются до тех пор, пока основной процесс не будет прекращен.

Для этого требуется специальный трюк, чтобы остановить такой зомби-пул – через его служебную нить _handle_workers :

 >>> ths = threading.enumerate() >>> for th in ths: ... try: th.name, th._state, th._Thread__target ... except AttributeError: pass ... ('MainThread', 1, None) ('Thread-8', 0, <function _handle_tasks at 0x01462A30>) ('Thread-9', 0, <function _handle_results at 0x014629F0>) ('Thread-7', 0, <function _handle_workers at 0x01462A70>) >>> ths[-1]._state = multiprocessing.pool.CLOSE # or TERMINATE >>> threading.enumerate() [<_MainThread(MainThread, started 5632)>] >>> 

Это завершает другие сервисные потоки, а также завершает дочерние процессы.


Я думаю, что одна проблема заключается в том, что в библиотеке Python есть ошибка утечки ресурсов , которая может быть исправлена ​​путем правильного использования weakref .

Другой момент заключается в том, что создание и завершение Pool дорого (в том числе 3 потока обслуживания для каждого пула только для управления!), И обычно нет причин иметь гораздо больше рабочих процессов, чем ядра ЦП (высокие загрузки ЦП) или более чем ограниченное число в соответствии с другим ограничивающим ресурсом (например, пропускной способностью сети). Поэтому разумно рассматривать пул больше как глобальный ресурс уникального приложения (возможно, управляемый таймаутом), а не быстрый объект, просто удерживаемый закрытием (или обходным путем terminate () из-за ошибки).

Например:

 try: _unused = pool # reload safe global var except NameError: pool = None def get_pool(): global pool if pool is None: atexit.register(stop_pool) pool = Pool(CPUCORES) return pool def stop_pool(): global pool if pool: pool.terminate() pool = None 
Python - лучший язык программирования в мире.