Проверка данных для приложений PySpark с использованием Pandera

Проверка данных PySpark с Pandera

 

Если вы являетесь практикующим в области данных, то вы цените важность проверки данных для обеспечения точности и согласованности. Это особенно важно при работе с большими наборами данных или данными из разных источников. Однако библиотека Pandera для Python может помочь упростить и автоматизировать процесс проверки данных. Pandera – это библиотека с открытым исходным кодом, разработанная для упрощения задач схемы и проверки данных. Она основана на надежности и гибкости библиотеки pandas и вводит интуитивный и выразительный API, специально разработанный для проверки данных.

В этой статье кратко представлены основные функции Pandera, а затем объясняется, как можно интегрировать проверку данных Pandera с рабочими процессами обработки данных, использующими нативный PySpark SQL с последней версии (Pandera 0.16.0).

Pandera разработана для работы с другими популярными библиотеками Python, такими как pandas, pyspark.pandas, Dask и т. д. Это позволяет легко интегрировать проверку данных в существующие рабочие процессы обработки данных. До недавнего времени Pandera не имела нативной поддержки для PySpark SQL, но для заполнения этого пробела команда QuantumBlack, AI by McKinsey, включающая Исмаила Негма-ПАРИ, Нирджа Малхотра, Джаскарана Сингха Сидану, Каспера Янехага, Олександра Лазарчука, а также основателя Pandera Нильса Бантилана разработали нативную поддержку PySpark SQL и внесли вклад в Pandera. Текст этой статьи также был подготовлен командой и написан их словами ниже.

 

Основные функции Pandera

 

Если вы не знакомы с использованием Pandera для проверки ваших данных, рекомендуем ознакомиться с статьей Хуен Трана “Проверка вашего DataFrame с помощью Pandera”, в которой описываются основы. Вкратце здесь мы кратко объясним основные функции и преимущества простого и интуитивного API, встроенных функций проверки и настройки.

 

Простой и интуитивный API

 

Одной из выдающихся особенностей Pandera является его простой и интуитивный API. Вы можете определить схему данных с использованием декларативного синтаксиса, который легко читать и понимать. Это позволяет легко написать проверочный код данных, который является эффективным и эффективным.

Вот пример определения схемы в Pandera:

class InputSchema(pa.DataFrameModel):
   year: Series[int] = pa.Field()
   month: Series[int] = pa.Field()
   day: Series[int] = pa.Field()

 

Встроенные функции проверки

 

Pandera предоставляет набор встроенных функций (обычно называемых проверками) для выполнения проверки данных. Когда мы вызываем validate()на схеме Pandera, она выполняет проверку схемы и данных. Проверка данных вызывает функции check внутри.

Вот простой пример того, как запустить проверку данных на объекте dataframe с использованием Pandera.

class InputSchema(pa.DataFrameModel):
   year: Series[int] = pa.Field(gt=2000, coerce=True)
   month: Series[int] = pa.Field(ge=1, le=12, coerce=True)
   day: Series[int] = pa.Field(ge=0, le=365, coerce=True)

InputSchema.validate(df)

 

Как видно из приведенного выше примера, для поля year мы определили проверку gt=2000, требующую, чтобы все значения в этом поле были больше 2000, в противном случае Pandera вызовет ошибку проверки.

Вот список всех встроенных проверок, доступных по умолчанию в Pandera:

eq: проверяет, равно ли значение данному литералу
ne: проверяет, отличается ли значение от данного литерала
gt: проверяет, больше ли значение данного литерала
ge: проверяет, больше ли значение и равно данному литералу
lt: проверяет, меньше ли значение данного литерала
le: проверяет, меньше ли значение и равно данному литералу
in_range: проверяет, входит ли значение в заданный диапазон
isin: проверяет, входит ли значение в заданный список литералов
notin: проверяет, не входит ли значение в заданный список литералов
str_contains: проверяет, содержит ли значение строковый литерал
str_endswith: проверяет, заканчивается ли значение строковым литералом
str_length: проверяет, соответствует ли длина значения
str_matches: проверяет, соответствует ли значение строковому литералу
str_startswith: проверяет, начинается ли значение с заданного строкового литерала

 

Пользовательские функции проверки

 

Помимо встроенных проверок, Pandera позволяет определить собственные пользовательские функции проверки. Это дает вам гибкость определения собственных правил проверки на основе конкретной ситуации.

Например, вы можете определить лямбда-функцию для проверки данных, как показано здесь:

schema = pa.DataFrameSchema({
   "column2": pa.Column(str, [
       pa.Check(lambda s: s.str.startswith("value")),
       pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
   ]),
})

 

Добавление поддержки для PySpark SQL DataFrames в Pandera

 

В процессе добавления поддержки для PySpark SQL мы придерживались двух основных принципов: 

  • согласованности интерфейса и пользовательского опыта
  • оптимизации производительности для PySpark.

Сначала давайте рассмотрим вопрос согласованности, потому что важно, чтобы у пользователя был согласованный набор API и интерфейс, независимо от выбранного фреймворка. Поскольку Pandera предоставляет несколько фреймворков на выбор, было еще более важно иметь согласованный опыт использования в API PySpark SQL.

С учетом этого мы можем определить схему Pandera с использованием PySpark SQL следующим образом:

from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pandera.pyspark as pa

spark = SparkSession.builder.getOrCreate()


class PanderaSchema(DataFrameModel):
       """Тестовая схема"""
       id: T.IntegerType() = Field(gt=5)
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()
       description: T.ArrayType(T.StringType()) = Field()
       meta: T.MapType(T.StringType(), T.StringType()) = Field()


data_fail = [
       (5, "Bread", 44.4, ["описание продукта"], {"категория продукта": "молочные продукты"}),
       (15, "Butter", 99.0, ["дополнительные подробности здесь"], {"категория продукта": "хлебобулочные изделия"}),
   ]

spark_schema = T.StructType(
       [
           T.StructField("id", T.IntegerType(), False),
           T.StructField("product", T.StringType(), False),
           T.StructField("price", T.DecimalType(20, 5), False),
           T.StructField("description", T.ArrayType(T.StringType(), False), False),
           T.StructField(
               "meta", T.MapType(T.StringType(), T.StringType(), False), False
           ),
       ],
   )
df_fail = spark_df(spark, data_fail, spark_schema)

 

В приведенном выше коде, PanderaSchema определяет схему для входящего PySpark dataframe. Он имеет 5 полей с различными типами данных и проверкой данных на полях id и product_name.

class PanderaSchema(DataFrameModel):
       """Тестовая схема"""
       id: T.IntegerType() = Field(gt=5)
       product_name: T.StringType() = Field(str_startswith="B")
       price: T.DecimalType(20, 5) = Field()
       description: T.ArrayType(T.StringType()) = Field()
       meta: T.MapType(T.StringType(), T.StringType()) = Field()

 

Далее мы создали фиктивные данные и применили собственную схему PySpark SQL, определенную в spark_schema.

spark_schema = T.StructType(
       [
           T.StructField("id", T.IntegerType(), False),
           T.StructField("product", T.StringType(), False),
           T.StructField("price", T.DecimalType(20, 5), False),
           T.StructField("description", T.ArrayType(T.StringType(), False), False),
           T.StructField(
               "meta", T.MapType(T.StringType(), T.StringType(), False), False
           ),
       ],
   )

df_fail = spark_df(spark, data_fail, spark_schema)

 

Это сделано для имитации ошибок проверки схемы и данных.

Вот содержимое датафрейма df_fail:

df_fail.show()

   +---+-------+--------+--------------------+--------------------+
   | id|product|   price|         description|                meta|
   +---+-------+--------+--------------------+--------------------+
   |  5|  Хлеб |44.40000|  [описание продукта]|{категория_продук...|
   | 15| Масло |99.00000| [подробности здесь] |{категория_продук...|
   +---+-------+--------+--------------------+--------------------+

 

Затем мы можем вызвать функцию validate из Pandera для выполнения проверки схемы и проверки данных следующим образом:

df_out = PanderaSchema.validate(check_obj=df)

 

Мы рассмотрим содержимое df_out в ближайшее время.

 

Оптимизация производительности для PySpark

 

Наш вклад был специально разработан для оптимальной производительности при работе с объектами данных PySpark, что крайне важно при работе с большими наборами данных для обработки уникальных задач среды распределенных вычислений PySpark.

Pandera использует архитектуру распределенных вычислений PySpark для эффективной обработки больших наборов данных при сохранении надежности и точности данных. Мы переписали пользовательские функции проверки Pandera для достижения более быстрой и эффективной проверки больших объемов данных, снижая риск ошибок данных и несоответствий при высокой нагрузке.

 

Подробные отчеты об ошибках

 

Мы также добавили в Pandera возможность генерации подробных отчетов об ошибках в виде объекта словаря Python. Эти отчеты доступны через объект dataframe, возвращаемый функцией validate. Они предоставляют подробное описание всех проверок схемы и данных, в соответствии с настройками пользователя.

Эта функция оказывается ценной для разработчиков, позволяя быстро выявлять и устранять любые проблемы, связанные с данными. Используя сгенерированный отчет об ошибках, команды могут составить полный список проблем схемы и данных в своем приложении. Это позволяет им установить приоритеты и решать проблемы с эффективностью и точностью.

Важно отметить, что эта функция в настоящее время доступна исключительно для PySpark SQL, предлагая пользователям улучшенный опыт работы с отчетами об ошибках в Pandera.

В приведенном выше примере кода, помните, что мы вызывали функцию validate() для объекта spark dataframe:

df_out = PanderaSchema.validate(check_obj=df)

 

Она возвращает объект dataframe. С помощью аксессоров мы можем извлечь отчет об ошибках из него следующим образом:

print(df_out.pandera.errors)

 

{
  "SCHEMA":{
     "COLUMN_NOT_IN_DATAFRAME":[
        {
           "schema":"PanderaSchema",
           "column":"PanderaSchema",
           "check":"column_in_dataframe",
           "error":"column 'product_name' not in dataframe Row(id=5, product='Bread', price=None, description=['description of product'], meta={'product_category': 'dairy'})"
        }
     ],
     "WRONG_DATATYPE":[
        {
           "schema":"PanderaSchema",
           "column":"description",
           "check":"dtype('ArrayType(StringType(), True)')",
           "error":"expected column 'description' to have type ArrayType(StringType(), True), got ArrayType(StringType(), False)"
        },
        {
           "schema":"PanderaSchema",
           "column":"meta",
           "check":"dtype('MapType(StringType(), StringType(), True)')",
           "error":"expected column 'meta' to have type MapType(StringType(), StringType(), True), got MapType(StringType(), StringType(), False)"
        }
     ]
  },
  "DATA":{
     "DATAFRAME_CHECK":[
        {
           "schema":"PanderaSchema",
           "column":"id",
           "check":"greater_than(5)",
           "error":"column 'id' with type IntegerType() failed validation greater_than(5)"
        }
     ]
  }
}

 

Как видно из приведенного выше примера, отчет об ошибках агрегируется на 2 уровнях в виде словаря Python, который легко можно использовать в последующих приложениях, например, для визуализации ошибок по времени с помощью инструментов, таких как Grafana:

  1. тип проверки = SCHEMA или DATA
  2. категория ошибок = DATAFRAME_CHECK или WRONG_DATATYPE и т.д.

Этот новый формат для структурирования отчетов об ошибках был введен в версии 0.16.0 в качестве нашего вклада.

Переключатель Вкл./Выкл.

Для приложений, основанных на PySpark, наличие переключателя Вкл./Выкл. является важной функцией, которая может существенно повлиять на гибкость и управление рисками. В частности, переключатель Вкл./Выкл. позволяет командам отключать проверку данных в производственной среде без необходимости внесения изменений в код.

Это особенно важно для крупных потоков обработки больших данных, где производительность является критическим фактором. Во многих случаях проверка данных может занимать значительное время обработки, что может отразиться на общей производительности потока обработки. С помощью переключателя Вкл./Выкл. команды могут быстро и легко отключать проверку данных при необходимости, не прибегая к затратному процессу изменения кода.

Наша команда внедрила переключатель Вкл./Выкл. в Pandera, чтобы пользователи могли легко отключать проверку данных в производственной среде, просто изменив настройки конфигурации. Это обеспечивает гибкость, необходимую для приоритезации производительности, когда это необходимо, не жертвуя качеством или точностью данных в разработке.

Для активации проверок установите следующее в переменных среды:

export PANDERA_VALIDATION_ENABLED=False

Это будет использоваться Pandera для отключения всех проверок в приложении. По умолчанию проверка включена.

В настоящее время эта функция доступна только для PySpark SQL начиная с версии 0.16.0, поскольку это новая концепция, внедренная нашим вкладом.

Точное управление выполнением Pandera

В дополнение к функции переключателя Вкл./Выкл., мы также внедрили более точное управление выполнением проверок Pandera. Это достигается путем введения настраиваемых параметров, позволяющих пользователям контролировать выполнение на трех разных уровнях:

  1. SCHEMA_ONLY: Этот параметр выполняет только проверку схемы. Он проверяет, соответствуют ли данные определению схемы, но не выполняет дополнительные проверки на уровне данных.
  2. DATA_ONLY: Этот параметр выполняет только проверку на уровне данных. Он проверяет данные на соответствие заданным ограничениям и правилам, но не проверяет схему.
  3. SCHEMA_AND_DATA: Этот параметр выполняет проверку как схемы, так и данных. Он проверяет данные как по определению схемы, так и по заданным ограничениям и правилам.

Предоставляя такое точное управление, пользователи могут выбрать уровень проверки, который наилучшим образом соответствует их конкретному случаю использования. Например, если основной задачей является обеспечение соответствия данных определенной схеме, можно использовать параметр SCHEMA_ONLY для сокращения времени обработки. Альтернативно, если известно, что данные соответствуют схеме и основное внимание уделяется обеспечению качества данных, можно использовать параметр DATA_ONLY для приоритета проверок на уровне данных.

Расширенное управление выполнением Pandera позволяет пользователям найти обдуманный баланс между точностью и эффективностью, обеспечивая более целевой и оптимизированный опыт проверки.

export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY

По умолчанию проверки включены, а глубина установлена на SCHEMA_AND_DATA, что может быть изменено на SCHEMA_ONLY или DATA_ONLY в зависимости от конкретного случая использования.

В настоящее время эта функция доступна только для PySpark SQL начиная с версии 0.16.0, поскольку это новая концепция, внедренная нашим вкладом.

Метаданные на уровне столбцов и таблиц

Наша команда добавила новую функцию в Pandera, которая позволяет пользователям хранить дополнительные метаданные на уровне Field и Schema / Model. Эта функция предназначена для встраивания контекстной информации в определения схемы, которую можно использовать другими приложениями.

Например, сохраняя детали о конкретном столбце, такие как тип данных, формат или единицы измерения, разработчики могут гарантировать, что последующие приложения смогут правильно интерпретировать и использовать данные. Аналогично, сохраняя информацию о том, какие столбцы схемы необходимы для определенного случая использования, разработчики могут оптимизировать потоки обработки данных, снизить затраты на хранение и улучшить производительность запросов.

На уровне схемы пользователи могут хранить информацию, помогающую категоризировать различные схемы во всем приложении. Эти метаданные могут включать детали, такие как назначение схемы, источник данных или диапазон дат. Это может быть особенно полезно для управления сложными рабочими процессами обработки данных, где используются несколько схем для разных целей и их необходимо эффективно отслеживать и управлять.

класс PanderaSchema(DataFrameModel):
       """Класс схемы Pandera"""
       id: T.IntegerType() = Field(
           gt=5,
           metadata={"usecase": ["Розничноеценообразование", "Потребительское_поведение"],
              "категория": "ценообразование_товаров"},
       )
       наименование_товара: T.StringType() = Field(str_startswith="B")
       цена: T.DecimalType(20, 5) = Field()


       class Config:
           """Конфигурация класса pandera"""
           name = "информация_о_товаре"
           strict = True
           coerce = True
           metadata = {"категория": "подробности_товара"}

 

В приведенном выше примере мы внесли дополнительную информацию о самом объекте схемы. Это разрешено на 2 уровнях: поле и схема.

Чтобы извлечь метаданные на уровне схемы (включая все поля в ней), мы предоставляем вспомогательные функции:

PanderaSchema.get_metadata()
Вывод будет представлен в виде словарного объекта следующего вида:
{
       "информация_о_товаре": {
           "колонки": {
               "id": {"usecase": ["Розничноеценообразование", "Потребительское_поведение"],
                      "категория": "ценообразование_товаров"},
               "наименование_товара": None,
               "цена": None,
           },
           "dataframe": {"категория": "подробности_товара"},
       }
}

 

В настоящее время эта функция является новым концептом в версии 0.16.0 и была добавлена для PySpark SQL и Pandas.

 

Резюме

 

Мы представили несколько новых возможностей и концепций, включая переключатель Вкл/Выкл, который позволяет командам отключать проверки в производстве без изменения кода, детальное управление потоком валидации Pandera и возможность сохранения дополнительных метаданных на уровне столбцов и dataframe. Более подробную информацию вы можете найти в обновленной документации Pandera для версии 0.16.0.

Как объяснил основатель Pandera, Нилс Бантилан, в недавней статье о выпуске Pandera 0.16.0:

 

Чтобы продемонстрировать расширяемость Pandera с новой спецификацией схемы и API бэкэнда, мы сотрудничали с командой QuantumBlack для реализации схемы и бэкэнда для Pyspark SQL … и мы завершили МВП в течение нескольких месяцев!

 

Этот недавний вклад в открытый код Pandera будет полезен командам, работающим с PySpark и другими технологиями больших данных.

За этот недавний вклад в кодовую базу Pandera отвечают следующие члены команды QuantumBlack, AI by McKinsey: Исмаил Негм-ПАРИ, Нирадж Малхотра, Джаскаран Сингх Сидана, Каспер Янехаг, Олександр Лазарчук. Особую благодарность хотелось бы выразить Нираджу за помощь в подготовке этой статьи к публикации.     Джо Стичбери – опытный технический писатель. Он пишет о науке о данных и анализе, искусственном интеллекте и программном обеспечении.