Самая маленькая в мире структура передачи данных
Разбираемся с самой маленькой в мире структурой передачи данных
Простое и быстрое основание для конвейера данных с сложным функционалом.
Сбор данных – это, пожалуй, самая трудоемкая задача для создателей данных. Сбор данных включает в себя очистку, преобразование и общую манипуляцию сырыми данными, чтобы сделать их полезными. Как и многие другие действия, процесс сбора данных часто нуждается в усовершенствовании со временем. Поэтому важно отслеживать, как набор данных собирается, чтобы ваша команда могла управлять и воспроизводить процесс с течением времени. Сбор данных, хотя не всегда веселый, может быть самой важной задачей в любой современной компании.
Существуют компании, специализирующиеся на конвейерах данных, и они могут быть сложными и очень сложными. Но для этого исследования рассмотрим задачу превращения текстового файла в набор слов или “токенов”, отбрасывая тексты, которые не являются нам полезными. Давайте начнем с простого и продвигаемся вперед.
Итак, на первом этапе определим ряд шагов для выполнения функций сбора данных по словам в тексте. Мы будем использовать функцию text.translate() в Python для выполнения некоторой части работы. Рассмотрим эти 4 функции:
import stringdef step1(word): trans = str.maketrans("", "", string.punctuation) return word.replace("\n", " ").translate(trans)def step2(word): return word.lower()def step3(word): trans = str.maketrans("", "", "0123456789") return word.replace("\n", " ").translate(trans)def step4(word): return (all([char in string.ascii_letters for char in word]) and len(word) > 0)
step1 – это функция, которая удаляет всю пунктуацию из слова и удаляет переносы строк. step2 преобразует слово в нижний регистр. step3 снова использует text.translate() для удаления цифр. И step4 будет использоваться в качестве фильтра для отфильтровывания слов, содержащих не-ASCII буквы. Можно представить дополнительные шаги, такие как стемминг.
- Видение через звук Дарение силы людям с нарушенным зрением с использованием GPT-4V(ision) и технологии текст в речь’.
- Должны ли машины использовать GenAI для общения друг с другом на английском?
- Изучение Google Cloud Platform Обзор услуг и возможностей
Так как эти функции просты, если мы применим step1 к слову, получим:
>>> step1("Testing---123;")'Testing123'
Действительно, это удалило пунктуацию из текста. Мы можем применить все три функции, заключив их методом “матрешки” вокруг слова:
>>> step3(step2(step1("Testing---123;")))'testing'
Здесь мы видим, что функции step1, step2 и step3 были применены, оставив только буквы “testing”. Обратите внимание, что мы определили наши функции для работы в определенном порядке. То есть step1 должно быть выполнено перед step2 и так далее.
Этот процесс на основе функций прост в создании и использовании. Конечно, мы могли бы выполнить все функции сразу. Но по мере того, как “конвейер” функций становится длиннее и сложнее, разбиение процесса на отдельные шаги сделает его более управляемым. Фактически, каждый шаг может стать настолько сложным, что над ним будут работать разные команды.
Хорошо, пока что все идет хорошо. Но конечно же, мы не хотим применять конвейер функций вручную к каждому слову. Вместо этого мы хотим применить его к каждому слову в списке. Для этого создадим очень простую функцию apply():
def apply(step, values): return [step(value) for value in values]
Теперь мы можем использовать те же функции для целых списков слов:
>>> apply(step3, apply(step2, apply(step1, ["Testing---123;", "456---", "Hello!"])))['testing', '', 'hello']
Ах, да, нам нужно удалить пустые слова. step4 разработан специально для этого, но его использование немного сложнее. Он будет выглядеть так:
>>> list(filter(step4, apply(step3, apply(step2, apply(step1, ["Testing---123;", "456---", "Hello!"])))))['testing', 'hello']
Итак, step4 – это функция-фильтр, возвращающая True для его сохранения и False для его удаления, она применяется следующим образом: filter(step4, data).
Есть несколько проблем с этим простым подходом:
- Шаги применяются внутрь. То есть первый шаг, step1, является самой вложенной функцией, а step3 находится снаружи. Не очень интуитивно.
- Это очень громоздко, потому что нам приходится повторять функцию apply() для каждой функции шага.
- Фильтры (например, step4) не могут использоваться так же, как другие функции.
Учитывая эти проблемы, можно ли абстрагировать основную функциональность в обобщенный конвейер? Я представляю себе двухэтапный подход:
# Сначала создаем функцию конвейера:p = my_pipeline(step1, step2, step3)# Затем применяем ее к набору данных:p(["Testing---123;", "456---", "Hello!"])
Как мы можем определить my_pipeline? Оказывается, это довольно просто:
def my_pipeline(*steps): def wrapper(inputs): for step in steps: inputs = apply(step, inputs) return inputs return wrapper
То есть my_pipeline – это функция, которая принимает набор функций шагов и возвращает функцию, которая принимает список слов, применяет каждый шаг в серии и возвращает обработанный список слов.
Давайте попробуем:
>>> p = my_pipeline(step1, step2, step3)>>> p(["Testing---123;", "456---", "Hello!"])['testing', '', 'hello']
Работает – мы получили то же самое, что и раньше! Что насчет фильтрующей функции step4? Так давайте пока оставим это и попробуем эту систему на “реальных” данных. Хорошо, это будут настоящие поддельные данные. Для этих экспериментов мы создадим 10 000 документов, каждый из которых будет состоять из 10 абзацев. Мы будем использовать DocumentGenerator() из пакета Python essential_generators.
from essential_generators import DocumentGeneratorimport osgen = DocumentGenerator()def generate_documents( count=10_000, paragraphs=10, output_folder="documents", overwrite=False): os.makedirs(output_folder, exist_ok=True) for n in range(count): filename = os.path.join( output_folder, "doc_%05d.txt" % (n + 1) ) if overwrite or not os.path.exists(filename): with open(filename, "w") as fp: for p in range(paragraphs): fp.write(gen.paragraph() + "\n\n")generate_documents()
Это займет около 30 секунд, чтобы сгенерировать все данные. Чтобы продолжить с нашим простым кодом, нам нужно ввести еще один шаг:
def step0(filename): return open(filename).read().split(" ")
Этот шаг будет принимать имя файла, открывать файл и разделять текст по пробелам. И нам нужно внести небольшую корректировку в нашу функцию apply(), чтобы обрабатывать списки слов, а не слова:
def apply(step, outputs): return (step(input) if not isinstance(input, list) else [step(i) for i in input] for input in outputs)
Я также внес небольшую корректировку в apply: теперь она возвращает выражение генератора вместо генераторного списка, используя окружающие скобки вместо квадратных скобок. Это отложит обработку до момента, когда она понадобится (иногда это называется “ленивое вычисление”).
Теперь мы можем построить почти полную систему конвейеров:
p = my_pipeline(step0, step1, step2, step3)list(p(["documents/doc_00001.txt"]))
Обратите внимание, что он принимает список имен файлов в качестве входных данных. Красиво и просто. Но есть несколько вещей, которые я бы все же хотел увидеть:
- возможность простым способом обрабатывать фильтры;
- возможность запускать конвейер параллельно для быстрой обработки наборов данных;
- возможность визуализации конвейера.
Для этих трех дополнений я рекомендую вам обратиться к проекту picopipe, который я разработал на основе вышеизложенных идей. Вы можете установить его, используя pip:
pip install picopipe
а затем выполнить его с такими же функциями шагов, которые были указаны выше:
from picopipe import pipeline, pfilterp = pipeline(step0, step1, step2, step3, pfilter(step4))list(p(["documents/doc_00001.txt"])[0])
Здесь pfilter означает “конвейер-фильтр”, и вы просто оборачиваете его вокруг функции step4. Я довольно доволен дизайном. Но давайте посмотрим, насколько быстро это будет работать.
Сначала давайте получим имена всех документов. Простой способ сделать это – использовать glob:
import globdataset = glob.glob("documents/doc_*.txt")
И теперь мы можем обработать все документы:
results = list(p(dataset))
На моем ноутбуке это занимает около 21 секунды, чтобы обработать все 10 000 документов. Быстро и просто! Можем ли мы сделать это еще быстрее?
Да! Теперь в пайпе также есть параметр n_jobs, который указывает, сколько задач можно выполнять параллельно. Вот небольшой кусок кода, который обрабатывает набор данных несколько раз, используя от 0 до 9 потоков. Насколько быстрее вы думаете он будет выполняться с использованием 9 потоков одновременно?
import timex = []y = []for i in range(10): start = time.time() results = list(p(dataset, n_jobs=i)) total_time = time.time() - start x.append(i) y.append(total_time)
На это уйдет несколько минут. Отображение времени выполнения в зависимости от потоков выглядит так:

Интересно: график выравнивается, а не продолжает уменьшаться с увеличением числа потоков. То есть использование 9 потоков не означает, что выполнение будет в 9 раз быстрее, чем с использованием 1 потока. Почему? К сожалению, вы не можете нарушить закон. И есть закон: Закон Амдала. Он говорит, что вы никогда не получите N-кратного ускорения, потому что есть неустранимая стоимость настройки. В этом случае я могу уменьшить время с примерно 21 секунды до 8 секунд, используя 4 потока. Неплохо!
Наконец, мне хотелось бы визуализировать конвейер. Для этой части проекта я выбрал формат диаграммы Mermaid. В последнее время он получил много поддержки, в том числе в хранилищах github. Формат очень простой и легко создается. Для отображения в github просто назовите файл с расширением .mmd. Вот как создать скрипт Mermaid с использованием picopipe:
from picopipe import to_mermaidwith open("pipeline.mmd", "w") as fp: fp.write(to_mermaid(p))
И вот как это выглядит при отображении в github:

К сожалению, на github не отображается функциональность при наведении мыши (определенной в CSS). Однако, если вы можете установить свой собственный CSS, то это позволяет не только визуализировать конвейер, но и показывать код шага при наведении курсора на блок шага:

Вышеуказанная диаграмма русалки с поддержкой наведения мыши была создана с использованием пользовательской системы панелей Comet (бесплатно для всех пользователей). Создание пользовательской панели, отображающей файлы Mermaid, оказалось очень простым. Вот демонстрация вышеупомянутой диаграммы русалки вживую: comet.com/dsblank/picopipe/a4c044c1657b464087ec44f67ae22709
Таким образом, мы завершили исследование разработки самой небольшой в мире платформы для обработки данных и изучили ее параллелизацию и визуализацию. Вы можете найти весь код здесь: github.com/dsblank/picopipe. Надеюсь, вы найдете представленные здесь идеи и конечный модуль полезными.
Интересуетесь искусственным интеллектом, машинным обучением или наукой о данных? Поставьте лайк и подпишитесь. Дуг – руководитель исследований в comet.com, компании по отслеживанию экспериментов в области машинного обучения и мониторинга моделей.