Функция медленного len на распределенной информационной рамке dask

Я тестировал, как использовать dask (кластер с 20 ядрами), и меня удивляет скорость, которую я получаю при вызове функции len vs slicing через loc.

import dask.dataframe as dd from dask.distributed import Client client = Client('192.168.1.220:8786') log = pd.read_csv('800000test', sep='\t') logd = dd.from_pandas(log,npartitions=20) #This is the code than runs slowly #(2.9 seconds whilst I would expect no more than a few hundred millisencods) print(len(logd)) #Instead this code is actually running almost 20 times faster than pandas logd.loc[:'Host'].count().compute() 

Любые идеи, почему это может произойти? Для меня не важно, что len работает быстро, но я чувствую, что, не понимая этого поведения, я кое-что не понимаю об библиотеке.

введите описание изображения здесь

Все зеленые прямоугольники соответствуют «from_pandas», в то время как в этой статье Мэтью Роклин http://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes граф вызовов выглядит лучше (len_chunk называется, что значительно быстрее, и звонки, кажется, не заперты и ждут, пока один рабочий закончит свою задачу, прежде чем запускать другой)

введите описание изображения здесь

One Solution collect form web for “Функция медленного len на распределенной информационной рамке dask”

Хороший вопрос, это касается нескольких моментов, когда данные перемещаются к кластеру и возвращаются к клиенту (ваш сеанс python). Давайте посмотрим на несколько этапов вашей работы

Загрузка данных с помощью Pandas

Это кадр данных Pandas на вашем сеансе python, поэтому он, очевидно, все еще находится в вашем локальном процессе.

 log = pd.read_csv('800000test', sep='\t') # on client 

Преобразовать в ленивый Dask.dataframe

Это разбивает ваш кадр данных Pandas на двадцать кадров данных Pandas, однако они все еще находятся на клиенте. Dask DataFrames не ожидают отправки данных в кластер.

 logd = dd.from_pandas(log,npartitions=20) # still on client 

Вычислить len

Вызов len самом деле вызывает вычисление здесь (обычно вы должны использовать df.some_aggregation().compute() . Итак, теперь Dask начинает работать. Сначала он перемещает ваши данные в кластер (медленно), затем вызывает len на всех 20 разделах ( быстро), он объединяет эти (быстрые) и затем переносит результат на ваш клиент, чтобы он мог печатать.

 print(len(logd)) # costly roundtrip client -> cluster -> client 

Анализ

Поэтому проблема заключается в том, что наш dask.dataframe все еще имел все свои данные в локальном сеансе python.

Было бы гораздо быстрее использовать, скажем, локальный планировщик с потоками, а не распределенный планировщик. Это должно вычисляться в миллисекундах

 with dask.set_options(get=dask.threaded.get): # no cluster, just local threads print(len(logd)) # stays on client 

Но, по-видимому, вы хотите знать, как масштабировать до более крупных наборов данных, поэтому давайте сделаем это правильно.

Загрузите данные о работниках

Вместо того, чтобы загружать Pandas на ваш клиентский / локальный сеанс, пусть рабочие Dask загружают биты csv-файла. Таким образом, общение с клиентом и клиентом не требуется.

 # log = pd.read_csv('800000test', sep='\t') # on client log = dd.read_csv('800000test', sep='\t') # on cluster workers 

Однако, в отличие от pd.read_csv , dd.read_csv ленив, так что это должно вернуться почти сразу. Мы можем заставить Dask фактически выполнять вычисления с помощью метода persist

 log = client.persist(log) # triggers computation asynchronously 

Теперь кластер начинает действовать и загружает ваши данные непосредственно в рабочих. Это относительно быстро. Обратите внимание, что этот метод немедленно возвращается, когда работа выполняется в фоновом режиме. Если вы хотите подождать, пока он закончится, вызовите wait .

 from dask.distributed import wait wait(log) # blocks until read is done 

Если вы тестируете небольшой набор данных и хотите получить больше разделов, попробуйте изменить размер блока.

 log = dd.read_csv(..., blocksize=1000000) # 1 MB blocks 

Несмотря на это, операции с log теперь должны быть быстрыми

 len(log) # fast 

редактировать

В ответ на вопрос об этом blogpost здесь приведены предположения о том, где мы живем.

Обычно, когда вы dd.read_csv имя файла на dd.read_csv предполагается, что этот файл отображается со всех рабочих. Это верно, если вы используете сетевую файловую систему или глобальный магазин, такой как S3 или HDFS. Если вы используете сетевую файловую систему, вам нужно либо использовать абсолютные пути (например, /path/to/myfile.*.csv ), либо убедиться, что ваши рабочие и клиенты имеют один и тот же рабочий каталог.

Если это не так, и ваши данные находятся только на вашей клиентской машине, вам придется загружать и рассылать их.

Простой, но неоптимальный

Простым способом является только то, что вы делали изначально, но сохраняете свой dask.dataframe

 log = pd.read_csv('800000test', sep='\t') # on client logd = dd.from_pandas(log,npartitions=20) # still on client logd = client.persist(logd) # moves to workers 

Это прекрасно, но приводит к чуть-чуть-менее идеальному общению.

Комплексный, но оптимальный

Вместо этого вы можете явно разбросать свои данные в кластер

 [future] = client.scatter([log]) 

Однако это приводит к более сложному API, поэтому я просто укажу вам документы

http://distributed.readthedocs.io/en/latest/manage-comput.html http://distributed.readthedocs.io/en/latest/memory.html http://dask.pydata.org/en/latest/ отсроченным collections.html

Python - лучший язык программирования в мире.