Создание массива NumPy для общих процессов

Я прочитал немало вопросов о SO о совместном использовании массивов, и это кажется простым для простых массивов, но я застреваю, пытаясь заставить его работать для массива, который у меня есть.

import numpy as np data=np.zeros(250,dtype='float32, (250000,2)float32') 

Я попытался преобразовать это в общий массив, пытаясь каким-то образом заставить mp.Array принять data , я также попытался создать массив с использованием ctypes как таковой:

 import multiprocessing as mp data=mp.Array('c_float, (250000)c_float',250) 

Единственный способ, которым мне удалось заставить работать мой код, – это не передавать данные в функцию, а передавать закодированную строку, которая должна быть несжатой / декодированной, но это будет в конечном итоге в n (число строк), вызываемых, которые кажутся излишними. Моя желаемая реализация основана на разрезании списка двоичных строк на x (количество процессов) и передаче этого фрагмента, data и index в процессы, которые работают, за исключением того, что data модифицируются локально, следовательно, вопрос о том, как сделать его общим , любой пример работы с пользовательским (вложенным) массивом numpy уже будет большой помощью.

PS: Этот вопрос является продолжением многопроцессорной обработки Python

2 Solutions collect form web for “Создание массива NumPy для общих процессов”

Обратите внимание, что вы можете начать с массива сложного dtype:

 In [4]: data = np.zeros(250,dtype='float32, (250000,2)float32') 

и рассматривать его как массив однородного типа:

 In [5]: data2 = data.view('float32') 

а затем перевести его обратно в сложный тип dtype:

 In [7]: data3 = data2.view('float32, (250000,2)float32') 

Изменение dtype – очень быстрая операция; это не влияет на базовые данные, только то, как NumPy интерпретирует его. Таким образом, изменение типа dtype практически безрезультатно.

Итак, то, что вы читали о массивах с простыми (однородными) типами, может быть легко применено к вашему сложному dtype с трюком выше.


Нижеприведенный код заимствует многие идеи из ответа Дж. Ф. Себастьяна .

 import numpy as np import multiprocessing as mp import contextlib import ctypes import struct import base64 def decode(arg): chunk, counter = arg print len(chunk), counter for x in chunk: peak_counter = 0 data_buff = base64.b64decode(x) buff_size = len(data_buff) / 4 unpack_format = ">%dL" % buff_size index = 0 for y in struct.unpack(unpack_format, data_buff): buff1 = struct.pack("I", y) buff2 = struct.unpack("f", buff1)[0] with shared_arr.get_lock(): data = tonumpyarray(shared_arr).view( [('f0', '<f4'), ('f1', '<f4', (250000, 2))]) if (index % 2 == 0): data[counter][1][peak_counter][0] = float(buff2) else: data[counter][1][peak_counter][1] = float(buff2) peak_counter += 1 index += 1 counter += 1 def pool_init(shared_arr_): global shared_arr shared_arr = shared_arr_ # must be inherited, not passed as an argument def tonumpyarray(mp_arr): return np.frombuffer(mp_arr.get_obj()) def numpy_array(shared_arr, peaks): """Fills the NumPy array 'data' with m/z-intensity values acquired from b64 decoding and unpacking the binary string read from the mzXML file, which is stored in the list 'peaks'. The m/z values are assumed to be ordered without validating this assumption. Note: This function uses multi-processing """ processors = mp.cpu_count() with contextlib.closing(mp.Pool(processes=processors, initializer=pool_init, initargs=(shared_arr, ))) as pool: chunk_size = int(len(peaks) / processors) map_parameters = [] for i in range(processors): counter = i * chunk_size # WARNING: I removed -1 from (i + 1)*chunk_size, since the right # index is non-inclusive. chunk = peaks[i*chunk_size : (i + 1)*chunk_size] map_parameters.append((chunk, counter)) pool.map(decode, map_parameters) if __name__ == '__main__': shared_arr = mp.Array(ctypes.c_float, (250000 * 2 * 250) + 250) peaks = ... numpy_array(shared_arr, peaks) 

Если вы можете гарантировать, что различные процессы, выполняющие задания

 if (index % 2 == 0): data[counter][1][peak_counter][0] = float(buff2) else: data[counter][1][peak_counter][1] = float(buff2) 

никогда не конкурируйте, чтобы изменять данные в тех же местах, тогда я считаю, что вы действительно можете отказаться от блокировки

 with shared_arr.get_lock(): 

но я не достаточно понимаю ваш код, чтобы точно знать, поэтому, чтобы быть в безопасности, я включил замок.

 from multiprocessing import Process, Array import numpy as np import time import ctypes def fun(a): a[0] = -a[0] while 1: time.sleep(2) #print bytearray(a.get_obj()) c=np.frombuffer(a.get_obj(),dtype=np.float32) c.shape=3,3 print 'haha',c def main(): a = np.random.rand(3,3).astype(np.float32) a.shape=1*a.size #a=np.array([[1,3,4],[4,5,6]]) #b=bytearray(a) h=Array(ctypes.c_float,a) print "Originally,",h # Create, start, and finish the child process p = Process(target=fun, args=(h,)) p.start() #p.join() a.shape=3,3 # Print out the changed values print 'first',a time.sleep(3) #h[0]=h[0]+1 print 'main',np.frombuffer(h.get_obj(), dtype=np.float32) if __name__=="__main__": main() 
  • Разница между Windows и Windows
  • многопроцессорность python и количество ядер
  • Python 2.7 на Windows, «assert main_name не в sys.modules, main_name» для всех примеров многопроцессорности
  • Совместное использование большого массива с непрерывной записью только для чтения между процессами многопроцессорности
  • Такая же производительность у разных работников в многопроцессорной обработке
  • Сельдерей: как ограничить количество заданий в очереди и прекратить кормить, когда они полны?
  • многопроцессорность и сбор мусора
  • Простая многозадачность
  • Python - лучший язык программирования в мире.