Овладение интеграцией данных из систем SAP с быстрым проектированием

Мастерство интеграции данных из систем SAP с быстрым проектированием

Исследование работой инженера-строителя - Stable diffusion

Введение

В нашей предыдущей публикации, От обработки данных до обработки запросов, мы продемонстрировали, как использовать ChatGPT для решения задач по подготовке данных. Кроме положительной обратной связи, которую мы получили, был выдвинут один важный вопрос: Может ли обработка запросов помочь в более сложной среде? Это справедливый вопрос. В последние десятилетия архитектуры данных стали все более разнообразными и сложными. В результате этой сложности, инженерам данных все больше приходится интегрировать различные источники данных, с которыми они не всегда знакомы. Может ли обработка запросов помочь в этом контексте?

В этой статье рассматривается данный вопрос на основе реального использования в управлении человеческими ресурсами. Мы применяем обучение на небольшой выборке для представления модели данных SAP HCM в ChatGPT и анализируем полученную информацию с помощью Apache Spark. Таким образом, мы иллюстрируем, как обработка запросов может быть полезной даже в сложных условиях работы с данными.

О бизнес-кейсе

Общая задача, которую каждая крупная компания должна выполнить, – определить количество сотрудников и их организационную принадлежность в конкретный момент времени. Связанные данные в нашем сценарии хранятся в системе управления человеческими ресурсами SAP HCM, которая является одним из ведущих приложений для управления человеческими ресурсами в предприятиях.

Для решения этой задачи каждому инженеру данных необходимо накопить большое количество знаний, связанных с бизнесом, которые тесно зависят от основной модели данных.

В этой статье будет предоставлена пошаговая инструкция по решению описанной бизнес-проблемы путем создания кода PySpark, который может использоваться для построения модели данных и, следовательно, основы для любого решения отчетности.

Пример отчета PowerBi, показывающего численность персонала

Шаг 1: Определить необходимую информацию

Одна из основных задач в области науки о данных – выбрать необходимую информацию в соответствии с бизнес-кейсом и определить ее источник в исходных системах. Для решения этой задачи мы должны привлечь некоторые бизнес-знания к ChatGPT. В этом целях мы обучаем ChatGPT некоторой информации о базовых таблицах SAP HCM, которые можно найти в справочнике SAP: Управление персоналом | Портал справки по SAP, комбинируя это с примерами записей в формате CSV для каждой таблицы.

В данном первом сценарии нашей целью является составление отчета о всех активных сотрудниках в конкретный момент времени. Результат должен также включать персональный номер сотрудника, имя, статус и организационную принадлежность.

Для получения всей необходимой информации нам нужно передать Схему базы данных в ChatGPT, включая примеры наборов данных и описания полей, используя подход few-shot prompting. Мы начнем с передачи основной информации о таблицах в ChatGPT.

Промпт:Учитывая следующие таблицы1. Таблица PA0000 - Действия сотрудникаПоле;Ключ;Элемент данных;Домен;Тип данных;Длина;Десятичные знаки;Краткое описание;Таблица проверкиMANDT;;MANDT;MANDT;CLNT;3;0;Клиент;T000PERNR;;PERSNO;PERSNO;NUMC;8 ;0;Табельный номер сотрудника;PA0003SUBTY;;SUBTY;SUBTY;CHAR;4;0;Подтип;OBJPS;;OBJPS;OBJPS;CHAR;2;0;Идентификация объекта...содержат следующий образец наборов данных:100;99992228;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-01-10T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;3;;0;0100;99992229;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-03-29T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;6;;0;02. Таблица T529U - Значения статусаПоле;Ключ;Элемент данных;Домен;Тип данных;Длина;Десятичные знаки;Краткое описание;Таблица проверкиMANDT;;MANDT;MANDT;CLNT;3;0;Клиент;*SPRSL;;SPRAS;SPRAS;LANG;1;0;Ключ языка;T002STATN;;STATN;STATN;NUMC;1;0;Номер статуса;...содержат следующие примеры наборов данных:MANDT,SPRSL,STATN,STATV,TEXT1100,D,2,3,активный100,D,2,0,уволенный....

Шаг 2: Объедините необходимые базовые таблицы и отфильтруйте только активных сотрудников

Теперь давайте создадим код для объединения базовых таблиц:

Prompt: Дайте мне код на pyspark, чтобы ограничить "Действия сотрудников" только теми сотрудниками, которые указаны как активные в соответствии с их соответствующими "Значениями статуса" для текстов на английском языке. В качестве источника для таблицы PA0000 используйте "structureVoAGI.PA0000", а в качестве источника для таблицы T529U используйте "structureVoAGI.T529U". Используйте осмысленные псевдонимы для столбцов.

Это приведет к довольно неплохому и хорошо отформатированному коду PySpark:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# Инициализация сессии Sparkspark = SparkSession.builder.appName("EmployeeActionsFilter").getOrCreate()# Чтение таблиц в DataFramedf_pa0000 = spark.read.table("structureVoAGI.PA0000")df_t529u = spark.read.table("structureVoAGI.T529U")# Фильтрация T529U для соответствующего описания статуса и языкадф_active_status = df_t529u.filter(    (col("TEXT1") == "Active") &    (col("SPRSL") == "E"))# Объединение PA0000 с отфильтрованным T529U на основе поля статуса STAT2df_active_employees = df_pa0000.join(    df_active_status,    (df_pa0000.STAT2 == df_active_status.STATV) &    (df_active_status.STATN == 2)  # Предполагая, что STATN=2 указывает статус занятости на основе предоставленных данных).select(    df_pa0000["PERNR"].alias("EmployeeNumber"),    df_pa0000["MASSN"].alias("ActionType"),    df_pa0000["MASSG"].alias("ReasonForAction"),    df_active_status["TEXT1"].alias("StatusDescription"))# Показать результатdf_active_employees.show()# При необходимости вы можете записать этот DataFrame в назначение или дальше обрабатывать его.

Обратите внимание, что GPT-4 распознала условия объединения для обеих таблиц, основанных на столбце STAT2 таблицы PA0000 и столбце STATV таблицы T529U, которая является соответствующей прозрачной таблицей. Кроме того, созданный код содержит бизнес-описания в качестве псевдонимов столбцов для повышения его читаемости.

Шаг 3: Создайте график, отражающий историю сотрудников компаний

Теперь мы создадим DataFrame, который содержит значения дат для периода, начиная с 2020-01-01 и заканчивая 2024-01-01, и объединим все действующих сотрудников в соответствии с их датой начала работы (BEGDA) и возможной датой окончания работы (ENDDA), необходимо создать искусственную временную шкалу для объединения сотрудников.

PromptIn Pyspark: Создайте dataframe, содержащий одну строку для каждой даты, начиная с 2020-01-01 и заканчивая 2024-01-01. Столбец Date должен быть отформатирован как дата. Затем объедините всех действующих сотрудников в соответствии с их начальной и конечной датами. Результат должен содержать только активных сотрудников в соответствии с их статусом занятости.

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import explode, sequence, to_date, lit, col# Запуск сессии Sparkspark = SparkSession.builder.appName("EmployeeTimeline").getOrCreate()# Определение начальной и конечной датначальная_дата = to_date(lit("2020-01-01"))конечная_дата = to_date(lit("2024-01-01"))# Создание DataFrame с одной строкой для каждой датыdate_df = (spark.range(1)          .select(explode(sequence(начальная_дата, конечная_дата)).alias("date")))# Загрузка таблиц PA0000 и T529Upa0000_df = spark.read.table("structureVoAGI.PA0000")t529u_df = spark.read.table("structureVoAGI.T529U")# Фильтрация T529U для активных сотрудников с ключом языка = Eactive_t529u_df = t529u_df.filter((col("SPRSL") == "E") & (col("TEXT1") == "Активный"))# Объединение PA0000 с отфильтрованным T529U для получения активных сотрудниковactive_pa0000_df = pa0000_df.join(active_t529u_df, pa0000_df.STAT2 == active_t529u_df.STATV)# Объединение с DataFrame диапазона дат для получения графика активных сотрудниковactive_employee_timeline_df = date_df.join(active_pa0000_df, (date_df.date >= active_pa0000_df.BEGDA) & (date_df.date <= active_pa0000_df.ENDDA))# Показать DataFrameactive_employee_timeline_df.show()

Шаг 4: Разрушение связи родитель-потомок таблицы, определяя высший объект организационной структуры

В этом последнем шаге мы хотим построить DataFrame, представляющую организационную структуру компании и определить организационное назначение каждого объекта. Конкретно, мы хотим определить, к какой высшей организационной единице (например, области или подразделению) привязан каждый дочерний объект.

Организационная структура может быть использована для объединения временной шкалы сотрудников и получения подробной информации о организационном назначении каждого сотрудника на более позднем этапе. Для этого нам нужно использовать таблицу HRP1001 SAP:

Подсказка: Следующая таблица содержит организационное назначение каждого сотрудника: Таблица HRP1001 - Структура организации Поле;Элемент данных;Домен;Тип данных;Длина;Десятичные знаки;Краткое описание;Проверка таблицыMANDT;MANDT;MANDT;CLNT;3;0;Клиент;T000OTYPE;OTYPE;OTYPE;CHAR;2;0;Тип объекта;T778OOBJID;HROBJID;HROBJID;NUMC;8;0;Идентификатор объекта;PLOGI...содержащие следующие наборы данных:MANDT,OTYPE,OBJID,PLVAR,RSIGN,RELAT,ISTAT,PRIOX,BEGDA,ENDDA,VARYF,SEQNR,INFTY,OTJID,SUBTY,AEDTM,UNAME,REASN,HISTO,ITXNR,SCLAS,SOBID,PROZT,ADATANR100,P,00998833,A,209,1,,2018-01-01T00:00:00.0000000,9999-12-31T00:00:00.0000000,CP50035300,000,1001,P 00100769,A209,2020-12-11T00:00:00.0000000,TRP1000,,,00000000,CP,60095300,0.00,

Подсказка: Таблица HRP1001 содержит информацию о родительской и дочерней связи в организационной структуре компании. Столбец OBJID содержит идентификатор объекта, а столбец SOBID содержит связанный идентификатор родительского объекта. Столбец SCLAS содержит информацию о том, является ли идентификатор объекта должностью 'S', организационной единицей 'O' или ролью 'K'. В Pyspark: Дайте мне код, который создает dataframe, содержащий идентификатор объекта высшей доступной организационной единицы каждого объекта путем рекурсивного разрушения иерархии. Используйте понятные псевдонимы столбцов.

Модель Language создает рекурсивную функцию, которая ищет высшую уровневую организационную единицу (‘O’) для каждого объекта:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# Инициализация сеанса Sparkspark = SparkSession.builder.appName("HighestAvailableOrgUnit").getOrCreate()HRP1001_df = spark.read.table("structureVoAGI.HRP1001")# Рекурсивная функция для поиска высшей доступной организационной единицыdef find_highest_org_unit(df, obj_id):    # Фильтрация строк для текущего идентификатора объекта    object_df = df.filter(col("OBJID") == obj_id)        if object_df.count() == 0:        return None        # Получение идентификатора родительского объекта (SOBID) и SCLAS для текущего объекта    parent_obj_id = object_df.select("SOBID").first()[0]    sclas = object_df.select("SCLAS").first()[0]        # Рекурсивный поиск высшей доступной организационной единицы    highest_org_unit = find_highest_org_unit(df, parent_obj_id)        # Если текущий объект является организационной единицей ('O') или highest_org_unit равно None, обновляем результат    if sclas == 'O' or highest_org_unit is None:        highest_org_unit = obj_id        return highest_org_unit# Создание UDF (User-Defined Function) для применения рекурсивной функцииfrom pyspark.sql.functions import udffrom pyspark.sql.types import StringTypefind_highest_org_unit_udf = udf(find_highest_org_unit, StringType())# Применение UDF для поиска высшей доступной организационной единицы для каждого объектаhighest_org_unit_df = df.withColumn("HighestAvailableOrgUnit", find_highest_org_unit_udf(df, col("OBJID")))# Выбор понятных столбцов и псевдонимовresult_df = highest_org_unit_df.select(    col("OBJID").alias("ObjectID"),    col("HighestAvailableOrgUnit").alias("HighestAvailableOrgUnit"))# Показ результирующего DataFrameresult_df.show()# Остановка сеанса Sparkspark.stop()

Даже если созданный код не очень оптимизирован, он содержит рекурсивную функцию для разрушения иерархии. Пользователи, предпочитающие общие табличные выражения (CTE), могут указать подсказку (с использованием общего табличного выражения) во входном запросе, чтобы создать более читаемое и понятное выражение PySpark.

from pyspark.sql import SparkSession# Создание сессии Sparkspark = SparkSession.builder.appName("OrganizationalUnitHierarchy").getOrCreate()# Определение DataFrame HRP1001 (предполагается, что он уже загружен)# Замените 'hrp1001_df' на ваш фактический DataFrame# Убедитесь, что у вас есть соответствующие столбцы: 'OBJID', 'SOBID' и 'SCLAS'# Регистрация DataFrame HRP1001 как временной таблицы SQLhrp1001_df.createOrReplaceTempView("hrp1001")# Определение рекурсивного запроса для разрешения иерархии и поиска высшего организационного подразделенияrecursive_query = """WITH RECURSIVE OrgHierarchy AS (  SELECT OBJID, SOBID, SCLAS  FROM hrp1001  WHERE SCLAS = 'O'  -- Начинаем с организационных подразделений    UNION ALL    SELECT h.OBJID, p.SOBID, p.SCLAS  FROM OrgHierarchy h  JOIN hrp1001 p ON h.SOBID = p.OBJID)SELECT DISTINCT  h.OBJID AS EmployeeID,  h.SOBID AS HighestOrganizationalUnitFROM OrgHierarchy h"""# Выполнение рекурсивного запроса и создание результирующего DataFrameresult_df = spark.sql(recursive_query)# Показать результатresult_df.show()# Остановка сеанса Spark после завершенияspark.stop()

Обе версии кода создадут фрейм данных для аппроксимированной иерархической организационной структуры, которую можно использовать для дальнейших шагов по интеграции данных, просто объединив ее с ранее созданным фреймом данных.

Набор данных, содержащий иерархическую информацию

Заключение

Мы показали, как решать более сложные задачи по инженерии данных на практическом примере, извлекая и интегрируя данные из систем SAP с использованием ChatGPT для генерации кода PySpark. Большие языковые модели пока не идеальны, но каждый уже может себе представить, насколько эффективными могут быть эти техники для инженеров данных. Есть несколько ключевых моментов:

  • ChatGPT способен понимать фундаментальные принципы моделей данных. Вы можете уточнить его понимание, используя техники подачи данных, чтобы предоставить более глубокие знания.
  • Даже если подход не дает сразу идеальный код, мы легко можем настроить созданный код под наши индивидуальные сценарии.
  • Благодаря широкой доступности открытых справочных документов и баз знаний SAP данный подход может быть расширен до решения, основанного на Retrieval-Augmented Generation (RAG).

Когда дело доходит до оптимальных практик по проектированию подсказок, старайтесь быть максимально точными и предоставлять коды ошибок, возвращаемые вашей средой Spark, чтобы использовать возможности LLM для рефакторинга сгенерированного кода. Возможно, для улучшения результатов ChatGPT поможет добавление ключевых слов, таких как “точно”. Просите подробное объяснение подхода к решению, таким образом модель трансформера ChatGPT будет исследовать глубже.

Примечание: Подсказки, содержащие примеры наборов данных в формате CSV, были обрезаны из-за ограничений длины данной статьи.

Об авторах

Маркус Штади – старший инженер облачных данных в Dehn SE, работает в области инженерии данных, науки о данных и аналитики данных многие годы.

Кристиан Кох – главный архитектор предприятия в BWI GmbH и преподаватель в Техническом институте Георга Симона Ома в Нюрнберге.