Почему связь через разделяемую память происходит намного медленнее, чем через очереди?

Я использую Python 2.7.5 на недавнем vintage Apple MacBook Pro, который имеет четыре аппаратных и восемь логических процессоров; т.е. утилита sysctl дает:

$ sysctl hw.physicalcpu hw.physicalcpu: 4 $ sysctl hw.logicalcpu hw.logicalcpu: 8 

Мне нужно выполнить некоторую довольно сложную обработку в большом 1-D списке или массиве, а затем сохранить результат как промежуточный вывод, который будет использоваться снова в дальнейшем в последующем вычислении в моем приложении. Структура моей проблемы довольно естественна для распараллеливания, поэтому я подумал, что попытаюсь использовать многопроцессорный модуль Python для разделения массива 1D на несколько частей (либо 4 части, либо 8 штук, я еще не уверен, что), выполните параллельные вычисления, а затем снова собрать полученный результат в свой окончательный формат. Я пытаюсь решить, следует ли использовать multiprocessing.Queue() (очереди сообщений) или multiprocessing.Array() (разделяемая память) в качестве моего предпочтительного механизма для передачи полученных вычислений из дочерних процессов обратно в основной родительский процесс, и я экспериментировали с несколькими «игрушечными» моделями, чтобы убедиться, что я понимаю, как работает многопроцессорный модуль. Однако я столкнулся с довольно неожиданным результатом: при создании двух принципиально эквивалентных решений одной и той же проблемы версия, использующая разделяемую память для межпроцессного взаимодействия, кажется, требует гораздо большего времени выполнения (например, 30X больше!), Чем версия, использующая сообщение очереди. Ниже я включил две разные версии исходного кода примера для «игрушечной» проблемы, которая генерирует длинную последовательность случайных чисел с использованием параллельных процессов и передает агломерированный результат обратно в родительский процесс двумя разными способами: сначала используя очереди сообщений , и второй раз с использованием общей памяти.

Вот версия, которая использует очереди сообщений:

 import random import multiprocessing import datetime def genRandom(count, id, q): print("Now starting process {0}".format(id)) output = [] # Generate a list of random numbers, of length "count" for i in xrange(count): output.append(random.random()) # Write the output to a queue, to be read by the calling process q.put(output) if __name__ == "__main__": # Number of random numbers to be generated by each process size = 1000000 # Number of processes to create -- the total size of all of the random # numbers generated will ultimately be (procs * size) procs = 4 # Create a list of jobs and queues jobs = [] outqs = [] for i in xrange(0, procs): q = multiprocessing.Queue() p = multiprocessing.Process(target=genRandom, args=(size, i, q)) jobs.append(p) outqs.append(q) # Start time of the parallel processing and communications section tstart = datetime.datetime.now() # Start the processes (ie calculate the random number lists) for j in jobs: j.start() # Read out the data from the queues data = [] for q in outqs: data.extend(q.get()) # Ensure all of the processes have finished for j in jobs: j.join() # End time of the parallel processing and communications section tstop = datetime.datetime.now() tdelta = datetime.timedelta.total_seconds(tstop - tstart) msg = "{0} random numbers generated in {1} seconds" print(msg.format(len(data), tdelta)) 

Когда я запускаю его, я получаю результат, который обычно выглядит примерно так:

 $ python multiproc_queue.py Now starting process 0 Now starting process 1 Now starting process 2 Now starting process 3 4000000 random numbers generated in 0.514805 seconds 

Теперь, это эквивалентный сегмент кода, но реорганизуется немного, так что он использует разделяемую память вместо очередей:

 import random import multiprocessing import datetime def genRandom(count, id, d): print("Now starting process {0}".format(id)) # Generate a list of random numbers, of length "count", and write them # directly to a segment of an array in shared memory for i in xrange(count*id, count*(id+1)): d[i] = random.random() if __name__ == "__main__": # Number of random numbers to be generated by each process size = 1000000 # Number of processes to create -- the total size of all of the random # numbers generated will ultimately be (procs * size) procs = 4 # Create a list of jobs and a block of shared memory jobs = [] data = multiprocessing.Array('d', size*procs) for i in xrange(0, procs): p = multiprocessing.Process(target=genRandom, args=(size, i, data)) jobs.append(p) # Start time of the parallel processing and communications section tstart = datetime.datetime.now() # Start the processes (ie calculate the random number lists) for j in jobs: j.start() # Ensure all of the processes have finished for j in jobs: j.join() # End time of the parallel processing and communications section tstop = datetime.datetime.now() tdelta = datetime.timedelta.total_seconds(tstop - tstart) msg = "{0} random numbers generated in {1} seconds" print(msg.format(len(data), tdelta)) 

Однако, когда я запускаю версию разделяемой памяти, типичный результат выглядит примерно так:

 $ python multiproc_shmem.py Now starting process 0 Now starting process 1 Now starting process 2 Now starting process 3 4000000 random numbers generated in 15.839607 seconds 

Мой вопрос: почему существует такая огромная разница в скорости выполнения (примерно 0,5 секунды против 15 секунд, коэффициент 30X!) Между двумя версиями моего кода? И, в частности, как я могу изменить версию разделяемой памяти, чтобы заставить ее работать быстрее?

One Solution collect form web for “Почему связь через разделяемую память происходит намного медленнее, чем через очереди?”

Это связано с тем, что multiprocessing.Array по умолчанию использует блокировку для предотвращения одновременного доступа к нескольким процессам:

multiprocessing.Array (typecode_or_type, size_or_initializer, *, lock = True)

Если блокировка True (по умолчанию), то создается новый объект блокировки для синхронизации доступа к значению. Если блокировка является объектом Lock или RLock, тогда будет использоваться синхронизация доступа к значению. Если блокировка False, доступ к возвращенному объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным процессом».

Это означает, что вы не выполняете одновременную запись в массив – только один процесс может получить к нему доступ за раз. Так как ваш пример рабочих почти ничего, кроме массива пишет, постоянно ждут на этом замке сильно ухудшает производительность. Если вы используете lock=False при создании массива, производительность намного лучше:

С lock=True :

 Now starting process 0 Now starting process 1 Now starting process 2 Now starting process 3 4000000 random numbers generated in 4.811205 seconds 

С lock=False :

 Now starting process 0 Now starting process 3 Now starting process 1 Now starting process 2 4000000 random numbers generated in 0.192473 seconds 

Обратите внимание, что использование lock=False означает, что вам необходимо вручную защищать доступ к Array когда вы делаете что-то, что не является безопасным процессом. В вашем примере процессы обрабатываются уникальными частями, поэтому все в порядке. Но если вы пытались прочитать это, когда делали это, или выполняли разные процессы для перекрытия частей, вам нужно было бы вручную получить блокировку.

  • Почему создание набора из конкатенированного списка происходит быстрее, чем использование `.update`?
  • Стоимость обработчиков исключений в Python
  • суммировать каждое значение в списке кортежей
  • и {} vs list () и dict (), что лучше?
  • Возвращаемые списки экземпляров не имеют никакого значения? А как насчет производительности?
  • Ускорение в петлеобразных структурах
  • Python defaultdict (list) de / serialization performance
  • Python: итерация по спискам по сравнению с элементами элементов dict
  • itertools.islice по сравнению со списком
  • оператор python, оператор для "не в"
  • Кто-нибудь знает эту структуру данных Python?
  • Python - лучший язык программирования в мире.