multiprocessing.Pool: вызов вспомогательных функций при использовании опции обратного вызова apply_async

Как поток apply_async работает между вызовом функции iterable (?) И функции обратного вызова?

Настройка: я читаю некоторые строки всех файлов в каталоге файлов 2000, некоторые из них содержат миллионы строк, некоторые из них только с несколькими. Некоторые данные заголовка / форматирования / даты извлекаются для charecterize каждого файла. Это делается на 16-процессорной машине, поэтому имеет смысл многопроцессорно работать.

В настоящее время ожидаемый результат отправляется в список ( ahlala ), поэтому я могу его распечатать; позже это будет записано в * .csv. Это упрощенная версия моего кода, первоначально основанная на этом чрезвычайно полезном сообщении.

 import multiprocessing as mp def dirwalker(directory): ahlala = [] # X() reads files and grabs lines, calls helper function to calculate # info, and returns stuff to the callback function def X(f): fileinfo = Z(arr_of_lines) return fileinfo # Y() reads other types of files and does the same thing def Y(f): fileinfo = Z(arr_of_lines) return fileinfo # results() is the callback function def results(r): ahlala.extend(r) # or .append, haven't yet decided # helper function def Z(arr): return fileinfo # to X() or Y()! for _,_,files in os.walk(directory): pool = mp.Pool(mp.cpu_count() for f in files: if (filetype(f) == filetypeX): pool.apply_async(X, args=(f,), callback=results) elif (filetype(f) == filetypeY): pool.apply_async(Y, args=(f,), callback=results) pool.close(); pool.join() return ahlala 

Обратите внимание: код работает, если я помещаю всю функцию Z() , вспомогательную функцию, в X() , Y() или results() , но является ли это повторяющимся или, возможно, более медленным, чем это возможно? Я знаю, что функция обратного вызова вызывается для каждого вызова функции, но когда называется функция обратного вызова? Это после pool.apply_async() … завершает все задания для процессов? Разве не должно быть быстрее, если эти вспомогательные функции вызывались внутри области (?) Первой функции pool.apply_async() принимает (в данном случае X() )? Если нет, должен ли я просто поставить вспомогательную функцию в results() ?

Другие связанные идеи: являются ли процессы демонов причиной чего-либо? Я также очень смущен тем, как ставить в очередь, и если это проблема. Кажется, это место, чтобы начать изучать его , но можно ли без очереди игнорировать очереди при использовании apply_async или только при заметном времени неэффективности?

One Solution collect form web for “multiprocessing.Pool: вызов вспомогательных функций при использовании опции обратного вызова apply_async”

Вы спрашиваете о целом ряде вещей, поэтому я постараюсь все это покрыть, насколько это возможно:

Функция, которую вы передаете для callback будет выполняться в основном процессе (а не у рабочего), как только рабочий процесс вернет результат. Он выполняется в потоке, который создает объект Pool внутри. Этот поток потребляет объекты из результата result_queue , который используется для получения результатов от всех рабочих процессов. После того, как поток вытащил результат из очереди, он выполняет callback . Пока выполняется ваш обратный вызов, никакие другие результаты не могут быть вытащены из очереди, поэтому важно, чтобы обратный вызов быстро заканчивался. В вашем примере, как только один из вызовов X или Y выполняемых с помощью apply_async завершится, результат будет помещен в result_queue рабочим процессом, а затем поток обработки результатов вытащит результат из result_queue , и ваш callback будет выполнен.

Во-вторых, я подозреваю, что причина, по которой вы не видите, что что-то происходит с вашим примером кода, связана с тем, что все вызовы вашей рабочей функции терпят неудачу. Если рабочая функция не работает, callback никогда не будет выполнен. Сбой вообще не сообщается, если вы не попытаетесь извлечь результат из объекта AsyncResult возвращаемого вызовом apply_async . Однако, поскольку вы не сохраняете ни один из этих объектов, вы никогда не узнаете, что произошли сбои. Если бы я был вами, я бы попытался использовать pool.apply пока вы тестируете, чтобы вы видели ошибки, как только они появились.

Причина, по которой рабочие, вероятно, не работают (по крайней мере, в приведенном вами примере кода), состоит в том, что X и Y определяются как функция внутри другой функции. multiprocessing передает функции и объекты рабочим процессам путем травления их в основном процессе и рассыпания их в рабочих процессах. Функции, определенные внутри других функций, не подбираются, что означает, что multiprocessing не сможет успешно распаковать их в рабочем процессе. Чтобы исправить это, определите обе функции на верхнем уровне вашего модуля, а не встроенную функцию dirwalker .

Вы должны обязательно продолжать называть Z от X и Y , а не к results . Таким образом, Z можно запускать одновременно во всех ваших рабочих процессах, вместо того, чтобы запускать один вызов за раз в вашем основном процессе. И помните, что ваша функция callback должна быть как можно быстрее, поэтому вы не задерживаете результаты обработки. Выполнение Z там замедлит работу.

Вот простой пример кода, похожий на то, что вы делаете, что, надеюсь, дает вам представление о том, как выглядит ваш код:

 import multiprocessing as mp import os # X() reads files and grabs lines, calls helper function to calculate # info, and returns stuff to the callback function def X(f): fileinfo = Z(f) return fileinfo # Y() reads other types of files and does the same thing def Y(f): fileinfo = Z(f) return fileinfo # helper function def Z(arr): return arr + "zzz" def dirwalker(directory): ahlala = [] # results() is the callback function def results(r): ahlala.append(r) # or .append, haven't yet decided for _,_,files in os.walk(directory): pool = mp.Pool(mp.cpu_count()) for f in files: if len(f) > 5: # Just an arbitrary thing to split up the list with pool.apply_async(X, args=(f,), callback=results) # ,error_callback=handle_error # In Python 3, there's an error_callback you can use to handle errors. It's not available in Python 2.7 though :( else: pool.apply_async(Y, args=(f,), callback=results) pool.close() pool.join() return ahlala if __name__ == "__main__": print(dirwalker("/usr/bin")) 

Вывод:

 ['ftpzzz', 'findhyphzzz', 'gcc-nm-4.8zzz', 'google-chromezzz' ... # lots more here ] 

Редактировать:

Вы можете создать объект dict, общий для родительского и дочернего процессов, используя класс multiprocessing.Manager :

 pool = mp.Pool(mp.cpu_count()) m = multiprocessing.Manager() helper_dict = m.dict() for f in files: if len(f) > 5: pool.apply_async(X, args=(f, helper_dict), callback=results) else: pool.apply_async(Y, args=(f, helper_dict), callback=results) 

Затем сделайте X и Y возьмите второй аргумент helper_dict (или любое helper_dict имя, которое вы хотите), и вы все настроены.

Суть в том, что это сработало, создав серверный процесс, который содержит нормальный dict, и все ваши другие процессы говорят об этом с помощью прокси-объекта. Поэтому каждый раз, когда вы читаете или пишете в dict, вы делаете IPC. Это делает его намного медленнее, чем настоящий дикт.

  • Что такое метод работы .join () для процесса многопроцессорности Python?
  • segfault с использованием lapack_lite от numpy с многопроцессорной обработкой на osx, а не linux
  • Почему падение скорости увеличивается для генерации 400 000 000 случайных чисел?
  • python многопроцессорный набор процесс нереста ждать
  • Как получить несколько возвращаемых значений функции, вызванной многопроцессорной обработкой. Процесс
  • Как очистить многопроцессорную очередь в python
  • Как передать экземпляр multiprocessing.Pool для функции обратного вызова apply_async?
  • Проблемы обмена данными между процессами
  • Python присоединяется к процессу, не блокируя родительский
  • Эффективное чтение файла в python с необходимостью разбиения на '\ n'
  • Запуск нескольких сценариев python одновременно с различным именем CMD
  • Python - лучший язык программирования в мире.