Совместное использование нескольких очередей среди процессов в Python

Я знаю о multiprocessing.Manager() и о том, как его можно использовать для создания общих объектов, в частности очередей, которые могут совместно использоваться рабочими. Это вопрос , этот вопрос , этот вопрос и даже один из моих собственных вопросов .

Однако мне нужно определить большое количество очередей, каждая из которых связывает определенные пары процессов. Скажем, что каждая пара процессов и связанная с ней очередь связывания определяются key переменной.

Я хочу использовать словарь для доступа к моим очередям, когда мне нужно поставить и получить данные. Я не могу сделать эту работу. Я пробовал несколько вещей. При multiprocessing импортируемой как mp :

Определение типа dict for key in all_keys: DICT[key] = mp.Queue в файле конфигурации, который импортируется модулем многопроцессорности (вызов его multi.py ), не возвращает ошибок, но очередь DICT[key] не используется между процессами, каждая из них, похоже, имеет свою собственную копию очереди, и, таким образом, связь не происходит.

Если я попытаюсь определить DICT в начале основной функции многопроцессорности, которая определяет процессы и запускает их, например

 DICT = mp.Manager().dict() for key in all_keys: DICT[key] = mp.Queue() 

Я получаю ошибку

 RuntimeError: Queue objects should only be shared between processes through inheritance 

Переход на

 DICT = mp.Manager().dict() for key in all_keys: DICT[key] = mp.Manager().Queue() 

только делает все хуже. Попытка схожих определений во главе multi.py а не внутри основной функции возвращает подобные ошибки.

Должен быть способ разделить много очередей между процессами без явного называния каждого из них в коде. Есть идеи?

редактировать

Вот базовая схема программы:

1- загрузить первый модуль, который определяет некоторые переменные, импортирует multi , запускает multi.main() и загружает другой модуль, который запускает каскад загрузки модулей и выполнения кода. В то же время …

2- multi.main выглядит следующим образом:

 def main(): manager = mp.Manager() pool = mp.Pool() DICT2 = manager.dict() for key in all_keys: DICT2[key] = manager.Queue() proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,) 

Вместо использования pool и manager я также запускал процессы со следующим:

 mp.Process(target=targ1, args=(DICT[key],)) 

3 – Функция targ1 принимает входные данные, которые поступают (сортируются по key ) из основного процесса. Он предназначен для передачи результата в DICT[key] поэтому targ2 может выполнять свою работу. Это часть, которая не работает. Существует произвольное количество targ1 , targ2 и т. Д. И, следовательно, произвольное количество очередей.

4 – Результаты некоторых из этих процессов будут отправлены на кучу различных массивов данных / pandas, которые также индексируются по key , и которые я хотел бы получить от произвольных процессов, даже запущенных в другом модуле. Мне еще предстоит написать эту часть, и это может быть другой вопрос. (Я упоминаю это здесь, потому что ответ на 3 выше может также решить 4.)

One Solution collect form web for “Совместное использование нескольких очередей среди процессов в Python”

Похоже, что ваши проблемы начались, когда вы пытались передать multiprocessing.Queue() , передав его в качестве аргумента. Вы можете обойти это, создав вместо этого управляемую очередь :

 import multiprocessing manager = mutiprocessing.Manager() passable_queue = manager.Queue() 

Когда вы используете менеджера для его создания, вы сохраняете и передаете прокси-сервер в очередь, а не в очередь, так что даже когда объект, который вы передаете своим рабочим процессам, скопирован, он все равно будет указывать на то же самое структура данных: ваша очередь. Он очень похож (по понятию) на указатели на C / C ++. Если вы создадите свои очереди таким образом, вы сможете передать их при запуске рабочего процесса.

Поскольку теперь вы можете передавать очереди, вам больше не нужен ваш словарь для управления. Держите нормальный словарь в главном, который сохранит все сопоставления и только даст вашему работнику обрабатывать очереди, в которых они нуждаются, поэтому им не нужен доступ к каким-либо сопоставлениям.

Я написал пример этого здесь. Похоже, вы проходите объекты между вашими работниками, вот что здесь делается. Представьте, что у нас есть две стадии обработки, и данные начинаются и заканчиваются в контроле main . Посмотрите, как мы можем создавать очереди, которые соединяют рабочих как конвейер, но, предоставляя им только очереди, которые им нужны , им не нужно знать о каких-либо сопоставлениях:

 import multiprocessing as mp def stage1(q_in, q_out): q_out.put(q_in.get()+"Stage 1 did some work.\n") return def stage2(q_in, q_out): q_out.put(q_in.get()+"Stage 2 did some work.\n") return def main(): pool = mp.Pool() manager = mp.Manager() # create managed queues q_main_to_s1 = manager.Queue() q_s1_to_s2 = manager.Queue() q_s2_to_main = manager.Queue() # launch workers, passing them the queues they need results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2)) results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main)) # Send a message into the pipeline q_main_to_s1.put("Main started the job.\n") # Wait for work to complete print(q_s2_to_main.get()+"Main finished the job.") pool.close() pool.join() return if __name__ == "__main__": main() 

Код выводит этот результат:

Главное начало работу.
Этап 1 проделал определенную работу.
Этап 2 сделал некоторую работу.
Главная закончила работу.

Я не включил пример хранения очередей или объектов AsyncResults в словарях, потому что я до сих пор не совсем понимаю, как должна работать ваша программа. Но теперь, когда вы можете свободно передавать свои очереди, вы можете построить свой словарь для хранения сопоставлений очереди / процессов по мере необходимости.

На самом деле, если вы действительно строите конвейер между несколькими рабочими, вам даже не нужно указывать ссылки на «межличностные» очереди в main . Создайте очереди, передайте их своим работникам, а затем сохраните ссылки на очереди, которые будут использоваться main . Я определенно рекомендовал бы ставить старую очередь на сбор мусора как можно быстрее, если у вас действительно есть «произвольное число» очередей.

Python - лучший язык программирования в мире.