Свертка двух трехмерных массивов с дополнением с одной стороны слишком медленная

В моем текущем проекте мне нужно « сокрушить » два трехмерных массива несколько необычным образом:

Предположим, что мы имеем два трехмерных массива A и B с размерами dimA и dimB (одинаковые для каждой оси). Теперь мы хотим создать третий массив C с размерами dimA + dimB для каждой оси.

Записи C вычисляются как:

c_{x1+x2,y1+y2,z1+z2} += a_{x1,y1,z1} * b_{x2,y2,z2} 

Моя текущая версия проста:

 dimA = A.shape[0] dimB = B.shape[0] dimC = dimA+dimB C = np.zeros((dimC,dimC,dimC)) for x1 in range(dimA): for x2 in range(dimB): for y1 in range(dimA): for y2 in range(dimB): for z1 in range(dimA): for z2 in range(dimB): x = x1+x2 y = y1+y2 z = z1+z2 C[x,y,z] += A[x1,y1,z1] * B[x2,y2,z2] 

К сожалению, эта версия очень медленная и непригодна для использования.

Моя вторая версия:

 C = scipy.signal.fftconvolve(A,B,mode="full") 

Но это вычисляет только элементы max(dimA,dimB)

У кого-нибудь есть лучшая идея?

  • Когда del полезно в python?
  • Некоторые материалы NLP, связанные с грамматикой, помечением, вытеснением и смысловым смысловым смыслом в Python
  • python ImageTk.PhotoImage - segfault
  • В matplotlib, почему это быстрее, чтобы построить более тонкие линии?
  • Как записать данные в Redshift, что является результатом данных, созданных в Python?
  • Ошибка импорта PySide / Qt
  • Матрица рассеянного графика с более низкой плавностью
  • Построение данных цилиндрической карты над 3D-сферой в Python
  • 3 Solutions collect form web for “Свертка двух трехмерных массивов с дополнением с одной стороны слишком медленная”

    Вы пытались использовать Numba ? Это пакет, который позволяет вам обернуть код Python, который обычно медленный с JIT-компилятором. Я быстро ударил по вашей проблеме с помощью Numba и значительно ускорился. Используя магическую магическую магическую магическую функцию IPython, функция custom_convolution заняла ~ 18 с, а оптимизированная функция Numba заняла 10,4 мс. Это скорость более 1700 .

    Вот как работает Numba.

     import numpy as np from numba import jit, double s = 15 array_a = np.random.rand(s ** 3).reshape(s, s, s) array_b = np.random.rand(s ** 3).reshape(s, s, s) # Original code def custom_convolution(A, B): dimA = A.shape[0] dimB = B.shape[0] dimC = dimA + dimB C = np.zeros((dimC, dimC, dimC)) for x1 in range(dimA): for x2 in range(dimB): for y1 in range(dimA): for y2 in range(dimB): for z1 in range(dimA): for z2 in range(dimB): x = x1 + x2 y = y1 + y2 z = z1 + z2 C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2] return C # Numba'ing the function with the JIT compiler fast_convolution = jit(double[:, :, :](double[:, :, :], double[:, :, :]))(custom_convolution) 

    Если вы вычислите остаточный результат между обеими функциями, вы получите нули. Это означает, что реализация JIT работает без проблем.

     slow_result = custom_convolution(array_a, array_b) fast_result = fast_convolution(array_a, array_b) print np.max(np.abs(slow_result - fast_result)) 

    Результат, который я получаю для этого, равен 0.0 .

    Вы можете установить Numba в текущую настройку Python или быстро выполнить ее с помощью пакета AnacondaCE из continuum.io.

    И последнее, но не менее важное: функция Numba быстрее, чем функция scipy.signal.fftconvolve в несколько раз.

    Примечание. Я использую Anaconda, а не AnacondaCE. Есть несколько отличий между двумя пакетами для производительности Numba, но я не думаю, что это будет сильно отличаться.

    Метод Numba выше – это аккуратный трюк, но это будет только преимуществом для относительно небольшого алгоритма N. Алгоритм O (N ^ 6) убьет вас каждый раз, независимо от того, насколько быстро он реализован. В моих тестах метод fftconvolve пересекается быстрее, чем N = 20, а N = 32 – в 10 раз быстрее. Оставив определение custom_convolution выше:

     from timeit import Timer import numpy as np import matplotlib.pyplot as plt from scipy.signal import fftconvolve from numba import jit, double # Numba'ing the function with the JIT compiler numba_convolution = jit(double[:, :, :](double[:, :, :], double[:, :, :]))(custom_convolution) def fft_convolution(A, B): return fftconvolve(A, B, mode='full') if __name__ == '__main__': reps = 3 nt, ft = [], [] x = range(2, 34) for N in x: print N A = np.random.rand(N, N, N) B = np.random.rand(N, N, N) C1 = numba_convolution(A, B) C2 = fft_convolution(A, B) assert np.allclose(C1[:-1, :-1, :-1], C2) t = Timer(lambda: numba_convolution(A, B)) nt.append(t.timeit(number=reps)) t = Timer(lambda: fft_convolution(A, B)) ft.append(t.timeit(number=reps)) plt.plot(x, ft, x, nt) plt.show() 

    Он также очень зависит от N, поскольку FFT будет намного быстрее для степеней 2. Время для версии FFT по существу постоянное для N = 17 до N = 32, и оно еще быстрее при N = 33, где он снова начинает расходиться.

    Вы можете попробовать обернуть реализацию FFT в Numba, но вы не можете сделать это напрямую со скудной версией.

    (Извините, что создал новый ответ, но у меня нет точек для ответа напрямую).

    Общее правило состоит в том, чтобы использовать правильный алгоритм для задания, который, если ядро ​​свертки не является коротким по сравнению с данными, является сверткой на основе FFT (короткий примерно означает меньше log2 (n), где n – длина данных) ,

    В вашем случае, поскольку вы свертываете два набора данных одинакового размера, вы, вероятно, захотите рассмотреть свертку на основе FFT.

    Очевидно, что scipy.signal.fftconvolve в этом отношении недостаточно scipy.signal.fftconvolve . Используя более быстрый алгоритм БПФ, вы можете сделать гораздо лучше, скопировав свою собственную процедуру свертки (это не помогает тому, что fftconvolve заставляет размер преобразования равным двум, в противном случае это может быть обезглавлено обезьяной).

    Следующий код использует pyfftw , мои обертки вокруг FFTW и создает пользовательский класс свертки, CustomFFTConvolution :

     class CustomFFTConvolution(object): def __init__(self, A, B, threads=1): shape = (np.array(A.shape) + np.array(B.shape))-1 if np.iscomplexobj(A) and np.iscomplexobj(B): self.fft_A_obj = pyfftw.builders.fftn( A, s=shape, threads=threads) self.fft_B_obj = pyfftw.builders.fftn( B, s=shape, threads=threads) self.ifft_obj = pyfftw.builders.ifftn( self.fft_A_obj.get_output_array(), s=shape, threads=threads) else: self.fft_A_obj = pyfftw.builders.rfftn( A, s=shape, threads=threads) self.fft_B_obj = pyfftw.builders.rfftn( B, s=shape, threads=threads) self.ifft_obj = pyfftw.builders.irfftn( self.fft_A_obj.get_output_array(), s=shape, threads=threads) def __call__(self, A, B): fft_padded_A = self.fft_A_obj(A) fft_padded_B = self.fft_B_obj(B) return self.ifft_obj(fft_padded_A * fft_padded_B) 

    Это используется как:

     custom_fft_conv = CustomFFTConvolution(A, B) C = custom_fft_conv(A, B) # This can contain different values to during construction 

    с дополнительным аргументом threads при построении класса. Цель создания класса – извлечь выгоду из способности FFTW заранее планировать преобразование.

    Полный демо-код ниже просто расширяет ответ Kelsey за время и так далее.

    Ускорение значимо и для решения numba, и для решения vanilla fftconvolve. При n = 33 это примерно на 40-45 раз быстрее, чем у обоих.

     from timeit import Timer import numpy as np import matplotlib.pyplot as plt from scipy.signal import fftconvolve from numba import jit, double import pyfftw # Original code def custom_convolution(A, B): dimA = A.shape[0] dimB = B.shape[0] dimC = dimA + dimB C = np.zeros((dimC, dimC, dimC)) for x1 in range(dimA): for x2 in range(dimB): for y1 in range(dimA): for y2 in range(dimB): for z1 in range(dimA): for z2 in range(dimB): x = x1 + x2 y = y1 + y2 z = z1 + z2 C[x, y, z] += A[x1, y1, z1] * B[x2, y2, z2] return C # Numba'ing the function with the JIT compiler numba_convolution = jit(double[:, :, :](double[:, :, :], double[:, :, :]))(custom_convolution) def fft_convolution(A, B): return fftconvolve(A, B, mode='full') class CustomFFTConvolution(object): def __init__(self, A, B, threads=1): shape = (np.array(A.shape) + np.array(B.shape))-1 if np.iscomplexobj(A) and np.iscomplexobj(B): self.fft_A_obj = pyfftw.builders.fftn( A, s=shape, threads=threads) self.fft_B_obj = pyfftw.builders.fftn( B, s=shape, threads=threads) self.ifft_obj = pyfftw.builders.ifftn( self.fft_A_obj.get_output_array(), s=shape, threads=threads) else: self.fft_A_obj = pyfftw.builders.rfftn( A, s=shape, threads=threads) self.fft_B_obj = pyfftw.builders.rfftn( B, s=shape, threads=threads) self.ifft_obj = pyfftw.builders.irfftn( self.fft_A_obj.get_output_array(), s=shape, threads=threads) def __call__(self, A, B): fft_padded_A = self.fft_A_obj(A) fft_padded_B = self.fft_B_obj(B) return self.ifft_obj(fft_padded_A * fft_padded_B) def run_test(): reps = 10 nt, ft, cft, cft2 = [], [], [], [] x = range(2, 34) for N in x: print N A = np.random.rand(N, N, N) B = np.random.rand(N, N, N) custom_fft_conv = CustomFFTConvolution(A, B) custom_fft_conv_nthreads = CustomFFTConvolution(A, B, threads=2) C1 = numba_convolution(A, B) C2 = fft_convolution(A, B) C3 = custom_fft_conv(A, B) C4 = custom_fft_conv_nthreads(A, B) assert np.allclose(C1[:-1, :-1, :-1], C2) assert np.allclose(C1[:-1, :-1, :-1], C3) assert np.allclose(C1[:-1, :-1, :-1], C4) t = Timer(lambda: numba_convolution(A, B)) nt.append(t.timeit(number=reps)) t = Timer(lambda: fft_convolution(A, B)) ft.append(t.timeit(number=reps)) t = Timer(lambda: custom_fft_conv(A, B)) cft.append(t.timeit(number=reps)) t = Timer(lambda: custom_fft_conv_nthreads(A, B)) cft2.append(t.timeit(number=reps)) plt.plot(x, ft, label='scipy.signal.fftconvolve') plt.plot(x, nt, label='custom numba convolve') plt.plot(x, cft, label='custom pyfftw convolve') plt.plot(x, cft2, label='custom pyfftw convolve with threading') plt.legend() plt.show() if __name__ == '__main__': run_test() 

    EDIT: более свежий scipy делает лучшую работу не всегда дополнением до степеней 2 длины, поэтому ближе к выходу к случаю pyFFTW.

    Python - лучший язык программирования в мире.