Многопроцессорный пул Python pool.map для нескольких аргументов

В библиотеке многопроцессорности Python есть вариант pool.map, который поддерживает несколько аргументов?

text = "test" def harvester(text, case): X = case[0] return text+ str(X) if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET pool.map(harvester(text,case),case, 1) pool.close() pool.join() 

13 Solutions collect form web for “Многопроцессорный пул Python pool.map для нескольких аргументов”

Ответ на этот вопрос зависит от версии и ситуации. Самый общий ответ для последних версий Python (начиная с 3.3) был впервые описан ниже JF Sebastian . 1 Он использует метод Pool.starmap , который принимает последовательность кортежей аргументов. Затем он автоматически распаковывает аргументы из каждого кортежа и передает их в заданную функцию:

 import multiprocessing from itertools import product def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with multiprocessing.Pool(processes=3) as pool: results = pool.starmap(merge_names, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ... 

В более простых случаях с фиксированным вторым аргументом вы также можете использовать partial , но только в Python 2.7 и выше:

 import multiprocessing from functools import partial def merge_names(a, b): return '{} & {}'.format(a, b) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with multiprocessing.Pool(processes=3) as pool: results = pool.map(partial(merge_names, b='Sons'), names) print(results) # Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ... 

Если вы используете Python 2.6, вам нужно написать вспомогательную функцию, чтобы явно распаковать аргументы.

 import multiprocessing from itertools import product def merge_names(a, b): return '{} & {}'.format(a, b) def merge_names_unpack(args): return merge_names(*args) if __name__ == '__main__': names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie'] with multiprocessing.Pool(processes=3) as pool: results = pool.map(merge_names_unpack, product(names, repeat=2)) print(results) # Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ... 

1. Большая часть этого была вдохновлена ​​его ответом, который, вероятно, должен был быть принят вместо этого. Но так как этот застрял наверху, лучше всего улучшить его для будущих читателей.

есть ли вариант pool.map, который поддерживает несколько аргументов?

Python 3.3 включает pool.starmap() :

 #!/usr/bin/env python3 from functools import partial from itertools import repeat from multiprocessing import Pool, freeze_support def func(a, b): return a + b def main(): a_args = [1,2,3] second_arg = 1 with Pool() as pool: L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)]) M = pool.starmap(func, zip(a_args, repeat(second_arg))) N = pool.map(partial(func, b=second_arg), a_args) assert L == M == N if __name__=="__main__": freeze_support() main() 

Для более старых версий:

 #!/usr/bin/env python2 import itertools from multiprocessing import Pool, freeze_support def func(a, b): print a, b def func_star(a_b): """Convert `f([1,2])` to `f(1,2)` call.""" return func(*a_b) def main(): pool = Pool() a_args = [1,2,3] second_arg = 1 pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg))) if __name__=="__main__": freeze_support() main() 

Вывод

 1 1 2 1 3 1 

Обратите внимание, как здесь используются здесь itertools.izip() и itertools.repeat() .

Из- за ошибки, упомянутой в @unutbu, вы не можете использовать functools.partial() или аналогичные возможности на Python 2.6, поэтому простую функцию-оболочку func_star() следует явно указывать. См. Также обходной путь, предложенный uptimebox .

Я думаю, что ниже будет лучше

 def multi_run_wrapper(args): return add(*args) def add(x,y): return x+y if __name__ == "__main__": from multiprocessing import Pool pool = Pool(4) results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)]) print results 

вывод

 [3, 5, 7] 

Использование Python 3.3+ с pool.starmap():

 from multiprocessing.dummy import Pool as ThreadPool def write(i, x): print(i, "---", x) a = ["1","2","3"] b = ["4","5","6"] pool = ThreadPool(2) pool.starmap(write, zip(a,b)) pool.close() pool.join() 

Результат:

 1 --- 4 2 --- 5 3 --- 6 

Вы также можете zip () zip(a,b,c,d,e) больше аргументов: zip(a,b,c,d,e)

Если вы хотите иметь постоянное значение, переданное как аргумент, вы должны использовать import itertools а затем zip(itertools.repeat(constant), a) например.

Узнав об itertools в ответе JF Sebastian, я решил сделать еще один шаг и написать пакет parmap , который заботится о распараллеливании, предлагая функции map и starmap на python-2.7 и python-3.2 (и позже также), которые могут принимать любое число позиционных аргументов.

Монтаж

 pip install parmap 

Как распараллеливать:

 import parmap # If you want to do: y = [myfunction(x, argument1, argument2) for x in mylist] # In parallel: y = parmap.map(myfunction, mylist, argument1, argument2) # If you want to do: z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist] # In parallel: z = parmap.starmap(myfunction, mylist, argument1, argument2) # If you want to do: listx = [1, 2, 3, 4, 5, 6] listy = [2, 3, 4, 5, 6, 7] param = 3.14 param2 = 42 listz = [] for (x, y) in zip(listx, listy): listz.append(myfunction(x, y, param1, param2)) # In parallel: listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2) 

Я загрузил parmap в PyPI и в репозиторий github .

В качестве примера на вопрос можно ответить следующим образом:

 import parmap def harvester(case, text): X = case[0] text+ str(X) if __name__ == "__main__": case = RAW_DATASET # assuming this is an iterable parmap.map(harvester, case, "test", chunksize=1) 

Существует вилка multiprocessing называемая pathos ( обратите внимание: используйте версию на github ), которая не нуждается в starmap – функции карты отражают API для карты python, поэтому карта может принимать несколько аргументов. С pathos вы также можете выполнять многопроцессорную обработку в интерпретаторе вместо того, чтобы застревать в блоке __main__ . Пафос должен быть выпущен после некоторого мягкого обновления – в основном преобразования в python 3.x.

  Python 2.7.5 (default, Sep 30 2013, 20:15:49) [GCC 4.2.1 (Apple Inc. build 5566)] on darwin Type "help", "copyright", "credits" or "license" for more information. >>> def func(a,b): ... print a,b ... >>> >>> from pathos.multiprocessing import ProcessingPool >>> pool = ProcessingPool(nodes=4) >>> pool.map(func, [1,2,3], [1,1,1]) 1 1 2 1 3 1 [None, None, None] >>> >>> # also can pickle stuff like lambdas >>> result = pool.map(lambda x: x**2, range(10)) >>> result [0, 1, 4, 9, 16, 25, 36, 49, 64, 81] >>> >>> # also does asynchronous map >>> result = pool.amap(pow, [1,2,3], [4,5,6]) >>> result.get() [1, 32, 729] >>> >>> # or can return a map iterator >>> result = pool.imap(pow, [1,2,3], [4,5,6]) >>> result <processing.pool.IMapIterator object at 0x110c2ffd0> >>> list(result) [1, 32, 729] 

Вы можете использовать следующие две функции, чтобы избежать написания обертки для каждой новой функции:

 import itertools from multiprocessing import Pool def universal_worker(input_pair): function, args = input_pair return function(*args) def pool_args(function, *args): return zip(itertools.repeat(function), zip(*args)) 

Используйте функцию function со списками аргументов arg_0 , arg_1 и arg_2 следующим образом:

 pool = Pool(n_core) list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2) pool.close() pool.join() 

Лучше использовать декоратор вместо того, чтобы вручную писать функцию обертки . Особенно, когда у вас есть много функций для отображения, декоратор сэкономит ваше время, избегая писать обертку для каждой функции. Обычно украшенная функция не подбирается, однако мы можем использовать functools чтобы обойти ее. Здесь можно найти больше разногласий.

Вот пример

 def unpack_args(func): from functools import wraps @wraps(func) def wrapper(args): if isinstance(args, dict): return func(**args) else: return func(*args) return wrapper @unpack_args def func(x, y): return x + y 

Затем вы можете сопоставить его с заархивированными аргументами

 np, xlist, ylist = 2, range(10), range(10) pool = Pool(np) res = pool.map(func, zip(xlist, ylist)) pool.close() pool.join() 

Конечно, вы всегда можете использовать Pool.starmap в Python 3 (> = 3.3), как упоминалось в других ответах.

Другая простая альтернатива – обернуть параметры функции в кортеж, а затем обернуть параметры, которые также должны быть переданы в кортежах. Это, возможно, не идеально подходит для обработки больших фрагментов данных. Я считаю, что он будет делать копии для каждого кортежа.

 from multiprocessing import Pool def f((a,b,c,d)): print a,b,c,d return a + b + c +d if __name__ == '__main__': p = Pool(10) data = [(i+0,i+1,i+2,i+3) for i in xrange(10)] print(p.map(f, data)) p.close() p.join() 

Дает вывод в некотором случайном порядке:

 0 1 2 3 1 2 3 4 2 3 4 5 3 4 5 6 4 5 6 7 5 6 7 8 7 8 9 10 6 7 8 9 8 9 10 11 9 10 11 12 [6, 10, 14, 18, 22, 26, 30, 34, 38, 42] 

Другой способ – передать список списков в однопараметрическую процедуру:

 import os from multiprocessing import Pool def task(args): print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1] pool = Pool() pool.map(task, [ [1,2], [3,4], [5,6], [7,8] ]) 

Можно построить список списков аргументов с помощью своего любимого метода.

Из python 3.4.4 вы можете использовать multiprocessing.get_context () для получения объекта контекста для использования нескольких методов запуска:

 import multiprocessing as mp def foo(q, h, w): q.put(h + ' ' + w) print(h + ' ' + w) if __name__ == '__main__': ctx = mp.get_context('spawn') q = ctx.Queue() p = ctx.Process(target=foo, args=(q,'hello', 'world')) p.start() print(q.get()) p.join() 

Или просто просто замените

 pool.map(harvester(text,case),case, 1) 

от:

 pool.apply_async(harvester(text,case),case, 1) 

Лучшее решение для python2:

 from multiprocessing import Pool def func((i, (a, b))): print i, a, b return a + b pool = Pool(3) pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))]) 

2 3 4

1 2 3

0 1 2

вне[]:

[3, 5, 7]

В официальной документации говорится, что он поддерживает только один итеративный аргумент. Мне нравится использовать apply_async в таких случаях. В вашем случае я бы сделал:

 from multiprocessing import Process, Pool, Manager text = "test" def harvester(text, case, q = None): X = case[0] res = text+ str(X) if q: q.put(res) return res def block_until(q, results_queue, until_counter=0): i = 0 while i < until_counter: results_queue.put(q.get()) i+=1 if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET m = Manager() q = m.Queue() results_queue = m.Queue() # when it completes results will reside in this queue blocking_process = Process(block_until, (q, results_queue, len(case))) blocking_process.start() for c in case: try: res = pool.apply_async(harvester, (text, case, q = None)) res.get(timeout=0.1) except: pass blocking_process.join() - from multiprocessing import Process, Pool, Manager text = "test" def harvester(text, case, q = None): X = case[0] res = text+ str(X) if q: q.put(res) return res def block_until(q, results_queue, until_counter=0): i = 0 while i < until_counter: results_queue.put(q.get()) i+=1 if __name__ == '__main__': pool = multiprocessing.Pool(processes=6) case = RAW_DATASET m = Manager() q = m.Queue() results_queue = m.Queue() # when it completes results will reside in this queue blocking_process = Process(block_until, (q, results_queue, len(case))) blocking_process.start() for c in case: try: res = pool.apply_async(harvester, (text, case, q = None)) res.get(timeout=0.1) except: pass blocking_process.join() 
Interesting Posts

Травление __setstate__ и __getstate__ не вызывает класс

Как вы создаете разные имена переменных в цикле? (Python)

BeautifulSoup извлекает данные из нескольких таблиц

Установка lxml для Python 3.4 в Windows x 86 (32 бит) с помощью Visual Studio C ++ 2010 Express

Стэнфордский Парсер и НЛТК

Преобразование списка деревьев в иерархию dict

Python pandas / matplotlib аннотирует метки над столбцами столбцов

Groupby Pandas DataFrame и вычислить среднее значение и stdev одного столбца и добавить std в качестве нового столбца с параметром reset_index

Как заставить Tkinter работать с mac

Сделать монохромный массив без контура Python

Создание повторяющихся дат с использованием python?

Psycopg2 с использованием подстановочных знаков вызывает TypeError

Как фильтровать на рассчитанные значения модели, используя понимание списка

Как создать пул соединений mysql или любой другой способ инициализации нескольких баз данных?

Сравнение двух данных и получение различий

Python - лучший язык программирования в мире.