Сбрасывание многопроцессорности.

Я хочу сбросить multiprocessing.Queue в список. Для этой задачи я написал следующую функцию:

 import Queue def dump_queue(queue): """ Empties all pending items in a queue and returns them in a list. """ result = [] # START DEBUG CODE initial_size = queue.qsize() print("Queue has %s items initially." % initial_size) # END DEBUG CODE while True: try: thing = queue.get(block=False) result.append(thing) except Queue.Empty: # START DEBUG CODE current_size = queue.qsize() total_size = current_size + len(result) print("Dumping complete:") if current_size == initial_size: print("No items were added to the queue.") else: print("%s items were added to the queue." % \ (total_size - initial_size)) print("Extracted %s items from the queue, queue has %s items \ left" % (len(result), current_size)) # END DEBUG CODE return result 

Но по какой-то причине это не сработает.

Соблюдайте следующий сеанс оболочки:

 >>> import multiprocessing >>> q = multiprocessing.Queue() >>> for i in range(100): ... q.put([range(200) for j in range(100)]) ... >>> q.qsize() 100 >>> l=dump_queue(q) Queue has 100 items initially. Dumping complete: 0 items were added to the queue. Extracted 1 items from the queue, queue has 99 items left >>> l=dump_queue(q) Queue has 99 items initially. Dumping complete: 0 items were added to the queue. Extracted 3 items from the queue, queue has 96 items left >>> l=dump_queue(q) Queue has 96 items initially. Dumping complete: 0 items were added to the queue. Extracted 1 items from the queue, queue has 95 items left >>> 

Что тут происходит? Почему не все предметы сбрасываются?

2 Solutions collect form web for “Сбрасывание многопроцессорности.”

Попробуй это:

 import Queue import time def dump_queue(queue): """ Empties all pending items in a queue and returns them in a list. """ result = [] for i in iter(queue.get, 'STOP'): result.append(i) time.sleep(.1) return result import multiprocessing q = multiprocessing.Queue() for i in range(100): q.put([range(200) for j in range(100)]) q.put('STOP') l=dump_queue(q) print len(l) 

Многопроцессорные очереди имеют внутренний буфер, который имеет поток фидера, который тянет работу с буфера и сбрасывает его в трубу. Если не все объекты были сброшены, я мог бы увидеть случай, когда пустая страница поднимается преждевременно. Использование дозорного устройства для указания конца очереди является безопасным (и надежным). Кроме того, использование итерации iter (get, sentinel) просто лучше, чем полагаться на Empty.

Мне не нравится, что он может поднимать пустой из-за времени промывки (я добавил time.sleep (.1), чтобы разрешить контекстный переключатель в поток фидера, вам может и не понадобиться, он работает без него – это привычка отпустите GIL).

В некоторых ситуациях мы уже вычислили все, и хотим просто конвертировать очередь.

 shared_queue = Queue() shared_queue_list = [] ... join() #All process are joined while shared_queue.qsize() != 0: shared_queue_list.append(shared_queue.get()) 

Теперь shared_queue_list имеет результаты, преобразованные в список.

  • Как получить результаты из пула потоков в python?
  • Как передать ссылку на очередь на функцию, управляемую пулом.map_async ()?
  • многопроцессор python - процесс зависает при соединении для большой очереди
  • Длительные задачи в веб-приложении Pyramid
  • многопроцессорная очередь python: ставит независимость от получения?
  • Максимальный предел максимальной длины очереди - 32767
  • Заполнение очереди и управление многопроцессорностью в python
  • Queue vs JoinableQueue в Python
  • Python - лучший язык программирования в мире.