Блокировка в файле dask.multiprocessing.get и добавление метаданных в HDF

Выполняя задачу ETL в чистом Python, я хотел бы собирать показатели ошибок, а также метаданные для каждого из рассмотренных необработанных входных файлов (показатели ошибок вычисляются из кодов ошибок, предоставляемых в разделе данных файлов, в то время как метаданные хранятся в заголовках ). Вот псевдокод для всей процедуры:

import pandas as pd import dask from dask import delayed from dask import dataframe as dd META_DATA = {} # shared resource ERRORS = {} # shared resource def read_file(file_name): global META_DATA, ERRORS # step 1: process headers headers = read_header(file_name) errors = {} data_bfr = [] # step 2: process data section for line in data_section: content_id, data = parse_line(line) if contains_errors(data): errors[content_id] = get_error_code(data) else: data_bfr.append(content_id, data) # ---- Part relevant for question 1 ---- # step 3: acquire lock for shared resource and write metadata with lock.acquire(): write_metadata(file_name, headers) # stores metadata in META_DATA[file_name] write_errors(file_name, errors) # stores error metrics in ERRORS[file_name] return pd.DataFrame(data=data_bfr,...) with set_options(get=dask.multiprocessing.get): df = dd.from_delayed([delayed(read_file)(file_name) \ for file_name in os.listdir(wd)]) # ---- Part relevant for question 2 ---- df.to_hdf('data.hdf', '/data', 'w', complevel=9, \ complib='blosc',..., metadata=(META_DATA, ERRORS)) 

Для каждого входного файла read_file возвращает pd.DataFrame , далее записывая соответствующие метаданные и метрики ошибок для общих ресурсов. Я использую dask многопроцессорности dask для вычисления dask.dataframe из списка задержанных read_file .

  • Вопрос 1 : Каждый из read_file записывает в общие ресурсы META_DATA и META_DATA . Что мне нужно сделать для реализации правильной стратегии блокировки, которая работает с dask.multiprocessing.get ? Достаточно ли было бы написать метаданные и информацию об ошибках в коллекциях изнутри с with locket.lock_file('.lock'): -context? Работает ли multiprocessing.RLock ? Что мне нужно сделать, чтобы инициализировать блокировку для работы с dask ? Более принципиально, как я могу объявить META_DATA и META_DATA качестве общих ресурсов в dask ?
  • Вопрос 2. Если возможно, я хотел бы аннотировать данные HDF с метаданными и метриками ошибок. Из вопроса «Сбор атрибутов от поставщиков пакетов данных dask» я узнал, что в настоящее время dask не поддерживает добавление метаданных в dataframes, но возможно ли, чтобы информация была записана в HDF? Если да, то как обрабатывать доступ к общим ресурсам в этом случае?

One Solution collect form web for “Блокировка в файле dask.multiprocessing.get и добавление метаданных в HDF”

Не зависеть от Globals

Dask лучше всего работает с чистыми функциями .

В частности, ваш случай является ограничением в Python, поскольку он (правильно) не передает глобальные данные между процессами. Вместо этого я рекомендую вам явно возвращать данные из функций:

 def read_file(file_name): ... return df, metadata, errors values = [delayed(read_file)(fn) for fn in filenames] dfs = [v[0] for v in values] metadata = [v[1] for v in values] errors = [v[2] for v in values] df = dd.from_delayed(dfs) import toolz metadata = delayed(toolz.merge)(metadata) errors = delayed(toolz.merge)(errors) 
  • Как сортировать фрейм данных с помощью массива, который не находится в фрейме данных
  • Выбор / исключение наборов столбцов в Pandas
  • Определите, имеет ли в DataFrame MultiIndex
  • Понимание диагонали в матрице рассеяния Панды
  • Как определить последовательные даты
  • Получить метки строк и столбцов для выбранных значений в кадре данных Pandas
  • Изменение формы данных pandas от категориальных до подсчетов
  • Как добавить общий ярлык x и ярлык y к сюжету, созданному с сюжетом панд?
  • Python - лучший язык программирования в мире.