Почему я могу передать метод экземпляра для multiprocessing.Process, но не многопроцессорный.Pool?

Я пытаюсь написать приложение, которое одновременно применяет функцию с multiprocessing.Pool . Я хотел бы, чтобы эта функция была методом экземпляра (поэтому я могу определить ее по-разному в разных подклассах). Это не представляется возможным; как я узнал в другом месте, по-видимому, связанные методы не могут быть маринованными . Итак, почему начинается multiprocessing.Process с привязанным методом в качестве целевой работы? Следующий код:

 import multiprocessing def test1(): print "Hello, world 1" def increment(x): return x + 1 class testClass(): def process(self): process1 = multiprocessing.Process(target=test1) process1.start() process1.join() process2 = multiprocessing.Process(target=self.test2) process2.start() process2.join() def pool(self): pool = multiprocessing.Pool(1) for answer in pool.imap(increment, range(10)): print answer print for answer in pool.imap(self.square, range(10)): print answer def test2(self): print "Hello, world 2" def square(self, x): return x * x def main(): c = testClass() c.process() c.pool() if __name__ == "__main__": main() 

Производит этот вывод:

 Hello, world 1 Hello, world 2 1 2 3 4 5 6 7 8 9 10 Exception in thread Thread-2: Traceback (most recent call last): File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner self.run() File "C:\Python27\Lib\threading.py", line 504, in run self.__target(*self.__args, **self.__kwargs) File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks put(task) PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

Почему процессы обрабатывают связанные методы, но не пулы?

3 Solutions collect form web for “Почему я могу передать метод экземпляра для multiprocessing.Process, но не многопроцессорный.Pool?”

Модуль pickle обычно не может определять методы экземпляра:

 >>> import pickle >>> class A(object): ... def z(self): print "hi" ... >>> a = A() >>> pickle.dumps(az) Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps Pickler(file, protocol).dump(obj) File "/usr/local/lib/python2.7/pickle.py", line 224, in dump self.save(obj) File "/usr/local/lib/python2.7/pickle.py", line 306, in save rv = reduce(self.proto) File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex raise TypeError, "can't pickle %s objects" % base.__name__ TypeError: can't pickle instancemethod objects 

Тем не менее, multiprocessing модуль имеет собственный Pickler который добавляет некоторый код для включения этой функции :

 # # Try making some callable types picklable # from pickle import Pickler class ForkingPickler(Pickler): dispatch = Pickler.dispatch.copy() @classmethod def register(cls, type, reduce): def dispatcher(self, obj): rv = reduce(obj) self.save_reduce(obj=obj, *rv) cls.dispatch[type] = dispatcher def _reduce_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) ForkingPickler.register(type(ForkingPickler.save), _reduce_method) 

Вы можете воспроизвести это с copy_reg модуля copy_reg чтобы убедиться, что он работает самостоятельно:

 >>> import copy_reg >>> def _reduce_method(m): ... if m.im_self is None: ... return getattr, (m.im_class, m.im_func.func_name) ... else: ... return getattr, (m.im_self, m.im_func.func_name) ... >>> copy_reg.pickle(type(az), _reduce_method) >>> pickle.dumps(az) "c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n." 

Когда вы используете Process.start для создания нового процесса в Windows, он рассосает все параметры, которые вы передали дочернему процессу, используя этот пользовательский ForkingPickler :

 # # Windows # else: # snip... from pickle import load, HIGHEST_PROTOCOL def dump(obj, file, protocol=None): ForkingPickler(file, protocol).dump(obj) # # We define a Popen class similar to the one from subprocess, but # whose constructor takes a process object as its argument. # class Popen(object): ''' Start a subprocess to run the code of a process object ''' _tls = thread._local() def __init__(self, process_obj): # create pipe for communication with child rfd, wfd = os.pipe() # get handle for read end of the pipe and make it inheritable ... # start process ... # set attributes of self ... # send information to child prep_data = get_preparation_data(process_obj._name) to_child = os.fdopen(wfd, 'wb') Popen._tls.process_handle = int(hp) try: dump(prep_data, to_child, HIGHEST_PROTOCOL) dump(process_obj, to_child, HIGHEST_PROTOCOL) finally: del Popen._tls.process_handle to_child.close() 

Обратите внимание на раздел «Отправить информацию ребенку». Он использует функцию dump , которая использует ForkingPickler для ForkingPickler данных, что означает, что ваш метод экземпляра можно мариновать.

Теперь, когда вы используете методы multiprocessing.Pool для отправки метода дочернему процессу, он использует multiprocessing.Pipe обработку. В Python 2.7 multiprocessing.Pipe реализация реализована в C и вызывает pickle_dumps напрямую , поэтому она не использует преимущества ForkingPickler . Это означает, что травление метода экземпляра не работает.

Однако, если вы используете copy_reg для регистрации типа instancemethod , а не для пользовательского Pickler , все попытки травления будут затронуты. Таким образом, вы можете использовать это, чтобы включить методы сортировки экземпляров, даже через Pool :

 import multiprocessing import copy_reg import types def _reduce_method(m): if m.im_self is None: return getattr, (m.im_class, m.im_func.func_name) else: return getattr, (m.im_self, m.im_func.func_name) copy_reg.pickle(types.MethodType, _reduce_method) def test1(): print("Hello, world 1") def increment(x): return x + 1 class testClass(): def process(self): process1 = multiprocessing.Process(target=test1) process1.start() process1.join() process2 = multiprocessing.Process(target=self.test2) process2.start() process2.join() def pool(self): pool = multiprocessing.Pool(1) for answer in pool.imap(increment, range(10)): print(answer) print for answer in pool.imap(self.square, range(10)): print(answer) def test2(self): print("Hello, world 2") def square(self, x): return x * x def main(): c = testClass() c.process() c.pool() if __name__ == "__main__": main() 

Вывод:

 Hello, world 1 Hello, world 2 GOT (0, 0, (True, 1)) GOT (0, 1, (True, 2)) GOT (0, 2, (True, 3)) GOT (0, 3, (True, 4)) GOT (0, 4, (True, 5)) 1GOT (0, 5, (True, 6)) GOT (0, 6, (True, 7)) 2 GOT (0, 7, (True, 8)) 3 GOT (0, 8, (True, 9)) GOT (0, 9, (True, 10)) 4 5 6 7 8 9 10 GOT (1, 0, (True, 0)) 0 GOT (1, 1, (True, 1)) 1 GOT (1, 2, (True, 4)) 4 GOT (1, 3, (True, 9)) 9 GOT (1, 4, (True, 16)) 16 GOT (1, 5, (True, 25)) 25 GOT (1, 6, (True, 36)) 36 GOT (1, 7, (True, 49)) 49 GOT (1, 8, (True, 64)) 64 GOT (1, 9, (True, 81)) 81 GOT None 

Также обратите внимание, что в Python 3.x pickle может израсходовать типы методов экземпляров изначально, поэтому ни один из этих материалов не имеет значения. 🙂

Вот альтернатива, которую я иногда использую, и она работает в Python2.x:

Вы можете создать «псевдоним» верхнего уровня для методов экземпляра, которые принимают объект, методы экземпляра которого вы хотите запустить в пуле, и попросите его вызвать методы экземпляра для вас:

 import functools import multiprocessing def _instance_method_alias(obj, arg): """ Alias for instance method that allows the method to be called in a multiprocessing pool """ obj.instance_method(arg) return class MyClass(object): """ Our custom class whose instance methods we want to be able to use in a multiprocessing pool """ def __init__(self): self.my_string = "From MyClass: {}" def instance_method(self, arg): """ Some arbitrary instance method """ print(self.my_string.format(arg)) return # create an object of MyClass obj = MyClass() # use functools.partial to create a new method that always has the # MyClass object passed as its first argument _bound_instance_method_alias = functools.partial(_instance_method_alias, obj) # create our list of things we will use the pool to map l = [1,2,3] # create the pool of workers pool = multiprocessing.Pool() # call pool.map, passing it the newly created function pool.map(_bound_instance_method_alias, l) # cleanup pool.close() pool.join() 

Этот код производит этот вывод:

От MyClass: 1
От MyClass: 2
Из MyClass: 3

Одно из ограничений заключается в том, что вы не можете использовать это для методов, которые изменяют объект. Каждый процесс получает копию объекта, на который он вызывает методы, поэтому изменения не будут передаваться обратно в основной процесс. Если вам не нужно изменять объект из методов, которые вы вызываете, это может быть простым решением.

Вот более простой способ работы в Python 2, просто оберните исходный метод экземпляра. Хорошо работает на MacOSX и Linux, не работает на Windows, тестировал Python 2.7

 from multiprocessing import Pool class Person(object): def __init__(self): self.name = 'Weizhong Tu' def calc(self, x): print self.name return x ** 5 def func(x, p=Person()): return p.calc(x) pool = Pool() print pool.map(func, range(10)) 
  • Тайм-аут на вызов функции Python внутри timeit
  • Сортировка WTForms form.errors dict
  • Ошибка Selenium Webdriver: «Невозможно загрузить профиль»
  • Бинарные данные POST с использованием httplib вызывают исключения из Unicode
  • Использование нескольких версий Python
  • Как импортировать таблицу с заголовками в фрейм данных с помощью модуля pandas
  • Python - если нет инструкции с 0.0
  • Пример производительности Python от Julia в pypy
  • Python - лучший язык программирования в мире.