Руководство по использованию Concurrent.futures – простой пример использования потоков и обработки

Я хочу включить параллельную обработку / потоки моей программы, используя модуль concurrent.futures .

К сожалению, я не могу найти никаких хороших, простых идиотских примеров использования модуля concurrent.futures. Как правило, они требуют более глубоких знаний о концепциях python или обработки / потоки и жаргоне.

Ниже приведен упрощенный, самодостаточный пример, основанный на моей программе: существует чисто задача с привязкой к ЦП, идеально подходящая для многопроцессорной обработки, и отдельная связанная задача IO, вставляемая в базу данных (SQLite). В моей программе я уже преобразовал это, чтобы использовать класс многопроцессорного пула, но поскольку результаты из задачи привязки к ЦП собраны в ожидании завершения задач, она использует огромные объемы памяти. Таким образом, я хочу использовать комбинацию потоковой обработки / обработки, которая, как я полагаю, сопряжена.футболы могут сделать для меня довольно просто.

Итак, как мне преобразовать нижеследующее в то, что использует этот модуль?

import sqlite3 #Stand in CPU intensive task def calculate(value): return value * 10 #Stand in Thread I/O intensive task def output(value): global db if (value % 1000) == 0: db.execute('delete from test_table') db.execute('insert into test_table (result) values (?)', (value,)) def main(): global db results = [] db = sqlite3.connect('e:\\z_dev\\test.sqlite') db.cursor() #========= #Perform CPU intensive task for i in range(1000): results.append( calculate(i)) #Perform Threading intensive task for a in results: output(a) #========= db.commit() db.close() if __name__ == '__main__': main() 

Я ищу ответ, который не использует какой-либо причудливый / сложный питон. Или простое простое объяснение, или, в идеале, оба!

благодаря

Изменить : моя текущая «многопроцессорная» реализация. Наверное, неправильно, но, похоже, это работает. Нет нити. Это входит в часть «# =========» выше.

 #Multiprocessing pool = multiprocessing.Pool(None) for i in range(1000): results.append( pool.apply_async(calculate(i))) pool.close() pool.join() for i in results: results[i] = results[i].get() #Complete lack of threading; but if I had it, it'd be here: for a in results: output(a) 

  • Невозможно установить модули через pip на python 3.4
  • Как получить имя / файл скрипта из файла sitecustomize.py?
  • Какие кодировки файлов поддерживаются для исходных файлов Python 3?
  • Самосознание аннотаций типа в Python
  • Загрузите gensim Word2Vec, вычисленный в Python 2, в Python 3
  • Как запустить python для windows?
  • Почему «1000000000000000 в диапазоне (1000000000000001)» так быстро в Python 3?
  • В Python `pip -r requirements.txt` не устанавливает пакеты * рекурсивно *?
  • One Solution collect form web for “Руководство по использованию Concurrent.futures – простой пример использования потоков и обработки”

    concurrent.futures имеет минималистичный API. Он прост в использовании для очень простых проблем, но у вас нет очень простой проблемы. Если бы вы это сделали, вы бы уже решили это 😉

    Вы не указали какой-либо из multiprocessing.Pool кода, который вы написали, но это было бы более перспективным местом для начала – если вы хотите решить проблему больше, чем хотите подтвердить свою надежду на то, что это должно быть легко сделать, если вы только вы переключились на более слабый API 😉

    «Очевидный способ продолжения использования multiprocessing – использовать метод Pool.apply_async() , поместить объекты результата async на ограниченный Queue.Queue и потоки в вашей основной программе вытащить их из Queue и дождаться результатов объявиться. Это достаточно легко, но это не волшебство. Он решает вашу проблему, потому что ограниченные Queuesэто канонический способ посредничества между производителями и потребителями, которые работают на разных скоростях. Ничто в concurrent.futures затрагивает эту проблему напрямую, и это лежит в основе вашей проблемы с «огромными объемами памяти».

     # Define global result_queue only in the main program. import Queue result_queue = Queue.Queue(100) # pick a reasonable max size based on your problem # Run this in as many threads as you like. def consume_results(): while True: a = result_queue.get() if a is None: break output(a.get()) # `output()` is your function ... # main program passes out work, after starting threads for i in range(1000): # the .put() will block so long as the queue is at its max size result_queue.put(pool.apply_async(calculate, args=(i,))) # add sentinels to let threads know they're done for i in range(number_of_threads_you_started): result_queue.put(None) 

    Это то, что вам нужно, чтобы поддерживать производителей и потребителей примерно в балансе, и в какой-либо стандартной библиотеке нет ничего, что сделало бы это для вас по волшебству.

    EDIT – сглаживание

    Вот полный, исполняемый пример, который может запускать любой пользователь Python3. Заметки:

    • Он не использует ваши фрагменты кода, потому что они полагаются на внешний модуль базы данных, который не каждый может выполнять.
    • Он придерживается concurrent.futures для управления как процессами, так и потоками. На самом деле не так сложно использовать multiprocessing и threading , и, действительно, способы использования потоков здесь будут немного проще, используя threading напрямую. Но этот путь достаточно ясен.
    • Параметр concurrent.futures Future объект – это в основном то же самое, что и объект результата async multiprocessing – функциональные возможности API просто различаются по-разному.
    • Ваша проблема не проста, потому что она имеет несколько этапов, которые могут работать с разной скоростью. Опять же, ничто в любой стандартной библиотеке не может скрыть потенциально плохие последствия этого по магии. Создание собственной ограниченной очереди остается лучшим решением для этого. Использование памяти здесь останется скромным для любого нормального значения MAX_QUEUE_SIZE .
    • Обычно вы не хотите создавать больше рабочих процессов, связанных с процессором, чем на один меньше, чем количество ядер, доступных для использования. Основной программе также нужны циклы для запуска, а также ОС.
    • Как только вы привыкнете к этому материалу, все комментарии в этом коде будут раздражать, например, увидеть комментарий «increment by 1» на кодовой строке i += 1 😉

    Вот код:

     import concurrent.futures as cf import threading import queue NUM_CPUS = 3 NUM_THREADS = 4 MAX_QUEUE_SIZE = 20 # Runs in worker processes. def producer(i): return i + 10 def consumer(i): global total # We need to protect this with a lock because # multiple threads in the main program can # execute this function simultaneously. with sumlock: total += i # Runs in threads in main program. def consume_results(q): while True: future = q.get() if future is None: break else: consumer(future.result()) if __name__ == "__main__": sumlock = threading.Lock() result_queue = queue.Queue(MAX_QUEUE_SIZE) total = 0 NUM_TO_DO = 1000 with cf.ThreadPoolExecutor(NUM_THREADS) as tp: # start the threads running `consume_results` for _ in range(NUM_THREADS): tp.submit(consume_results, result_queue) # start the worker processes with cf.ProcessPoolExecutor(NUM_CPUS) as pp: for i in range(NUM_TO_DO): # blocks until the queue size <= MAX_QUEUE_SIZE result_queue.put(pp.submit(producer, i)) # tell threads we're done for _ in range(NUM_THREADS): result_queue.put(None) print("got", total, "expected", (10 + NUM_TO_DO + 9) * NUM_TO_DO // 2) 

    Если все хорошо, это ожидаемый результат:

     got 509500 expected 509500 
    Python - лучший язык программирования в мире.