Изменение различных объектов python в параллельных процессах, соответственно

В двух словах

Я хочу изменить сложные объекты python одновременно, при этом каждый объект обрабатывается только одним процессом. Как я могу это сделать (наиболее эффективно)? Будет ли какая-то помощь в помощи по травлению? Будет ли это эффективно?

Полная проблема

У меня есть структура данных Python ArrayDict которая в основном состоит из массива numpy и словаря и отображает произвольные индексы в строки в массиве. В моем случае все ключи являются целыми числами.

 a = ArrayDict() a[1234] = 12.5 a[10] = 3 print(a[1234]) #12.5 print(a[10]) # 3.0 print(a[1234] == a.array[a.indexDict[1234]]) #true 

Теперь у меня есть несколько таких ArrayDict и хочу заполнить их в myMethod(arrayDict, params) . Поскольку myMethod стоит дорого, я хочу запустить его параллельно. Обратите внимание, что myMethod может добавить много строк в arrayDict . Каждый процесс изменяет собственный ArrayDict . Мне не нужен одновременный доступ к ArrayDict .

В myMethod я меняю записи в arrayDict (т. arrayDict внутренний массив numpy ), я добавляю записи в arrayDict (то есть добавляю еще один индекс в словарь и записываю новое значение во внутренний массив). В конце концов, я хотел бы иметь возможность обмениваться arrayDict массив arrayDict когда он становится слишком маленьким. Это случается нечасто, и я могу выполнить это действие в непараллельной части моей программы, если не существует лучшего решения. Мои собственные попытки не были успешными даже без обмена массивами.

Я провел несколько дней, исследуя общую память и многопроцессорный модуль python. Поскольку я, наконец, буду работать над linux, задача была довольно простой: системный вызов fork() позволяет эффективно работать с копиями аргументов. Моя мысль заключалась в том, чтобы изменить каждый ArrayDict в своем собственном процессе, вернуть измененную версию объекта и перезаписать исходный объект. Чтобы сохранить память и сохранить работу для копирования, я использовал в дополнение к массивам sharedmem для хранения данных в ArrayDict . Я знаю, что словарь все равно должен быть скопирован.

 from sharedmem import sharedmem import numpy as np n = ... # length of the data array myData = np.empty(n, dtype=object) myData[:] = [ArrayDict() for _ in range(n)] done = False while not done: consideredData = ... # numpy boolean array of length # n with True at the index of # considered data args = ... # numpy array containing arguments # for myMethod with sharedmem.MapReduce() as pool: results = pool.map(myMethod, list(zip(myData[considered], args[considered])), star=True) myData[considered] = results done = ... # depends on what happens in # myMethod - from sharedmem import sharedmem import numpy as np n = ... # length of the data array myData = np.empty(n, dtype=object) myData[:] = [ArrayDict() for _ in range(n)] done = False while not done: consideredData = ... # numpy boolean array of length # n with True at the index of # considered data args = ... # numpy array containing arguments # for myMethod with sharedmem.MapReduce() as pool: results = pool.map(myMethod, list(zip(myData[considered], args[considered])), star=True) myData[considered] = results done = ... # depends on what happens in # myMethod 

То, что я получаю, является ошибкой ошибки сегментации. Я смог обойти эту ошибку, создав глубокие ArrayDict s myMethod и сохранив их в myData . Я действительно не понимаю, почему это необходимо, и часто копируя мои (потенциально очень большие) массивы (цикл while занимает много времени) не так эффективен для меня. Однако, по крайней мере, это работало в определенной степени. Тем не менее, моя программа имеет некоторые ошибки на 3-й итерации из-за разделяемой памяти. Поэтому я считаю, что мой путь не оптимален.

Я читаю здесь и здесь, что можно сохранить aribtrary numpy-массивы в общей памяти с использованием multiprocessing.Array . Тем не менее, мне все равно придется делиться всем ArrayDict , который включает, в частности, словарь, который, в свою очередь, не подбирается.

Как я мог бы эффективно достичь своих целей? Было бы возможно (и эффективно) сделать мой объект подбираемым каким-то образом?

Все решения должны работать с поддержкой python 3 и полной поддержкой numpy / scipy на 64-битной Linux.

редактировать

Я нашел здесь, что как-то можно делиться произвольными объектами с использованием классов Multiprocessing «Manager» и определяемых пользователем прокси-классов. Будет ли это эффективно? Я хотел бы использовать, что мне не нужен одновременный доступ к объектам, хотя они не обрабатываются в основном процессе. Будет ли возможность создать менеджера для каждого объекта, который я хочу обработать? (У меня могут быть некоторые заблуждения относительно того, как работают манипуляторы.)

One Solution collect form web for “Изменение различных объектов python в параллельных процессах, соответственно”

Это похоже на довольно сложный класс, и я не могу полностью ожидать, будет ли это решение работать в вашем случае. Простым компромиссом для такого сложного класса является использование ProcessPoolExecutor .

Если это не ответит на ваш вопрос, тогда это будет хорошо с минимальным рабочим примером.

 from concurrent.futures import ProcessPoolExecutor import numpy as np class ArrayDict (): keys = None vals = None def __init__ (self): self.keys = dict () self.vals = np.random.rand (1000) def __str__ (self): return "keys: " + str(self.keys) + ", vals: " + str(self.vals.mean()) def myMethod (ad, args): print ("starting:", ad) if __name__ == '__main__': l = [ArrayDict() for _ in range (5)] args = [2, 3, 4, 1, 3] with ProcessPoolExecutor (max_workers = 2) as ex: d = ex.map (myMethod, l, args) 

Объекты клонируются при отправке дочернему процессу, вам нужно вернуть результат (так как изменения объекта не будут передаваться обратно в основной процесс) и обработать, как вы хотите их сохранить.

Обратите внимание, что изменения в переменных класса будут распространяться на другие объекты в одном и том же процессе, например, если у вас больше задач, чем процессов, изменения в переменных класса будут совместно использоваться экземплярами, запущенными в том же процессе. Это обычно нежелательное поведение.

Это высокоуровневый интерфейс для распараллеливания. ProcessPoolExecutor использует модуль multiprocessing и может использоваться только с выбираемыми объектами . Я подозреваю, что ProcessPoolExecutor имеет производительность, аналогичную «совместному использованию процессов между процессами» . Под капотом ProcessPoolExecutor использует multiprocessing.Process и должен обладать одинаковой производительностью, такой как Pool (за исключением использования очень длинных итераций с картой). ProcessPoolExecutor , по-видимому, является предполагаемым будущим API для одновременных задач в python.

Если это возможно, обычно быстрее использовать ThreadPoolExecutor (который может быть заменен для ProcessPoolExecutor ). В этом случае объект распределяется между процессами, а обновление до одного будет распространяться обратно в основной поток.

Как уже упоминалось, самым быстрым вариантом является, вероятно, ArrayDict так что он использует только объекты, которые могут быть представлены multiprocessing.Value . ArrayDict или Array .

Если ProcessPoolExecutor не работает, и вы не можете оптимизировать ArrayDict , вы можете зависеть от использования Manager . Есть хорошие примеры того, как это сделать здесь .

Наибольшее усиление производительности часто можно найти в myMethod . И, как я уже упоминал, накладные расходы на использование потоков меньше, чем на процессы.

Python - лучший язык программирования в мире.