Создание отношений Many-To-One между столбцами в синтетической таблице с использованием пользовательских функций PySpark

Установка отношений Many-To-One между столбцами в синтетической таблице с помощью пользовательских функций PySpark

Изображение, сгенерированное с помощью DALL-E 3

Я недавно играл с генератором данных Databricks Labs, чтобы создавать полностью синтетические наборы данных с нуля. В рамках этого я изучал создание данных о продажах в разных магазинах, сотрудниках и клиентах. Поэтому я хотел создать связи между столбцами, которые я искусственно заполнял, такие как сопоставление сотрудников и клиентов с определенным магазином.

Используя PySpark UDF и немного логики, мы можем создавать связанные столбцы, которые следуют отношению “многие к одному”. С помощью небольшой магии мы даже можем расширить логику и создать некоторое изменение в этом сопоставлении, например, когда клиент в основном покупает в своем местном магазине, но иногда выбирает другой.

Используйте генератор данных Databricks Labs для создания базового DataFrame

Примечание: Вы можете пропустить этот раздел, если он не требуется!

Сначала нам нужно создать DataFrame с нашим первым случайно сгенерированным столбцом. В нашем случае мы начнем с магазина, поскольку логически у нас будет «много сотрудников на один магазин» и «много клиентов, которые регулярно покупают в магазине».

С учетом модели данных Star Schema, мы начнем с фактовой таблицы о продажах – транзакционной таблицы, которая будет содержать ключевые значения для идентификатора продажи, идентификатора магазина, идентификатора сотрудника и идентификатора клиента, сумму продажи и некоторые данные о дате и времени покупки. Затем мы можем заполнить дополнительную информацию о магазине, сотруднике и клиенте в таблицах-измерениях ниже.

Мы начнем с небольшой таблицы – таблицы с 1000 продажами нам будет достаточно. Теперь нам нужно решить, как распределить эти продажи между магазинами, сотрудниками и клиентами. Давайте предложим следующее:

  • # Магазины = 20
  • # Сотрудники = 100
  • # Клиенты = 700

Мы также можем сказать, что продажи будут записаны на протяжении прошлого месяца:

  • Первая дата продажи = 2023-11-01
  • Последняя дата продажи = 2023-11-30

Идентификатор продажи должен быть уникальным столбцом, чтобы мы могли сгенерировать столбец с идентификатором для этого. Теперь нам нужно распределить 1000 продаж по 20 магазинам. Для простоты мы будем считать, что это случайно.

Используя генератор Databricks Lab, мы можем сделать это с помощью следующего кода:

Теперь добавим некоторый код, чтобы записать, когда были совершены продажи, и их сумму. Чтобы упростить, мы округлим временную метку продажи до ближайшего часа.

Для расчета суммы продажи мы можем использовать параметр “expr” в выражении withColumn, чтобы сгенерировать случайное число с определенными правилами/границами.

В данном случае выражение довольно простое: сгенерировать случайное число (от 0 до 1), добавить 0,1 (чтобы гарантировать, что значения продажи не равны 0) и умножить на 350.

У нас теперь есть базовая структура DataFrame, объединяем все вместе:

Мы можем быстро создать анализ данных, чтобы посмотреть распределение значений в столбцах:

Изображение от автора: анализ данных, созданных в Databricks

Мы видим, что распределение идентификаторов магазинов достаточно однородно по всем 20 магазинам, без пропущенных значений, и имеет среднее значение в центре, как ожидается. То же самое относится к отметкам времени и значениям суммы.

Добавить зависимый столбец “Многие к одному”

Теперь мы можем добавить столбец с идентификатором сотрудника в DataFrame. Мы завершили работу с генератором данных Databricks Lab, поэтому будем использовать операции PySpark для добавления столбцов в DataFrame.

Отступая от кода, мы хотим представить это в виде следующих утверждений:

  • В наличии 20 магазинов.
  • У каждого магазина более 1 сотрудника.
  • Каждый сотрудник работает только в одном магазине.

Сначала нам нужно распределить сотрудников между магазинами. Для этого можно использовать следующую функцию на языке Python:

Теперь, когда у нас есть распределение сотрудников для каждого магазина, давайте начнем назначать Id!

Список employeesPerStore гарантирует, что идентификаторы сотрудников в каждом магазине не пересекаются. Мы можем использовать это для случайного назначения идентификатора сотрудника продаже в таблице с помощью следующего уравнения:

Эта функция в настоящее время работает только для одного значения. Нам нужно поместить это во что-то, с чем DataFrame в PySpark может работать (функционально и быстро!)

Мы можем передавать PySpark UDF в метод withColumn, поэтому давайте переформатируем эту логику в функцию и установим ее в UDF:

Теперь вызовем это как новую колонку в DataFrame:

Мы можем быстро проверить, что это выглядит правильно, используя инструмент Visualisation в Databricks для просмотра уникального количества идентификаторов сотрудников для каждого идентификатора магазина. Я предпочитаю это, но вы также можете использовать логику группировки или другие модули для построения графиков, если хотите.

Image by Author: Distinct count of Employee Ids per Store

Важное замечание: Эта логика позволяет пропустить сотрудников в результатах. Это означает, что сотрудник может не сделать ни одной продажи и, следовательно, не будет включен в DataFrame. Мы рассмотрим, как гарантировать, что все клиенты имеют записи о продажах на их имя, в следующем разделе.

Добавить колонку “клиенты”

Колонка “клиенты” немного отличается… хотя наш случай использования подразумевает, что клиент часто делает покупки в одном магазине, совершенно возможно, что он пошел в другой магазин однажды. Как нам это моделировать?

Мы уже имеем основу с работой над колонкой “сотрудники”, поэтому можем повторить функцию get_employees и логику UDF для клиентов, как показано ниже:

Мы снова могли пропустить нескольких клиентов. Вот несколько подходов для их исправления:

  • Пересчитайте в цикле until, пока не получите DataFrame, который содержит всех клиентов (неэффективно, затратно, может выполняться неопределенно долго)
  • Случайным образом обновляйте идентификаторы клиентов в цикле until, пока все клиенты не будут помещены в DataFrame (требуется логика для перезаписи только в тех же магазинах, также может выполняться неопределенно долго)
  • Верните список всех идентификаторов клиентов с более чем одной записью в таблице продажи и случайным образом перезапишите их, пока все недостающие идентификаторы не будут добавлены (также требуется логика для перезаписи клиентов в одном магазине, может потребоваться циклическая логика)
  • Обратите процесс и начните с сотрудников. Это гарантирует, что каждому сотруднику будет случайным образом назначено строки. Затем мы можем использовать сопоставление и применить идентификатор магазина.

Надеюсь, становится понятно, почему последний вариант является наименее трудоемким для вычислений – у нас есть весь необходимый код, просто нужно немного переформатировать его.

Наш новый скрипт выглядит следующим образом:

Изображение от автора: Databricks Data Profile for the new DataFrame

Добавление случайности клиентам

Теперь нам нужна небольшая случайность, которую мы должны определить. Для нашего примера давайте скажем, что у каждого клиента есть 90% шанс совершать покупки в обычном магазине («местный» магазин). Если нам не нужно, чтобы все клиенты были возвращены в наборе результатов, мы можем просто изменить наш customers_udf следующим образом и использовать df2:

Логика включает использование функции random.choices для предоставления взвешенного списка и возвращения одного значения.

Чтобы вычислить взвешенный список, у нас есть вес нашего «местного» магазина для клиента, в данном случае 90%, поэтому нужно назначить оставшиеся 10% другим магазинам, в данном случае 19 магазинам. Вероятность выбора каждого другого магазина будет равна 10/19 = 0.526%. Мы можем заполнить массив этими процентами, выглядящими примерно так:[0.526,0.526,0.526,…,90,0.526,…0.526]

Передавая это в random.choices, мы случайным образом выбираем идентификатор магазина из списка с соответствующими весами и используем его в качестве входного значения для переменной customer_id, так же, как раньше.

Примечание: Вывод random.choices возвращает список (так как вы можете запросить k результатов), поэтому для получения store_id в виде целочисленного значения необходимо получить элемент списка с индексом 0.

Если нам нужно объединить эту логику с DataFrame, включающим всех клиентов, мы можем немного изменить процесс. Логика с весами по-прежнему действительна, поэтому мы можем просто выбрать случайным образом магазин и вернуть это в качестве результата:

Изображение автора: пример итогового DataFrame в Databricks

Вывод

Вот и все! Строго и свободно связанный DataFrame, созданный синтетически. Теперь вы можете перейти к следующим шагам для заполнения связанных таблиц, которые могут содержать более подробную информацию, такую как таблицы размеров магазинов, адресов, имен сотрудников, ролей и т.д. Это также можно сделать с помощью Databricks Labs Data Generator или любого другого инструмента/процесса, с которым вы пользуетесь.

В репозитории Databricks Labs Data Generator на GitHub есть некоторые отличные примеры вместе с документацией, так что, если вас интересует больше информации, обязательно взгляните на это.

Весь мой код доступен в следующем репозитории GitHub.

Если у вас есть мысли, комментарии или альтернативы к этой демонстрации, пожалуйста, оставьте свое мнение в комментариях. Спасибо!