Python Multiprocessing сохраняет данные до дальнейшего вызова в каждом процессе

У меня есть большой объект типа, который не может быть разделен между процессами. У него есть методы для его создания и работы над его данными.

Текущий способ, которым я это делаю, – это сначала создать объект в основном родительском процессе, а затем передать его в подпроцессы, когда произойдет какое-то событие. Проблема в том, что всякий раз, когда выполняются подпроцессы, они копируют объект в память каждый раз, когда требуется некоторое время. Я хочу сохранить его в памяти, которая доступна только для них, так что им не нужно копировать их каждый раз, когда они вызывают функцию этого объекта.

Как я могу хранить объект только для собственного использования этого процесса?

Изменить: Код

class MultiQ: def __init__(self): self.pred = instantiate_predict() #here I instantiate the big object def enq_essay(self,essay): p = Process(target=self.compute_results, args=(essay,)) p.start() def compute_results(self, essay): predictions = self.pred.predict_fields(essay) #computation in the large object that doesn't modify the object 

Это копирует большой объект в память каждый раз. Я стараюсь избегать этого.

Редактировать 4: образец короткого кода, который выполняется по 20 данным групп новостей

 import sklearn.feature_extraction.text as ftext import sklearn.linear_model as lm import multiprocessing as mp import logging import os import numpy as np import cPickle as pickle def get_20newsgroups_fnames(): all_files = [] for i, (root, dirs, files) in enumerate(os.walk("/home/roman/Desktop/20_newsgroups/")): if i>0: all_files.extend([os.path.join(root,file) for file in files]) return all_files documents = [unicode(open(f).read(), errors="ignore") for f in get_20newsgroups_fnames()] logger = mp.get_logger() formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s', datefmt = '%H:%M:%S') handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.WARNING) mp._log_to_stderr = True def free_memory(): """ Return free memory available, including buffer and cached memory """ total = 0 with open('/proc/meminfo', 'r') as f: for line in f: line = line.strip() if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')): field, amount, unit = line.split() amount = int(amount) if unit != 'kB': raise ValueError( 'Unknown unit {u!r} in /proc/meminfo'.format(u=unit)) total += amount return total def predict(large_object, essay="this essay will be predicted"): """this method copies the large object in memory which is what im trying to avoid""" vectorized_essay = large_object[0].transform(essay) large_object[1].predict(vectorized_essay) report_memory("done") def train_and_model(): """this is very similar to the instantiate_predict method from my first code sample""" tfidf_vect = ftext.TfidfVectorizer() X = tfidf_vect.fit_transform(documents) y = np.random.random_integers(0,1,19997) model = lm.LogisticRegression() model.fit(X, y) return (tfidf_vect, model) def report_memory(label): f = free_memory() logger.warn('{l:<25}: {f}'.format(f=f, l=label)) def dump_large_object(large_object): f = open("large_object.obj", "w") pickle.dump(large_object, f, protocol=2) f.close() def load_large_object(): f = open("large_object.obj") large_object = pickle.load(f) f.close() return large_object if __name__ == '__main__': report_memory('Initial') tfidf_vect, model = train_and_model() report_memory('After train_and_model') large_object = (tfidf_vect, model) procs = [mp.Process(target=predict, args=(large_object,)) for i in range(mp.cpu_count())] report_memory('After Process') for p in procs: p.start() report_memory('After p.start') for p in procs: p.join() report_memory('After p.join') 

Выход 1:

 19:01:39: [ MainProcess] Initial : 26585728 19:01:51: [ MainProcess] After train_and_model : 25958924 19:01:51: [ MainProcess] After Process : 25958924 19:01:51: [ MainProcess] After p.start : 25925908 19:01:51: [ Process-1] done : 25725524 19:01:51: [ Process-2] done : 25781076 19:01:51: [ Process-4] done : 25789880 19:01:51: [ Process-3] done : 25802032 19:01:51: [ MainProcess] After p.join : 25958272 roman@ubx64:$ du -h large_object.obj 4.6M large_object.obj 

Так что, возможно, большой объект даже не большой, и моя проблема заключалась в использовании памяти из метода преобразования tfidf vectorizer.

теперь, если я изменю основной метод на это:

 report_memory('Initial') large_object = load_large_object() report_memory('After loading the object') procs = [mp.Process(target=predict, args=(large_object,)) for i in range(mp.cpu_count())] report_memory('After Process') for p in procs: p.start() report_memory('After p.start') for p in procs: p.join() report_memory('After p.join') 

Я получаю следующие результаты: Выход 2:

 20:07:23: [ MainProcess] Initial : 26578356 20:07:23: [ MainProcess] After loading the object : 26544380 20:07:23: [ MainProcess] After Process : 26544380 20:07:23: [ MainProcess] After p.start : 26523268 20:07:24: [ Process-1] done : 26338012 20:07:24: [ Process-4] done : 26337268 20:07:24: [ Process-3] done : 26439444 20:07:24: [ Process-2] done : 26438948 20:07:24: [ MainProcess] After p.join : 26542860 

Затем я изменил основной метод:

 report_memory('Initial') large_object = load_large_object() report_memory('After loading the object') predict(large_object) report_memory('After Process') 

И получили эти результаты: Результат 3:

 20:13:34: [ MainProcess] Initial : 26572580 20:13:35: [ MainProcess] After loading the object : 26538356 20:13:35: [ MainProcess] done : 26513804 20:13:35: [ MainProcess] After Process : 26513804 

На данный момент я понятия не имею, что происходит, но многопроцессорность определенно использует больше памяти.

2 Solutions collect form web for “Python Multiprocessing сохраняет данные до дальнейшего вызова в каждом процессе”

Linux использует copy-on-write , что означает, что когда подпроцесс раздвоен, глобальные переменные в каждом подпроцессе используют один и тот же адрес памяти до тех пор, пока значение не будет изменено. Только когда значение изменяется, оно копируется.

Теоретически, если большой объект не модифицирован, он может использоваться подпроцессами, не потребляя больше памяти. Давайте проверим эту теорию.

Вот ваш код, связанный с ведением журнала использования памяти:

 import sklearn.feature_extraction.text as ftext import sklearn.linear_model as lm import multiprocessing as mp import logging logger = mp.get_logger() formatter = logging.Formatter('%(asctime)s: [%(processName)12s] %(message)s', datefmt='%H:%M:%S') handler = logging.StreamHandler() handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.WARNING) mp._log_to_stderr = True def predict(essay="this essay will be predicted"): """this method copies the large object in memory which is what im trying to avoid""" vectorized_essay = large_object[0].transform(essay) large_object[1].predict(vectorized_essay) report_memory("done") def train_and_model(): """this is very similar to the instantiate_predict method from my first code sample""" tfidf_vect = ftext.TfidfVectorizer() N = 100000 corpus = [ 'This is the first document.', 'This is the second second document.', 'And the third one.', 'Is this the first document?', ] * N y = [1, 0, 1, 0] * N report_memory('Before fit_transform') X = tfidf_vect.fit_transform(corpus) model = lm.LogisticRegression() model.fit(X, y) report_memory('After model.fit') return (tfidf_vect, model) def free_memory(): """ Return free memory available, including buffer and cached memory """ total = 0 with open('/proc/meminfo', 'r') as f: for line in f: line = line.strip() if any(line.startswith(field) for field in ('MemFree', 'Buffers', 'Cached')): field, amount, unit = line.split() amount = int(amount) if unit != 'kB': raise ValueError( 'Unknown unit {u!r} in /proc/meminfo'.format(u=unit)) total += amount return total def gen_change_in_memory(): f = free_memory() diff = 0 while True: yield diff f2 = free_memory() diff = f - f2 f = f2 change_in_memory = gen_change_in_memory().next def report_memory(label): logger.warn('{l:<25}: {d:+d}'.format(d=change_in_memory(), l=label)) if __name__ == '__main__': report_memory('Initial') tfidf_vect, model = train_and_model() report_memory('After train_and_model') large_object = (tfidf_vect, model) procs = [mp.Process(target=predict) for i in range(mp.cpu_count())] report_memory('After Process') for p in procs: p.start() for p in procs: p.join() report_memory('After p.join') 

Это дает:

 21:45:01: [ MainProcess] Initial : +0 21:45:01: [ MainProcess] Before fit_transform : +3224 21:45:12: [ MainProcess] After model.fit : +153572 21:45:12: [ MainProcess] After train_and_model : -3100 21:45:12: [ MainProcess] After Process : +0 21:45:12: [ Process-1] done : +2232 21:45:12: [ Process-2] done : +2976 21:45:12: [ Process-3] done : +3596 21:45:12: [ Process-4] done : +3224 21:45:12: [ MainProcess] After p.join : -372 

Сообщается о количестве изменений в KiB свободной памяти (включая кеширование и буферы). Так, например, изменение свободной памяти между «Initial» и «After train_and_model» было около 150 МБ. Таким образом, для large_object требуется около 150 МБ.

Затем, после завершения 4 подпроцессов, потребляется намного меньший объем памяти – около 12 МБ. Потребление памяти может быть связано с созданием подпроцесса плюс память, используемой методами transform и predict .

Таким образом, кажется, что large_object не копируется, так как если бы мы должны были увидеть увеличение около 150 МБ в потребляемой памяти.


Комментарий к вашему прогону по 20 группам новостей :

Вот изменения в свободной памяти:

В 20 новостных группах:

 | Initial | 0 | | After train_and_model | 626804 | <-- Large object requires 627M | After Process | 0 | | After p.start | 33016 | | done | 200384 | | done | -55552 | | done | -8804 | | done | -12152 | | After p.join | -156240 | 

Таким образом, похоже, что для экземпляра большого объекта требуется 627 МБ. Я не знаю, почему дополнительные 200 + МБ были израсходованы после done как достигнуто первое.

Использование load_large_object:

 | Initial | 0 | | After loading the object | 33976 | | After Process | 0 | | After p.start | 21112 | | done | 185256 | | done | 744 | | done | -102176 | | done | 496 | | After p.join | -103912 | 

По-видимому, для большого_объекта требуется всего 34 МБ, остальная часть памяти, 627-34 = 593 МБ, должна была потребляться методами fit_transform и fit вызванными в train_and_model .

Использование одного процесса:

 | Initial | 0 | | After loading the object | 34224 | | done | 24552 | | After Process | 0 | 

Это правдоподобно.

Таким образом, данные, которые вы накопили, похоже, подтверждают утверждение о том, что сам большой объект не копируется каждым подпроцессом. Но возникает новая загадка: почему существует огромное потребление памяти между «После p.start» и первым «сделанным». Я не знаю ответа на это.


Вы можете попробовать поставить вызовы report_memory вокруг

 vectorized_essay = large_object[0].transform(essay) 

а также

 large_object[1].predict(vectorized_essay) 

чтобы увидеть, где потребляется дополнительная память. Я предполагаю, что одним из этих методов scikit-learn является выделение этого (относительно) огромного объема памяти.

Я закончил использование RPC-серверов с помощью Rabbit MQ. Rabbit MQ Tutorial для RPC / Python . Поэтому я создал количество серверов, эквивалентное количеству процессоров на моей машине. Эти серверы запускаются один раз и выделяют память для модели и векторизатора один раз и удерживают ее во время работы. Дополнительные преимущества

  1. Некоторая обработка может быть легко отправлена ​​на другую машину, если вы будете перегружены
  2. Если вычисление завершается с ошибкой на одном сервере, его можно легко отправить на другой сервер
  3. Процесс выделения памяти не был мгновенным в исходном коде, поэтому общее время работы на моем наборе данных упало с 18 секунд до 12 секунд на запрос, потому что память предварительно выделена.

В целом мой код стал намного чище.

Python - лучший язык программирования в мире.