Эффективная реализация Python для сравнения массивов numpy

Задний план

У меня есть два массива numpy, которые я бы хотел использовать для выполнения некоторых операций сравнения наиболее эффективным / быстрым способом. Оба содержат только unsigned ints.

pairs представляют собой массив nx 2 x 3 , который содержит длинный список парных трехмерных координат (для некоторой номенклатуры массив pairs содержит набор пар …) – т.е.

 # full pairs array In [145]: pairs Out[145]: array([[[1, 2, 4], [3, 4, 4]], ..... [[1, 2, 5], [5, 6, 5]]]) # each entry contains a pair of 3D coordinates In [149]: pairs[0] Out[149]: array([[1, 2, 4], [3, 4, 4]]) 

positions представляет собой массив nx 3 который содержит набор трехмерных координат

 In [162]: positions Out[162]: array([[ 1, 2, 4], [ 3, 4, 5], [ 5, 6, 3], [ 3, 5, 6], [ 6, 7, 5], [12, 2, 5]]) 

Цель Я хочу создать массив, который является подмножеством массива pairs , но содержит ТОЛЬКО записи, в которых не более одной пары находится в массиве положений, т. Е. Не должно быть пар, где пары BOTH лежат в массиве позиций. Для некоторой информации о домене каждая пара будет иметь по крайней мере одну из парных позиций внутри списка позиций.

Подходы до сих пор. Мой первоначальный наивный подход состоял в том, чтобы перебрать каждую пару в массиве pairs и вычесть каждую из двух парных позиций из вектора positions , определяя, было ли в случае BOTH совпадение, обозначенное наличием 0 в обоих векторы, которые исходят из операций вычитания:

  if (~(positions-pair[0]).any(axis=1)).any() and (~(positions-pair[1]).any(axis=1)).any(): # both members of the pair were in the positions array - # these weren't the droids we were looking for pass else: # append this set of pairs to a new matrix 

Это прекрасно работает и использует некоторую векторность, но, вероятно, есть лучший способ сделать это?

Для некоторых других компонентов, чувствительных к производительности, я переписал вещи с Cython, которые привели к значительному ускорению, хотя в этом случае (по крайней мере, на основе наивной вложенной реализации для цикла) это было немного медленнее, чем подход изложенных выше.

Если у людей есть предложения, я рад профилировать и отчитываться (у меня есть вся инфраструктура профилирования).

Подход №1

Как упоминалось в этом вопросе, оба массива содержат только unsigned ints , которые могут быть использованы для объединения XYZ в эквивалентную версию линейных индексов, которая была бы уникальной для каждого уникального триплекса XYZ . Реализация будет выглядеть примерно так:

 maxlen = np.max(pairs,axis=(0,1)) dims = np.append(maxlen[::-1][:-1].cumprod()[::-1],1) pairs1D = np.dot(pairs.reshape(-1,3),dims) positions1D = np.dot(positions,dims) mask_idx = ~(np.in1d(pairs1D,positions1D).reshape(-1,2).all(1)) out = pairs[mask_idx] 

Поскольку вы имеете дело с 3D-координатами, вы также можете использовать cdist для проверки одинаковых триплетов XYZ между входными массивами. Ниже перечислены две реализации с учетом этой идеи.

Подход №2

 from scipy.spatial.distance import cdist p0 = cdist(pairs[:,0,:],positions) p1 = cdist(pairs[:,1,:],positions) out = pairs[((p0==0) | (p1==0)).sum(1)!=2] 

Подход №3

 mask_idx = ~((cdist(pairs.reshape(-1,3),positions)==0).any(1).reshape(-1,2).all(1)) out = pairs[mask_idx] 

Тесты времени выполнения –

 In [80]: n = 5000 ...: pairs = np.random.randint(0,100,(n,2,3)) ...: positions= np.random.randint(0,100,(n,3)) ...: In [81]: def cdist_split(pairs,positions): ...: p0 = cdist(pairs[:,0,:],positions) ...: p1 = cdist(pairs[:,1,:],positions) ...: return pairs[((p0==0) | (p1==0)).sum(1)!=2] ...: ...: def cdist_merged(pairs,positions): ...: mask_idx = ~((cdist(pairs.reshape(-1,3),positions)==0).any(1).reshape(-1,2).all(1)) ...: return pairs[mask_idx] ...: ...: def XYZ_merged(pairs,positions): ...: maxlen = np.max(pairs,axis=(0,1)) ...: dims = np.append(maxlen[::-1][:-1].cumprod()[::-1],1) ...: pairs1D = np.dot(pairs.reshape(-1,3),dims) ...: positions1D = np.dot(positions,dims) ...: mask_idx1 = ~(np.in1d(pairs1D,positions1D).reshape(-1,2).all(1)) ...: return pairs[mask_idx1] ...: In [82]: %timeit cdist_split(pairs,positions) 1 loops, best of 3: 662 ms per loop In [83]: %timeit cdist_merged(pairs,positions) 1 loops, best of 3: 615 ms per loop In [84]: %timeit XYZ_merged(pairs,positions) 100 loops, best of 3: 4.02 ms per loop 

Проверить результаты –

 In [85]: np.allclose(cdist_split(pairs,positions),cdist_merged(pairs,positions)) Out[85]: True In [86]: np.allclose(cdist_split(pairs,positions),XYZ_merged(pairs,positions)) Out[86]: True 

Разрабатывая мой комментарий:

Разверните pairs чтобы быть более интересными. Не стесняйтесь тестировать с более крупными, более реалистичными массивами:

 In [260]: pairs = np.array([[[1,2,4],[3,4,4]],[[1,2,5],[5,6,5]],[[3,4,5],[3,5,6]],[[6,7,5],[1,2,3]]]) In [261]: positions = np.array([[ 1, 2, 4], [ 3, 4, 5], [ 5, 6, 3], [ 3, 5, 6], [ 6, 7, 5], [12, 2, 5]]) 

Разверните оба массива в широковещательные формы:

 In [262]: I = pairs[None,...]==positions[:,None,None,:] In [263]: I.shape Out[263]: (6, 4, 2, 3) 

Большой логический массив, показывающий элементы по элементам для всех измерений. Не могли заменить другие сравнения ( difference ==0 , np.isclose для поплавков и т. Д.).

 In [264]: J = I.all(axis=-1).any(axis=0).sum(axis=-1) In [265]: J Out[265]: array([1, 0, 2, 1]) 

Консолидируйте результаты по различным параметрам. Сопоставьте все числа по координатам, сопоставьте любые позиции, подсчитайте совпадения по парам.

 In [266]: pairs[J==1,...] Out[266]: array([[[1, 2, 4], [3, 4, 4]], [[6, 7, 5], [1, 2, 3]]]) 

J==1 представляют элементы, в которых только одно значение пары совпадает. (см. примечание)

Комбинация any , and , и sum работает для тестового примера, но может потребоваться корректировка с большим тестовым примером (-ами). Но идея в целом применима.


Для размеров массивов, которые тестируют https://stackoverflow.com/a/31901675/901925 , мое решение довольно медленно. В частности, он выполняет тест == который приводит к I с формой (5000, 5000, 2, 3) .

Сжатие последнего измерения помогает

 dims = np.array([10000,100,1]) # simpler version of dims from XYZmerged pairs1D = np.dot(pairs.reshape(-1,3),dims) positions1D = np.dot(positions,dims) I1d = pairs1D[:,None]==positions1D[None,:] J1d = I1d.any(axis=-1).reshape(pairs.shape[:2]).sum(axis=-1) 

Я изменил выражение J1d в соответствии с моим – чтобы подсчитать количество совпадений на пару.

in1d1 который использует Divakar еще быстрее:

 mask = np.in1d(pairs1D, positions1D).reshape(-1,2) Jmask = mask.sum(axis=-1) 

Я просто понял, что ОП запрашивает at most one of the pairs is in the positions array . Где я тестирую exactly one match per pair . Поэтому мои тесты должны быть изменены на pairs[J<2,...] .

(в моей конкретной случайной выборке для n = 5000, которая, оказывается, есть все. Нет никаких pairs которых оба находятся в positions . 54 из 5000 имеют J==1 , остальные 0, не соответствуют) ,