Оптимизация процесса обработки данных ETL в Talent.com с помощью Amazon SageMaker

Улучшение процесса обработки данных ETL в Talent.com с помощью Amazon SageMaker

Этот пост написан совместно Анатолием Хоменко, инженером машинного обучения, и Абденуром Беззухом, главным техническим директором в Talent.com.

Основанная в 2011 году, Talent.com собирает платные вакансии от своих клиентов и вакансии открытых источников, создавая единообразную и легко находимую платформу. Охватывая более 30 миллионов вакансий в более чем 75 странах и охватывая различные языки, отрасли и каналы распределения, Talent.com удовлетворяет разнообразные потребности соискателей, эффективно связывая миллионы соискателей с вакансиями.

Миссия Talent.com – облегчить глобальные связи на рынке труда. Для достижения этой цели Talent.com собирает вакансии с различных источников в Интернете, предлагая соискателям доступ к обширной базе из более чем 30 миллионов вакансий, соответствующих их навыкам и опыту. Согласно этой миссии, Talent.com сотрудничает с AWS для разработки передового движка рекомендаций вакансий на основе глубокого обучения, который поможет пользователям продвигать свою карьеру.

Для обеспечения эффективной работы этого движка рекомендаций, необходимо реализовать масштабную обработку данных, отвечающую за извлечение и уточнение характеристик из собранных вакансий Talent.com. Эта обработка может обрабатывать 5 миллионов записей ежедневно менее чем за 1 час и позволяет обрабатывать несколько дней записей параллельно. Кроме того, данное решение позволяет быстро развернуться в производстве. Основным источником данных для этой обработки является JSON Lines формат, хранящийся в Amazon Simple Storage Service (Amazon S3) и разделенный по датам. Каждый день это приводит к созданию десятков тысяч файлов JSON Lines с ежедневными обновлениями.

Основная цель этой обработки данных – обеспечить создание характеристик, необходимых для обучения и развертывания движка рекомендаций Talent.com. Следует отметить, что эта обработка должна поддерживать инкрементные обновления и учитывать сложные требования к извлечению характеристик, необходимых для обучения и развертывания модулей, необходимых для системы рекомендаций вакансий. Наша обработка данных относится к общему процессу ETL (извлечение, преобразование и загрузка), который объединяет данные из разных источников в едином хранилище.

Для более подробной информации о том, как Talent.com и AWS совместно создали передовые методы обучения моделей глубокого обучения, используя Amazon SageMaker для создания системы рекомендаций вакансий, см. From text to dream job: Building an NLP-based job recommender at Talent.com with Amazon SageMaker. Система включает инженерию характеристик, проектирование архитектуры модели глубокого обучения, оптимизацию гиперпараметров и оценку модели, где все модули запускаются с использованием Python.

В этом посте показано, как мы использовали SageMaker для создания масштабной обработки данных для движка рекомендаций вакансий в Talent.com. Полученное решение позволяет Data Scientist создавать извлечение характеристик в блокноте SageMaker с использованием библиотек Python, таких как Scikit-Learn или PyTorch, а затем быстро развернуть тот же код в обработке данных, выполняющей извлечение характеристик в масштабе. Решение не требует переноса кода извлечения характеристик для использования PySpark, что необходимо при использовании AWS Glue в качестве решения для ETL. Наше решение может быть разработано и развернуто полностью Data Scientist, используя только SageMaker, и не требует знания других решений ETL, таких как AWS Batch. Это значительно сокращает время, необходимое для развертывания конвейера машинного обучения (ML) в производству. Конвейер управляется через Python и без проблем интегрируется с рабочими процессами извлечения характеристик, что делает его адаптивным для широкого спектра приложений в области аналитики данных.

Обзор решения

Обзор конвейера обработки данных с использованием SageMaker Processing

Трубопровод состоит из трех основных фаз:

  1. Используйте задание обработки Amazon SageMaker для обработки необработанных файлов JSONL, связанных с определенным днем. Несколько дней данных могут быть обработаны с помощью отдельных заданий обработки одновременно.
  2. Используйте AWS Glue для перебора данных после обработки нескольких дней данных.
  3. Загрузите обработанные характеристики для указанного диапазона дат с использованием SQL из таблицы Amazon Athena, затем обучите и разверните модель рекомендаций по задачам.

Обработка необработанных файлов JSONL

Мы обрабатываем необработанные файлы JSONL для указанного дня с помощью задания обработки SageMaker. Задание реализует извлечение характеристик и сжатие данных и сохраняет обработанные характеристики в файлах Parquet с 1 миллионом записей в файле. Мы используем параллелизацию процессора для выполнения извлечения характеристик для каждого необработанного файла JSONL параллельно. Результаты обработки каждого файла JSONL сохраняются в отдельный файл Parquet во временном каталоге. После обработки всех файлов JSONL мы выполняем сжатие тысяч маленьких файлов Parquet в несколько файлов с 1 миллионом записей в файле. Сжатые файлы Parquet затем загружаются в Amazon S3 в качестве выходных данных задания обработки. Сжатие данных обеспечивает эффективную переборку и выполнение SQL-запросов на следующих этапах трубопровода.

Вот пример кода для планирования задания обработки SageMaker для указанного дня, например 2020-01-01, с использованием SDK SageMaker. Задание читает необработанные файлы JSONL из Amazon S3 (например, из s3://bucket/raw-data/2020/01/01) и сохраняет сжатые файлы Parquet в Amazon S3 (например, в s3://bucket/processed/table-name/day_partition=2020-01-01/).

### установка зависимостей %pip install sagemaker pyarrow s3fs awswranglerimport sagemakerimport boto3from sagemaker.processing import FrameworkProcessorfrom sagemaker.sklearn.estimator import SKLearnfrom sagemaker import get_execution_rolefrom sagemaker.processing import ProcessingInput, ProcessingOutputregion = boto3.session.Session().region_namerole = get_execution_role()bucket = sagemaker.Session().default_bucket()### мы используем экземпляр с 16 ЦП и 128 ГБ памяти### обратите внимание, что скрипт НЕ будет загружать все данные в память во время сжатия### в зависимости от размера отдельных файлов jsonl, может потребоваться больший экземплярinstance = "ml.r5.4xlarge"n_jobs = 8  ### мы используем 8 рабочих процессовscript_department = "2020-01-01" ### обработать данные за один деньest_cls = SKLearnframework_version_str = "0.20.0"### планирование задания обработкиscript_processor = FrameworkProcessor(    role=role,    instance_count=1,    instance_type=instance,    estimator_cls=est_cls,    framework_version=framework_version_str,    volume_size_in_gb=500,)script_processor.run(    code="processing_script.py", ### имя основного обрабатывающего сценария    source_dir="../src/etl/", ### расположение каталога с исходным кодом    inputs=[], ### ввод обработки пуст    outputs=[        ProcessingOutput(destination="s3://bucket/processed/table-name/",                         source="/opt/ml/processing/output"),    ],    arguments=[        ### каталог с результатами работы задания        "--output", "/opt/ml/processing/output",        ### временный каталог внутри экземпляра        "--tmp_output", "/opt/ml/tmp_output",        "--n_jobs", str(n_jobs), ### количество рабочих процессов        "--date", date, ### дата для обработки        ### расположение с необработанными файлами jsonl в S3        "--path", "s3://bucket/raw-data/",    ],    wait=False)

Следующий основной сценарий (processing_script.py), который выполняет задание обработки SageMaker, имеет следующую структуру кода:

import concurrentimport pyarrow.dataset as dsimport osimport s3fsfrom pathlib import Path### функция для обработки необработанного файла jsonl и сохранения извлеченных характеристик в файл parquet  from process_data import process_jsonl### разбор аргументов командной строкиargs = parse_args()### мы используем s3fs для перебора пути ввода S3 для необработанных файлов jsonlfs = s3fs.S3FileSystem()### мы предполагаем, что необработанные файлы jsonl хранятся в каталогах S3, разделенных по датам### например: s3://bucket/raw-data/2020/01/01/jsons = fs.find(os.path.join(args.path, *args.date.split('-')))### местоположение временного каталога внутри экземпляра задания обработкиtmp_out = os.path.join(args.tmp_output, f"day_partition={args.date}")### расположение каталога с результатами заданияout_dir = os.path.join(args.output, f"day_partition={args.date}")### обработка отдельных файлов jsonl параллельно с использованием рабочих процессовfutures=[]with concurrent.futures.ProcessPoolExecutor(max_workers=args.n_jobs) as executor:    for file in jsons:        inp_file = Path(file)        out_file = os.path.join(tmp_out, inp_file.stem + ".snappy.parquet")        ### функция process_jsonl читает необработанный файл jsonl из расположения S3 (inp_file)        ### и сохраняет результат в файл Parquet (out_file) во временном каталоге        futures.append(executor.submit(process_jsonl, file, out_file))    ### ожидание обработки всех файлов jsonl    for future in concurrent.futures.as_completed(futures):        result = future.result()### сжатие файлов parquetdataset = ds.dataset(tmp_out)if len(dataset.schema) > 0:    ### сохранение сжатых файлов parquet с 1 млн записей в файле    ds.write_dataset(dataset, out_dir, format="parquet",                      max_rows_per_file=1024 * 1024)

Масштабируемость является ключевой особенностью нашего конвейера. Во-первых, несколько задач SageMaker Processing могут использоваться одновременно для обработки данных в течение нескольких дней. Во-вторых, мы избегаем загрузки всей обработанной или исходной информации в память сразу, обрабатывая каждый указанный день данных. Это позволяет обрабатывать данные, используя типы экземпляров, которые не могут вместить весь объем данных за день в оперативную память. Единственное требование заключается в том, что тип экземпляра должен быть способен загружать N файлов JSONL или обработанных файлов Parquet в память одновременно, где N – это количество рабочих процессов, используемых в процессе.

Перехватить обработанные данные с помощью AWS Glue

После обработки всех исходных данных для нескольких дней мы можем создать таблицу Athena из всего набора данных с помощью движка AWS Glue. Мы используем библиотеку AWS SDK для pandas (awswrangler), чтобы создать таблицу с помощью следующего фрагмента кода:

import awswrangler as wr### перехватываем обработанные данные в S3res = wr.s3.store_parquet_metadata(    path='s3://bucket/processed/table-name/',    database="database_name",    table="table_name",    dataset=True,    mode="overwrite",    sampling=1.0,    path_suffix='.parquet',)### печатаем схему таблицыprint(res[0])

Загрузка обработанных характеристик для обучения

Теперь можно загрузить обработанные характеристики для указанного диапазона дат из таблицы Athena с использованием SQL, и эти характеристики могут быть использованы для обучения модели рекомендаций задач. Например, следующий фрагмент кода загружает один месяц обработанных характеристик в DataFrame с использованием библиотеки awswrangler:

import awswrangler as wrquery = """    SELECT *     FROM table_name    WHERE day_partition BETWEN '2020-01-01' AND '2020-02-01' """### загружаем 1 месяц данных из database_name.table_name в DataFramedf = wr.athena.read_sql_query(query, database='database_name')

Кроме того, использование SQL для загрузки обработанных характеристик для обучения можно расширить, чтобы поддержать различные другие случаи использования. Например, мы можем применить аналогичный конвейер для поддержки двух отдельных таблиц Athena: одна для хранения впечатлений пользователей, а другая для хранения кликов пользователей на эти впечатления. С помощью операторов объединения SQL мы можем получить впечатления, на которые пользователи нажали или не нажимали, а затем передать эти впечатления в задачу обучения модели.

Преимущества решения

Внедрение предложенного решения приносит несколько преимуществ нашему существующему рабочему процессу, включая:

  • Упрощение реализации – Решение позволяет выполнить извлечение характеристик с использованием популярных библиотек машинного обучения на Python. Кроме того, нет необходимости портировать код на PySpark. Это упрощает извлечение характеристик, так как тот же самый код, разработанный Data Scientist в блокноте, будет выполняться этим конвейером.
  • Быстрый путь к производству – Решение может быть разработано и внедрено Data Scientist для выполнения извлечения характеристик в масштабе, что дает им возможность разработать модель рекомендаций на основе этих данных. В то же время это же решение может быть развернуто в производстве ML Engineer с минимальными модификациями.
  • Повторное использование – Решение предоставляет повторяемый шаблон для извлечения характеристик в масштабе и легко адаптируется для других случаев использования, кроме создания моделей рекомендаций.
  • Эффективность – Решение обеспечивает хорошую производительность: обработка одного дня данных на Talent.com занимает менее 1 часа.
  • Инкрементальные обновления – Решение также поддерживает инкрементные обновления. Новые ежедневные данные могут быть обработаны задачей SageMaker Processing, и S3 расположение, содержащее обработанные данные, может быть повторно просканировано для обновления таблицы Athena. Мы также можем использовать планировщик cron job для обновления данных за сегодня несколько раз в день (например, каждые 3 часа).

Мы использовали этот конвейер ETL, чтобы помочь Talent.com обрабатывать 50 000 файлов в день, содержащих 5 миллионов записей, и создать данные для обучения, используя характеристики, извлеченные из 90 дней исходных данных от Talent.com – всего 450 миллионов записей в 900 000 файлах. Наш конвейер помог Talent.com создать и развернуть систему рекомендаций в производство всего за 2 недели. Решение выполнило все процессы ML, включая ETL, на Amazon SageMaker без использования других служб AWS. Система рекомендаций задач увеличила показатель кликабельности на 8,6% в онлайн-тестировании A/B по сравнению с предыдущим решением на основе XGBoost, что помогло связать миллионы пользователей Talent.com с лучшими рабочими местами.

Заключение

В этом посте описана ETL-процедура, которую мы разработали для обработки данных при обучении и развертывании модели рекомендаций вакансий на Talent.com. Наша процедура использует задания обработки SageMaker для эффективной обработки данных и извлечения признаков в большом масштабе. Код извлечения признаков реализован на Python, что позволяет использовать популярные библиотеки машинного обучения для выполнения извлечения признаков в большом масштабе, без необходимости переносить код на использование PySpark.

Мы призываем читателей исследовать возможность использования представленной в этом блоге процедуры как шаблона для их собственных случаев, когда требуется извлечение признаков в большом масштабе. Процедуру можно использовать Data Scientist’у для создания модели машинного обучения, а затем ту же самую процедуру можно использовать ML Engineer’у для запуска в продакшн. Это может значительно сократить время, необходимое для продуктизации решения машинного обучения от начала до конца, как это было с Talent.com. Читатели могут обратиться к руководству по настройке и запуску заданий обработки SageMaker. Мы также рекомендуем читателям ознакомиться с постом From text to dream job: Building an NLP-based job recommender at Talent.com with Amazon SageMaker, где мы обсуждаем методики обучения модели глубокого обучения с использованием Amazon SageMaker для создания системы рекомендаций вакансий Talent.com.