Python: Как запустить вложенный параллельный процесс в python?

У меня есть dataset df транзакций трейдера. У меня есть два уровня для циклов:

 smartTrader =[] for asset in range(len(Assets)): df = df[df['Assets'] == asset] # I have some more calculations here for trader in range(len(df['TraderID'])): # I have some calculations here, If trader is successful, I add his ID # to the list as follows smartTrader.append(df['TraderID'][trader]) # some more calculations here which are related to the first for loop. 

Я хотел бы сопоставить вычисления для каждого актива в Assets , и я также хочу, чтобы распараллеливать вычисления для каждого трейдера для каждого актива. После того, как все эти вычисления сделаны, я хочу сделать дополнительный анализ на основе списка smartTrader .

Это моя первая попытка параллельной обработки, поэтому, пожалуйста, будьте терпеливы со мной, и я ценю вашу помощь.

Если вы используете pathos , который предоставляет вилку multiprocessing , вы можете легко развернуть параллельные карты. pathos построен для легкой проверки комбинаций вложенных параллельных карт, которые являются прямыми переводами вложенных циклов. Он предоставляет набор карт, которые блокируют, не блокируют, повторяют, асинхронны, последовательно, параллельно и распределены.

 >>> from pathos.pools import ProcessPool, ThreadPool >>> amap = ProcessPool().amap >>> tmap = ThreadPool().map >>> from math import sin, cos >>> print amap(tmap, [sin,cos], [range(10),range(10)]).get() [[0.0, 0.8414709848078965, 0.9092974268256817, 0.1411200080598672, -0.7568024953079282, -0.9589242746631385, -0.27941549819892586, 0.6569865987187891, 0.9893582466233818, 0.4121184852417566], [1.0, 0.5403023058681398, -0.4161468365471424, -0.9899924966004454, -0.6536436208636119, 0.2836621854632263, 0.9601702866503661, 0.7539022543433046, -0.14550003380861354, -0.9111302618846769]] 

Здесь в этом примере используется пул обработки и пул потоков, где блокируется вызов карты потоков, а вызов карты обработки асинхронен (обратите внимание на get в конце последней строки).

Получить pathos здесь: https://github.com/uqfoundation или с помощью: $ pip install git+https://github.com/uqfoundation/pathos.git@master

Вместо использования for использования используйте map :

 import functools smartTrader =[] m=map( calculations_as_a_function, [df[df['Assets'] == asset] \ for asset in range(len(Assets))]) functools.reduce(smartTradder.append, m) 

С этого момента вы можете попробовать различные реализации параллельных map multiprocessing , так и stackless '

Вероятно, многопоточность, из стандартной библиотеки python, является наиболее удобным подходом:

 import threading def worker(id): #Do you calculations here return threads = [] for asset in range(len(Assets)): df = df[df['Assets'] == asset] for trader in range(len(df['TraderID'])): t = threading.Thread(target=worker, args=(trader,)) threads.append(t) t.start() #add semaphore here if you need synchronize results for all traders.