Блокировка в файле dask.multiprocessing.get и добавление метаданных в HDF

Выполняя задачу ETL в чистом Python, я хотел бы собирать показатели ошибок, а также метаданные для каждого из рассмотренных необработанных входных файлов (показатели ошибок вычисляются из кодов ошибок, предоставляемых в разделе данных файлов, в то время как метаданные хранятся в заголовках ). Вот псевдокод для всей процедуры:

import pandas as pd import dask from dask import delayed from dask import dataframe as dd META_DATA = {} # shared resource ERRORS = {} # shared resource def read_file(file_name): global META_DATA, ERRORS # step 1: process headers headers = read_header(file_name) errors = {} data_bfr = [] # step 2: process data section for line in data_section: content_id, data = parse_line(line) if contains_errors(data): errors[content_id] = get_error_code(data) else: data_bfr.append(content_id, data) # ---- Part relevant for question 1 ---- # step 3: acquire lock for shared resource and write metadata with lock.acquire(): write_metadata(file_name, headers) # stores metadata in META_DATA[file_name] write_errors(file_name, errors) # stores error metrics in ERRORS[file_name] return pd.DataFrame(data=data_bfr,...) with set_options(get=dask.multiprocessing.get): df = dd.from_delayed([delayed(read_file)(file_name) \ for file_name in os.listdir(wd)]) # ---- Part relevant for question 2 ---- df.to_hdf('data.hdf', '/data', 'w', complevel=9, \ complib='blosc',..., metadata=(META_DATA, ERRORS)) 

Для каждого входного файла read_file возвращает pd.DataFrame , далее записывая соответствующие метаданные и метрики ошибок для общих ресурсов. Я использую dask многопроцессорности dask для вычисления dask.dataframe из списка задержанных read_file .

  • Вопрос 1 : Каждый из read_file записывает в общие ресурсы META_DATA и META_DATA . Что мне нужно сделать для реализации правильной стратегии блокировки, которая работает с dask.multiprocessing.get ? Достаточно ли было бы написать метаданные и информацию об ошибках в коллекциях изнутри с with locket.lock_file('.lock'): -context? Работает ли multiprocessing.RLock ? Что мне нужно сделать, чтобы инициализировать блокировку для работы с dask ? Более принципиально, как я могу объявить META_DATA и META_DATA качестве общих ресурсов в dask ?
  • Вопрос 2. Если возможно, я хотел бы аннотировать данные HDF с метаданными и метриками ошибок. Из вопроса «Сбор атрибутов от поставщиков пакетов данных dask» я узнал, что в настоящее время dask не поддерживает добавление метаданных в dataframes, но возможно ли, чтобы информация была записана в HDF? Если да, то как обрабатывать доступ к общим ресурсам в этом случае?

One Solution collect form web for “Блокировка в файле dask.multiprocessing.get и добавление метаданных в HDF”

Не зависеть от Globals

Dask лучше всего работает с чистыми функциями .

В частности, ваш случай является ограничением в Python, поскольку он (правильно) не передает глобальные данные между процессами. Вместо этого я рекомендую вам явно возвращать данные из функций:

 def read_file(file_name): ... return df, metadata, errors values = [delayed(read_file)(fn) for fn in filenames] dfs = [v[0] for v in values] metadata = [v[1] for v in values] errors = [v[2] for v in values] df = dd.from_delayed(dfs) import toolz metadata = delayed(toolz.merge)(metadata) errors = delayed(toolz.merge)(errors) 
  • чтение файла с отсутствующими значениями в python pandas
  • матричная операция с использованием numpy pandas
  • Добавление / вычитание столбцов Pandas
  • Как применять функции качения в группе по объекту в пандах
  • Создайте индексированное datetime из информации о дате / времени в 3 столбцах, используя pandas
  • Объедините два блока данных Pandas с тем же индексом
  • Получение индекса столбца из имени столбца в python pandas
  • Сгладить / ravel / collapse 3-мерный xr.DataArray (Xarray) в 2 измерения вдоль оси?
  • проблемы в "pandas datetime convert to num"
  • Как уменьшить размер кадра данных pandas
  • Pandas - Заменить выбросы с помощью группового
  • Python - лучший язык программирования в мире.