Как объединить Pool.map с Array (разделяемая память) в многопроцессорной обработке Python?

У меня есть очень большой (только для чтения) массив данных, который я хочу обрабатывать несколькими процессами параллельно.

Мне нравится функция Pool.map и я хотел бы использовать ее для вычисления функций по этим данным параллельно.

Я видел, что для использования данных разделяемой памяти между процессами можно использовать класс Value или Array. Но когда я пытаюсь использовать это, я получаю RuntimeError: «Объекты SynchronizedString должны использоваться только совместно между процессами через наследование при использовании функции Pool.map:

Вот упрощенный пример того, что я пытаюсь сделать:

from sys import stdin from multiprocessing import Pool, Array def count_it( arr, key ): count = 0 for c in arr: if c == key: count += 1 return count if __name__ == '__main__': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" # want to share it using shared memory toShare = Array('c', testData) # this works print count_it( toShare, "a" ) pool = Pool() # RuntimeError here print pool.map( count_it, [(toShare,key) for key in ["a", "b", "s", "d"]] ) 

Может ли кто-нибудь сказать мне, что я делаю неправильно здесь?

Так что я хотел бы сделать, это передать информацию о вновь созданном распределенном распределяемом массиве памяти для процессов после того, как они были созданы в пуле процессов.

  • Заполнение очереди и управление многопроцессорностью в python
  • Многопроцессорный пул внутри процесса Время ожидания
  • Консоль форматирования Выход
  • Как получить количество «работы», оставшегося для пула многопроцессорности Python?
  • Существует ли эквивалент Python оператора C # null-coalescing?
  • Как позволить Pool.map использовать функцию лямбда
  • Использование пула многопроцессорности python в терминале и в модулях кода для Django или Flask
  • Невозможно pickle <type 'instancemethod'> с использованием многопроцессорной обработки python Pool.apply_async ()
  • 4 Solutions collect form web for “Как объединить Pool.map с Array (разделяемая память) в многопроцессорной обработке Python?”

    Повторяю, как только я увидел щедрость;)

    В принципе, я думаю, что сообщение об ошибке означает, что он сказал – многопроцессорная разделяемая память. Массивы не могут передаваться в качестве аргументов (путем травления). Не имеет смысла сериализовать данные – точка – это данные общей памяти. Таким образом, вы должны сделать общий массив глобальным. Я думаю, что опередить его как атрибут модуля, как и в моем первом ответе, но просто оставить его как глобальную переменную в вашем примере также хорошо работает. Принимая во внимание вашу мысль о том, что вы не хотите устанавливать данные перед вилкой, здесь приведен пример. Если вы хотите иметь более одного возможного общего массива (и именно поэтому вы хотели передать toShare в качестве аргумента), вы могли бы аналогичным образом сделать глобальный список общих массивов и просто передать индекс в count_it (который станет for c in toShare[i]: .

     from sys import stdin from multiprocessing import Pool, Array, Process def count_it( key ): count = 0 for c in toShare: if c == key: count += 1 return count if __name__ == '__main__': # allocate shared array - want lock=False in this case since we # aren't writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array('c', maxLength, lock=False) # fork pool = Pool() # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] ) 

    [РЕДАКТИРОВАТЬ: Вышеописанное не работает на окнах из-за того, что вы не используете fork. Однако нижеследующее работает в Windows, все еще используя пул, поэтому я думаю, что это самое близкое к тому, что вы хотите:

     from sys import stdin from multiprocessing import Pool, Array, Process import mymodule def count_it( key ): count = 0 for c in mymodule.toShare: if c == key: count += 1 return count def initProcess(share): mymodule.toShare = share if __name__ == '__main__': # allocate shared array - want lock=False in this case since we # aren't writing to it and want to allow multiple processes to access # at the same time - I think with lock=True there would be little or # no speedup maxLength = 50 toShare = Array('c', maxLength, lock=False) # fork pool = Pool(initializer=initProcess,initargs=(toShare,)) # can set data after fork testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" if len(testData) > maxLength: raise ValueError, "Shared array too small to hold data" toShare[:len(testData)] = testData print pool.map( count_it, ["a", "b", "s", "d"] ) 

    Не уверен, почему карта не будет сортировать массив, но процесс и пул – я думаю, возможно, он был перенесен в момент инициализации подпроцесса в окнах. Обратите внимание, что данные все еще установлены после fork.

    Проблема, которую я вижу, заключается в том, что пул не поддерживает травление общих данных через свой список аргументов. Это то, что сообщение об ошибке означает «объекты должны делиться только между процессами через наследование». Общие данные должны быть наследованы, то есть глобальными, если вы хотите поделиться им с помощью класса Pool.

    Если вам нужно передать их явно, возможно, вам придется использовать multiprocessing.Process. Вот ваш переработанный пример:

     from multiprocessing import Process, Array, Queue def count_it( q, arr, key ): count = 0 for c in arr: if c == key: count += 1 q.put((key, count)) if __name__ == '__main__': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" # want to share it using shared memory toShare = Array('c', testData) q = Queue() keys = ['a', 'b', 's', 'd'] workers = [Process(target=count_it, args = (q, toShare, key)) for key in keys] for p in workers: p.start() for p in workers: p.join() while not q.empty(): print q.get(), 

    Выход: ('s', 9) ('a', 2) ('b', 3) ('d', 12)

    Порядок элементов в очереди может отличаться.

    Чтобы сделать это более общим и похожим на Пул, вы можете создать фиксированное число N процессов, разделить список ключей на N частей, а затем использовать функцию-оболочку в качестве цели процесса, которая вызовет count_it для каждого ключа в списке он передается, как:

     def wrapper( q, arr, keys ): for k in keys: count_it(q, arr, k) 

    Если данные считываются, просто сделайте их переменной в модуле перед вилкой из пула. Затем все дочерние процессы должны иметь доступ к нему, и он не будет скопирован, если вы не напишете на него.

     import myglobals # anything (empty .py file) myglobals.data = [] def count_it( key ): count = 0 for c in myglobals.data: if c == key: count += 1 return count if __name__ == '__main__': myglobals.data = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" pool = Pool() print pool.map( count_it, ["a", "b", "s", "d"] ) 

    Если вы хотите попытаться использовать Array, хотя можете попробовать с аргументом lock=False (по умолчанию это верно).

    Модуль multiprocessing.sharedctypes предоставляет функции для выделения объектов ctypes из разделяемой памяти, которые могут быть унаследованы дочерними процессами.

    Таким образом, ваше использование sharedctypes неверно. Вы хотите наследовать этот массив из родительского процесса или хотите передать его явно? В первом случае вам нужно создать глобальную переменную, как предлагают другие ответы. Но вам не нужно использовать sharedctypes чтобы передать его явно, просто передайте оригинальные testData .

    Кстати, ваше использование Pool.map() неверно. Он имеет тот же интерфейс, что и встроенная функция map() (вы испортили его с помощью starmap() ?). Ниже приведен пример работы с явным передачей массива:

     from multiprocessing import Pool def count_it( (arr, key) ): count = 0 for c in arr: if c == key: count += 1 return count if __name__ == '__main__': testData = "abcabcs bsdfsdf gdfg dffdgdfg sdfsdfsd sdfdsfsdf" pool = Pool() print pool.map(count_it, [(testData, key) for key in ["a", "b", "s", "d"]]) 
    Interesting Posts
    Python - лучший язык программирования в мире.