Многопроцессорность Python Pool.apply_async с общими переменными (значение)

Для моего проекта колледжа я пытаюсь создать генератор трафика на основе python. Я создал 2 CentOS-машины на vmware, и я использую 1 в качестве моего клиента и 1 в качестве моей серверной машины. Я использовал технологию IP aliasing для увеличения количества клиентов и разделителей, используя только одну клиентскую / серверную машину. До сих пор я создал 50 псевдонимов IP на моем клиентском компьютере и 10 IP-псевдонимов на моем сервере. Я также использую многопроцессорный модуль для одновременного создания трафика из всех 50 клиентов на все 10 серверов. У меня также было создано несколько профилей (1kb, 10kb, 50kb, 100kb, 500kb, 1mb) на моем сервере (в каталоге / var / www / html, так как я использую Apache Server), и я использую urllib2 для отправки запроса в эти профили из моя клиентская машина. Я использую httplib + urllib2, чтобы сначала связать с любым из псевдонимов исходного кода, а затем отправить запрос с этого ip с помощью urllib2. Здесь, чтобы увеличить количество TCP-соединений , я пытаюсь использовать модуль multiprocessing.Pool.apply_async. Но я получаю эту ошибку: RuntimeError: синхронизированные объекты должны быть разделены между процессами через наследование при запуске моих скриптов. После некоторой отладки я обнаружил, что эта ошибка вызвана использованием многопроцессорности. Value. Но я хочу поделиться некоторыми переменными между моими процессами, а также хочу увеличить количество TCP-соединений. Какой еще модуль (кроме multiprocessing.Value) можно использовать здесь для обмена некоторыми общими переменными? Или есть ли другое решение для этого запроса?

''' Traffic Generator Script: Here I have used IP Aliasing to create multiple clients on single vm machine. Same I have done on server side to create multiple servers. I have around 50 clients and 10 servers ''' import multiprocessing import urllib2 import random import myurllist #list of all destination urls for all 10 servers import time import socbindtry #script that binds various virtual/aliased client ips to the script m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=multiprocessing.Value('i',0) def send_request3(): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 def send_request4(): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 #50 such functions are defined here for 50 clients def func(): pool=multiprocessing.Pool(processes=750) for i in range(5): pool.apply_async(send_request3) pool.apply_async(send_request4) pool.apply_async(send_request5) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return start=float(time.time()) func() end=float(time.time())-start print end 

  • Python запрашивает модуль многопоточности
  • Python не может выделять память с помощью multiprocessing.pool
  • Возможно ли распараллелить вызовы get_attribute selenium webdriver в python?
  • Проблема травления с пафосом python
  • Сохранять пользовательские атрибуты при подгонке подкласса массива numpy
  • Процесс против потока в отношении использования Queue () / deque () и переменной класса для связи и «ядовитой таблетки»
  • Одновременное выполнение нескольких сеансов тензорного потока
  • получение pid дочернего процесса
  • 2 Solutions collect form web for “Многопроцессорность Python Pool.apply_async с общими переменными (значение)”

    Как говорится в сообщениях об ошибках, вы не можете передавать multiprocessing.Value . Однако вы можете использовать multiprocessing.Manager().Value :

     import multiprocessing import urllib2 import random import myurllist #list of all destination urls for all 10 servers import time import socbindtry #script that binds various virtual/aliased client ips to the script def send_request3(response_time, error_count): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: with error_count.get_lock(): error_count.value += 1 def send_request4(response_time, error_count): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: with error_count.get_lock(): error_count.value += 1 #50 such functions are defined here for 50 clients def func(response_time, error_count): pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count()) args = (response_time, error_count) for i in range(5): pool.apply_async(send_request3, args=args) pool.apply_async(send_request4, args=args) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return if __name__ == "__main__": m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=m.Value('i',0) start=float(time.time()) func(response_time, error_count) end=float(time.time())-start print end с import multiprocessing import urllib2 import random import myurllist #list of all destination urls for all 10 servers import time import socbindtry #script that binds various virtual/aliased client ips to the script def send_request3(response_time, error_count): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: with error_count.get_lock(): error_count.value += 1 def send_request4(response_time, error_count): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: with error_count.get_lock(): error_count.value += 1 #50 such functions are defined here for 50 clients def func(response_time, error_count): pool=multiprocessing.Pool(processes=2*multiprocessing.cpu_count()) args = (response_time, error_count) for i in range(5): pool.apply_async(send_request3, args=args) pool.apply_async(send_request4, args=args) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return if __name__ == "__main__": m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=m.Value('i',0) start=float(time.time()) func(response_time, error_count) end=float(time.time())-start print end 

    Несколько других примечаний здесь:

    1. Использование Pool с 750 процессами не является хорошей идеей. Если вы не используете сервер с сотнями ядер процессора, это приведет к подавлению вашей машины. Это было бы быстрее и у вас меньше нагрузки на вашу машину, чтобы использовать значительно меньше процессов. Что-то больше похоже на 2 * multiprocessing.cpu_count() .
    2. В качестве наилучшей практики вы должны явно передать все общие аргументы, которые необходимо использовать для дочерних процессов, а не использовать глобальные переменные. Это увеличивает вероятность того, что код будет работать в Windows.
    3. Похоже, что все ваши функции send_request* выполняют почти то же самое. Почему бы просто не сделать одну функцию и не использовать переменную, чтобы решить, какой socbindtry.BindableHTTPHandler использовать? Сделав это, вы избегаете тонны дублирования кода.
    4. То, как вы увеличиваете значение error_count , не является процессом / потокобезопасным и подвержено условиям гонки. Вам необходимо защитить инкремент с помощью блокировки (как в приведенном выше примере).

    Возможно, потому что Python Multiprocess отличается между Windows и Linux (я серьезно, не знаю, как многопроцессорность работает в виртуальных машинах, как это имеет место здесь.)

    Это может сработать;

     import multiprocessing import random import myurllist #list of all destination urls for all 10 servers import time def send_request3(response_time, error_count): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 def send_request4(response_time, error_count): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 #50 such functions are defined here for 50 clients def func(): m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=multiprocessing.Value('i',0) pool=multiprocessing.Pool(processes=750) for i in range(5): pool.apply_async(send_request3, [response_time, error_count]) pool.apply_async(send_request4, [response_time, error_count]) # pool.apply_async(send_request5) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return start=float(time.time()) func() end=float(time.time())-start print end с import multiprocessing import random import myurllist #list of all destination urls for all 10 servers import time def send_request3(response_time, error_count): #function to send requests from alias client ip 1 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler3) #bind to alias client ip1 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 def send_request4(response_time, error_count): #function to send requests from alias client ip 2 opener=urllib2.build_opener(socbindtry.BindableHTTPHandler4) #bind to alias client ip2 try: tstart=time.time() for i in range(myurllist.url): x=random.choice(myurllist.url[i]) opener.open(x).read() print "file downloaded:",x response_time.append(time.time()-tstart) except urllib2.URLError, e: error_count.value=error_count.value+1 #50 such functions are defined here for 50 clients def func(): m=multiprocessing.Manager() response_time=m.list() #some shared variables error_count=multiprocessing.Value('i',0) pool=multiprocessing.Pool(processes=750) for i in range(5): pool.apply_async(send_request3, [response_time, error_count]) pool.apply_async(send_request4, [response_time, error_count]) # pool.apply_async(send_request5) #append 50 functions here pool.close() pool.join() print"All work Done..!!" return start=float(time.time()) func() end=float(time.time())-start print end 
    Python - лучший язык программирования в мире.