Показывать индикатор выполнения для моего многопоточного процесса

У меня есть простое веб-приложение Flask, которое делает много HTTP-запросов внешней службе, когда пользователь нажимает кнопку. На стороне клиента у меня есть приложение angularjs.

Серверная часть кода выглядит так (с использованием multiprocessing.dummy ):

worker = MyWorkerClass() pool = Pool(processes=10) result_objs = [pool.apply_async(worker.do_work, (q,)) for q in queries] pool.close() # Close pool pool.join() # Wait for all task to finish errors = not all(obj.successful() for obj in result_objs) # extract result only from successful task items = [obj.get() for obj in result_objs if obj.successful()] 

Как вы можете видеть, я использую apply_async потому что я хочу позже проверить каждую задачу и извлечь из них результат только в том случае, если задача не вызвала каких-либо исключений.

Я понял, что для того, чтобы показать индикатор выполнения на стороне клиента, мне нужно опубликовать где-то количество завершенных задач, поэтому я сделал простой вид следующим образом:

 @app.route('/api/v1.0/progress', methods=['GET']) def view_progress(): return jsonify(dict(progress=session['progress'])) 

Это покажет содержимое переменной сеанса. Теперь, во время процесса, мне нужно обновить эту переменную с количеством завершенных задач (общее количество завершенных задач фиксировано и известно).

Любые идеи о том, как это сделать? Я работаю в правильном направлении?

Я видел подобные вопросы на SO, как этот, но я не могу адаптировать ответ к моему делу.

Спасибо.

3 Solutions collect form web for “Показывать индикатор выполнения для моего многопоточного процесса”

Для межпроцессного общения вы можете использовать multiprocessiong.Queue, и ваши работники могут put_nowait кортежи put_nowait с информацией о ходе работы, выполняя свою работу. Ваш основной процесс может обновлять все ваши данные view_progress, пока все результаты не будут готовы.

Похоже, что в этом примере использование очереди с несколькими настройками:

В писателях (рабочих) я бы использовал put_nowait вместо put потому что работа важнее, чем ждать, чтобы сообщить, что вы работаете (но, возможно, вы судите об этом иначе и решаете, что информирование пользователя является частью задачи и никогда не должно быть пропущено) ,

Пример просто puts строки в очередь, я бы использовал collection.namedtuples для более структурированных сообщений. В задачах с множеством шагов это позволяет вам повысить разрешение вашего отчета о проделанной работе и сообщить пользователю больше.

В общем, подход, который вы принимаете, в порядке, я делаю это аналогичным образом.

Чтобы рассчитать прогресс, вы можете использовать вспомогательную функцию, которая учитывает завершенные задачи:

 def get_progress(result_objs): done = 0 errors = 0 for r in result_objs: if r.ready(): done += 1 if not r.successful(): errors += 1 return (done, errors) 

Обратите внимание, что в качестве бонуса эта функция возвращает количество выполненных задач, закончившихся ошибками.

Большая проблема заключается в пути /api/v1.0/progress чтобы найти массив объектов AsyncResult .

К сожалению, объекты AsyncResult не могут быть сериализованы для сеанса, поэтому опция отсутствует. Если ваше приложение поддерживает один набор задач async за один раз, вы можете просто сохранить этот массив как глобальную переменную. Если вам необходимо поддерживать несколько клиентов, каждый из которых имеет другой набор задач async, тогда вам нужно будет разработать стратегию для хранения данных сеанса клиента на сервере.

Я реализовал решение для одного клиента в качестве быстрого теста. Мои функции просмотра:

 results = None @app.route('/') def index(): global results results = [pool.apply_async(do_work) for n in range(20)] return render_template('index.html') @app.route('/api/v1.0/progress') def progress(): global results total = len(results) done, errored = get_progress(results) return jsonify({'total': total, 'done': done, 'errored': errored}) 

Надеюсь, это поможет!

Я думаю, что вы должны иметь возможность обновлять количество завершенных задач с помощью многопроцессорной обработки . Целое и многопроцессорное.Lock .

В своем основном коде используйте:

 processes=multiprocessing.Value('i', 10) lock=multiprocessing.Lock() 

И затем, когда вы вызываете worker.dowork, передайте объект блокировки и значение для него:

 worker.dowork(lock, processes) 

В коде employee.dowork уменьшите «процессы» на единицу при завершении кода:

 lock.acquire() processes.value-=1 lock.release() 

Теперь «process.value» должен быть доступен из вашего основного кода и быть равен количеству оставшихся процессов. Удостоверьтесь, что вы приобрели блокировку до появления процессов.

  • python threading: модель памяти и видимость
  • Как запустить многолетнюю работу в фоновом режиме на Python
  • Флакон: фоновый поток видит непустую очередь как пустую
  • Наличие консоли в однопоточном скрипте Python
  • Параллельное сопоставление файлов, Python
  • Начните приложение с флягой в отдельной нити
  • Threading / Queue в Python
  • Примеры потоков и очереди Python
  • Python использует несколько процессоров
  • Проблема блокировки приложений Python / OpenCV
  • Исключение gevent.hub.LoopExit: LoopExit («Эта операция будет заблокирована навсегда»,)
  • Python - лучший язык программирования в мире.