Мониторинг данных и моделей в операциях авиакомпаний с использованием Evidently & Streamlit в производстве

Мониторинг данных и моделей в авиационных операциях с использованием Evidently & Streamlit в производстве авиакомпаний

Введение

Вы когда-нибудь испытывали разочарование от того, что хорошо работающая модель при обучении и оценке проявляет себя хуже в производственной среде? Это общая проблема, с которой сталкиваются на этапе производства, и именно здесь вступает в игру удивительный инструмент с открытым исходным кодом Evidently.ai, который делает нашу модель машинного обучения наблюдаемой и легко отслеживаемой. В этом руководстве мы рассмотрим причины изменений в данных и производительности модели в производственной среде, а также необходимые действия для их внедрения. Мы также научимся интегрировать этот инструмент с приложением прогнозирования Streamlit. Давайте начнем наше замечательное путешествие.

Эта статья была опубликована в рамках Датасаин Блогатон.

Необходимые предпосылки

1) Клонировать репозиторий

git clone "https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture.git"

2) Создать и активировать виртуальное окружение

# Создать виртуальное окружениеpython3 -m venv venv# Активировать виртуальное окружение в вашей папке с проектомsource venv/bin/activate

# Эта команда устанавливает пакеты Python, указанные в файле requirements.txt.pip install -r requirements.txt

3) Установить Streamlit и Evidently

pip install streamlitpip install evidently

Структура проекта:

project_root/│├── assets/│├── data/│   └── Monitoring_data.csv│├── models/│   ├── best_model.pkl│   ├── lightgml_model.pkl│   ├── linear_regression_model.pkl│   ├── random_forest_model.pkl│   ├── ridge_regression.pkl│   ├── svm_model.pkl│   └── xgboost_model.pkl│├── notebooks/│   └── EDA.ipynb│├── src/│   ├── controller/│   │   └── flight_delay_controller.py│   ││   ├── model/│   │   └── flight_delay_model.py│   ││   ├── view/│   │   └── flight_delay_view.py│   ││   ├── data_preprocessing.py│   ├── model_evaluation.py│   └── modeling.py│├── .gitignore├── Dockerfile├── LICENSE├── Readme.md├── app.py└── requirements.txt

Предварительная обработка данных

Загрузка из облака

В этом проекте мы будем получать набор данных из Azure. Сначала мы создадим контейнер хранения в Azure, затем блок-хранилище, затем загрузим наши исходные данные туда, сделаем их общедоступными и будем использовать для будущих шагов предварительной обработки данных.

Ссылка на набор данных: https://www.kaggle.com/datasets/giovamata/airlinedelaycauses

Вот код для загрузки из облака:

class DataPreprocessorTemplate:    """    Шаблонный метод для предварительной обработки данных с настраиваемыми шагами.    """    def __init__(self, data_url):        """        Инициализация шаблона DataPreprocessor с URL данных, включая токен SAS.        Args:            data_url (str): URL данных с токеном SAS.        """        self.data_url = data_url    def fetch_data(self):        """        Получение данных из Azure Blob Storage.        Returns:            pd.DataFrame: Полученный набор данных в виде фрейма данных Pandas.        """        try:            # Получение набора данных по предоставленному URL            print("Получение данных из облака...")            data = pd.read_csv(self.data_url)            return data        except Exception as e:            raise Exception("Произошла ошибка при получении данных: " + str(e))def main():  # URL данных с токеном SAS  data_url = "https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv"  output_path = "../data/cleaned_flight_delays.csv"  data_preprocessor = DataPreprocessorTemplate(data_url)  data = data_preprocessor.fetch_data()  cleaned_data=data_preprocessor.clean_data(data)  data_preprocessor.save_cleaned_data(cleaned_data, output_path)    

Голубое изображение:

Очистка и преобразование данных

В мире данных преобразование данных превращает сырые данные в полированные. Здесь мы приступим ко всем этапам предварительной обработки данных, таким как удаление ненужных функций, заполнение пропущенных значений, кодирование категориальных столбцов и удаление выбросов. Здесь мы использовали кодирование с использованием фиктивных переменных. Затем мы удалим выбросы, используя тест Z-оценки.

Код фрагмента очистки данных:

def clean_data(self,df):        """        Очистка и предварительная обработка входных данных.        Args:            df (pd.DataFrame): Входной набор данных.        Returns:            pd.DataFrame: Очищенный и предобработанный набор данных.        """        print("Очистка данных...")        df=self.remove_features(df)        df=self.impute_missing_values(df)        df=self.encode_categorical_features(df)        df=self.remove_outliers(df)        return df    def remove_features(self,df):        """        Удаление ненужных столбцов из набора данных.        Args:            df (pd.DataFrame): Входной набор данных.        Returns:            pd.DataFrame: Набор данных с удаленными ненужными столбцами.        """        print("Удаление ненужных столбцов...")        df=df.drop(['Unnamed: 0','Year','CancellationCode','TailNum','Diverted','Cancelled','ArrTime','ActualElapsedTime'],axis=1)        return df    def impute_missing_values(self,df):        """        Восстановление пропущенных значений в наборе данных.        Args:            df (pd.DataFrame): Входной набор данных.        Returns:            pd.DataFrame: Набор данных с заполненными пропущенными значениями.        """        print("Восстановление пропущенных значений...")        delay_colns=['CarrierDelay', 'WeatherDelay', 'NASDelay', 'SecurityDelay', 'LateAircraftDelay']        # Восстановление пропущенных значений 0 для этих столбцов        df[delay_colns]=df[delay_colns].fillna(0)        # Восстановление пропущенных значений медианы для этих столбцов        columns_to_impute = ['AirTime', 'ArrDelay', 'TaxiIn','CRSElapsedTime']        df[columns_to_impute]=df[columns_to_impute].fillna(df[columns_to_impute].median())        return df    def encode_categorical_features(self,df):        """        Кодирование категориальных функций в наборе данных.        Args:            df (pd.DataFrame): Входной набор данных.        Returns:            pd.DataFrame: Набор данных с закодированными категориальными функциями.        """        print("Кодирование категориальных функций...")        df=pd.get_dummies(df,columns=['UniqueCarrier', 'Origin', 'Dest'], drop_first=True)        return df    def remove_outliers(self,df):        """        Удаление выбросов из набора данных.        Args:            df (pd.DataFrame): Входной набор данных.        Returns:            pd.DataFrame: Набор данных с удаленными выбросами.        """        print("Удаление выбросов...")        z_threshold=3        z_scores=np.abs(stats.zscore(df[self.numerical_columns]))        outliers=np.where(z_scores>z_threshold)        df_no_outliers=df[(z_scores<=z_threshold).all(axis=1)]        print("Shape after data cleaning:", df_no_outliers.shape)        return df_no_outliers

Затем мы сохраняем очищенный набор данных для нашего будущего обучения и оценки моделей. Мы используем joblib для сохранения очищенного набора данных.

Вот код фрагмента:

def save_cleaned_data(self,cleaned_data, output_path):    """    Сохранение очищенных данных в CSV-файле.    Args:        cleaned_data (pd.DataFrame): Очищенный набор данных.        output_path (str): Путь для сохранения очищенных данных.    """    print("Сохранение очищенных данных...")                cleaned_data.to_csv(output_path,index=False)

Обучение и оценка

После очистки данных мы разделим наш набор данных на две части – обучающую и тестовую. Затем мы обучим несколько моделей регрессии, таких как Линейная регрессия, Случайный лес регрессор, xgboost, регрессия гребня и модели lightgbm. Затем мы сохраняем все файлы в формате .pkl с использованием joblib, что сокращает время и оптимизирует использование дополнительных ресурсов.

Теперь мы можем использовать этот обученный файл модели для оценки и прогнозирования. Затем мы оценим модель на основе метрик оценки, таких как коэффициент детерминации R2, MAE (среднее абсолютное значение ошибки) и RMSE (корень из среднеквадратичной ошибки). Затем сохраняем модель с лучшей производительностью, чтобы использовать ее для развертывания.

Фрагмент кода для обучения модели:

# Функция для создания моделей машинного обученияdef create_model(model_name):    if model_name == "random_forest":        return RandomForestRegressor(n_estimators=50, random_state=42)    elif model_name == "linear_regression":        return LinearRegression()    elif model_name == "xgboost":        return xgb.XGBRegressor()    elif model_name == "ridge_regression":        return Ridge(alpha=1.0)  # Настройте alpha по необходимости    elif model_name == "lightgbm":        return lgb.LGBMRegressor()# Загрузка очищенного набора данныхprint("Началась загрузка модели...")cleaned_data = pd.read_csv("../data/cleaned_flight_delays.csv")print("Загрузка модели завершена")# Определение целевой переменной (ArrDelay) и функций (X)target_variable = "ArrDelay"X = cleaned_data.drop(columns=[target_variable])y = cleaned_data[target_variable]# Разделение данных на тренировочные и тестовые наборыprint("Разделение данных на тренировочные и тестовые наборы...")X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)print("Разделение данных завершено.")

# Функция для обучения и сохранения моделиdef train_and_save_model(model_name, X_train, y_train):    model = create_model(model_name)    print(f"Обучение модели {model_name}...")    start_time = time.time()    model.fit(X_train, y_train)    print("Обучение модели завершено...")    end_time = time.time()    elapsed_time = end_time - start_time    print(f"Время обучения: {elapsed_time:.2f} секунд")    # Сохранение обученной модели для последующего использования    joblib.dump(model, f"../models/{model_name}_model.pkl")    print(f"Модель {model_name} сохранена как {model_name}_model.pkl")# Создание и обучение модели Random Foresttrain_and_save_model("random_forest", X_train, y_train)# Обучение модели линейной регрессиитrain_and_save_model("linear_regression", X_train, y_train)# Создание и обучение модели XGBoosttrain_and_save_model("xgboost", X_train, y_train)# Создание и обучение модели гребневой регрессииtrain_and_save_model("ridge_regression", X_train, y_train)# Создание и обучение модели LightGBMtrain_and_save_model("lightgbm", X_train, y_train)

Фрагмент кода для оценки модели

# Загрузка очищенного набора данныхcleaned_data = pd.read_csv("../data/cleaned_flight_delays.csv")# Загрузка обученных моделей машинного обученияrandom_forest_model = joblib.load("../models/random_forest_model.pkl")linear_regression_model = joblib.load("../models/linear_regression_model.pkl")xgboost_model = joblib.load("../models/xgboost_model.pkl")ridge_regression_model = joblib.load("../models/ridge_regression_model.pkl")lightgbm_model = joblib.load("../models/lightgbm_model.pkl")# Определение целевой переменной (ArrDelay) и функций (X)target_variable = "ArrDelay"X = cleaned_data.drop(columns=[target_variable])y = cleaned_data[target_variable]# Разделение данных на тренировочные и тестовые наборыX_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)# Определение функции для оценки моделейdef evaluate_model(model, X_test, y_test):    y_pred = model.predict(X_test)    mae = mean_absolute_error(y_test, y_pred)    mse = mean_squared_error(y_test, y_pred)    r2 = r2_score(y_test, y_pred)    return mae, mse, r2# Создание словаря моделей для оценкиmodels = {    "Random Forest": random_forest_model,    "Linear Regression": linear_regression_model,    "XGBoost": xgboost_model,    "Ridge Regression": ridge_regression_model,    "LightGBM": lightgbm_model,}# Оценка каждой модели и сохранение их метрикmetrics = {}for model_name, model in models.items():    mae, mse, r2 = evaluate_model(model, X_test, y_test)    metrics[model_name] = {"MAE": mae, "MSE": mse, "R2": r2}# Вывод метрик оценки для всех моделейfor model_name, model_metrics in metrics.items():    print(f"Метрики для {model_name}:")    print(f"Средняя абсолютная ошибка (MAE): {model_metrics['MAE']:.2f}")    print(f"Средняя квадратичная ошибка (MSE): {model_metrics['MSE']:.2f}")    print(f"Коэффициент детерминации (R2): {model_metrics['R2']:.2f}")    print()

После оценки мы выберем лучшую модель развертывания.

Кодовый фрагмент для сохранения и печати лучшей модели:

# Находим лучшую модель на основе коэффициента R2
best_model = max(metrics, key=lambda model: metrics[model]["R2"])
# Выводим результаты
print(f"Лучшая модель из всех обученных моделей - {best_model} с следующими метриками:")
print(f"Средняя абсолютная ошибка (MAE): {metrics[best_model]['MAE']}")
print(f"Средняя квадратичная ошибка (MSE): {metrics[best_model]['MSE']}")
print(f"Коэффициент детерминации (R2): {metrics[best_model]['R2']}")
# Сохраняем лучшую модель для последующего использования
joblib.dump(models[best_model], "../models/best_model.pkl")
print("Лучшая модель сохранена в файле best_model.pkl")

Вывод:

Понятно: для мониторинга данных и модели

В производственной среде могут возникать различные проблемы с правильной работой модели. Некоторые из ключевых аспектов:

  1. Training-serving skew: Это происходит, когда существует значительная разница между данными, используемыми для обучения и экспериментов.
  2. Проблемы с качеством и целостностью данных: Могут возникать проблемы с обработкой данных, такие как проблемы с трубопроводами, инфраструктурные проблемы, изменение схемы данных или любые другие проблемы с данными из источника.
  3. Сломанная модель сверху: В производственной среде модели часто образуют цепочку, где вход одной модели зависит от выхода другой. Таким образом, любая проблема в выходных данных одной модели повлияет на прогнозы всей модели.

4. Постепенное изменение концепции: С течением времени может происходить изменение концепции, над которой мы работаем (или) изменение целевой переменной, поэтому требуется мониторинг. Также могут возникать неожиданные ситуации, когда мониторинг является важным, и наши прогнозы модели могут оказаться неверными. Например, любые катастрофы или стихийные бедствия.

Для оценки качества данных и модели мы используем 2 набора данных – эталонный и текущий наборы данных.

Эталонный набор данных: этот набор данных является эталоном для оценки метрик качества текущего набора данных. Хотя Evidently выбирает значения пороговых значений по умолчанию, исходя из нашего эталонного набора данных, мы также можем указать настраиваемые пороговые значения метрик в соответствии с нашими конкретными потребностями.

Текущий набор данных: это представляет собой реальные неизвестные данные, взятые для оценки.

Мы рассчитаем отчеты о дрейфе данных, дрейфе целевой переменной, качестве данных и производительности модели с использованием этих двух наборов данных.

Отчеты будут охватывать дрейф данных, дрейф целевой переменной, качество данных и метрики производительности модели. Обычно мониторинг происходит с определенной периодичностью.

Что следует учесть перед повторным обучением модели:

1. Проверка дрейфа данных: Если обнаружен дрейф данных, рекомендуется сначала проверить качество данных и наличие внешних факторов, влияющих на дрейф, таких как эпидемии и стихийные бедствия.

2. Оценка производительности модели: Учтите производительность модели после устранения проблем с дрейфом данных. Если отчеты о данных и модели показывают дрейф, повторное обучение модели может быть хорошей идеей. Прежде чем принять это решение, рассмотрите третий пункт.

3. Соображения о повторном обучении: Повторное обучение не всегда является решением. Во многих ситуациях у нас может не быть достаточно новых данных для повторного обучения модели. Будьте осторожны при обучении на новых данных, так как есть также вероятность нестабильности и ошибочности данных по определенным причинам.

4. Настройка предупреждений о дрейфе данных: Перед установкой предупреждений о дрейфе данных проанализируйте важность конкретных данных/признаков для прогнозирования. Не все дрейфы данных важны, и не все требуют мер действий. Всегда анализируйте важность влияния каждого признака на прогнозирование.

Отчеты могут быть сгенерированы в различных форматах, таких как HTML, JPEG, JSON и т.д. Давайте изучим фрагмент кода для создания отчетов.

Фрагмент кода для отчета о производительности модели

# Создание объекта отчета о производительности регрессии
regression_performance_report = Report(metrics=[RegressionPreset()])

# Запуск отчета о производительности регрессии
regression_performance_report.run(
    reference_data=reference,  # Ссылочный набор данных для сравнения
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Текущий набор данных для анализа
    column_mapping=column_mapping  # Сопоставление столбцов в ссылочных и текущих наборах данных
)

# Указание пути для сохранения файла отчета о производительности модели в формате HTML
model_performance_report_path = reports_dir / 'model_performance.html'

# Сохранение отчета о производительности регрессии в файл HTML
regression_performance_report.save_html(model_performance_report_path)

Фрагмент кода для отчета о смещении цели

# Создание объекта отчета о смещении цели
target_drift_report = Report(metrics=[TargetDriftPreset()])

# Запуск отчета о смещении цели
target_drift_report.run(
    reference_data=reference,  # Ссылочный набор данных для сравнения
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Текущий набор данных для анализа
    column_mapping=column_mapping  # Сопоставление столбцов в ссылочных и текущих наборах данных
)

# Указание пути для сохранения файла отчета о смещении цели в формате HTML
target_drift_report_path = reports_dir / 'target_drift.html'

# Сохранение отчета о смещении цели в файл HTML
target_drift_report.save_html(target_drift_report_path)

Смещение данных

# Создание объекта сопоставления столбцов
column_mapping = ColumnMapping()
column_mapping.numerical_features = numerical_features  # Определение числовых признаков для сопоставления

# Создание объекта отчета о смещении данных
data_drift_report = Report(metrics=[DataDriftPreset()])

# Запуск отчета о смещении данных
data_drift_report.run(
    reference_data=reference,  # Ссылочный набор данных для сравнения
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Текущий набор данных для анализа
    column_mapping=column_mapping  # Сопоставление числовых признаков в ссылочных и текущих наборах данных
)

# Указание пути для сохранения файла отчета о смещении данных в формате HTML
data_drift_report_path = reports_dir / 'data_drift.html'

# Сохранение отчета о смещении данных в файл HTML
data_drift_report.save_html(data_drift_report_path)

Фрагмент кода для отчета о качестве данных

# Создание объекта сопоставления столбцов
column_mapping = ColumnMapping()
column_mapping.numerical_features = numerical_features  # Определение числовых признаков для сопоставления

# Создание объекта отчета о качестве данных
data_quality_report = Report(metrics=[DataQualityPreset()])

# Запуск отчета о качестве данных
data_quality_report.run(
    reference_data=reference,  # Ссылочный набор данных для сравнения
    current_data=current.loc[CUR_WEEK_START:CUR_WEEK_END],  # Текущий набор данных для анализа
    column_mapping=column_mapping  # Сопоставление числовых признаков в ссылочных и текущих наборах данных
)

# Указание пути для сохранения файла отчета о качестве данных в формате HTML
data_quality_report_path = reports_dir / 'data_quality.html'

# Сохранение отчета о качестве данных в файл HTML
data_quality_report.save_html(data_quality_report_path)

Изучите архитектуру Model-View-Controller (MVC)

Давайте узнаем о архитектуре Model-View-Controller (MVC) в этом разделе:

Архитектура Model-View-Controller (MVC) – это шаблон проектирования, используемый в веб-приложениях. Он используется для упрощения сложности кода и повышения его читаемости. Давайте рассмотрим его компоненты.

Model: Логика

Компонент Model представляет собой ядро логики нашего приложения. Он обрабатывает обработку данных и взаимодействие с моделями машинного обучения. Скрипт Model находится в каталоге src/model/.

View: Создание пользовательских интерфейсов

Компонент View отвечает за создание пользовательского интерфейса. Он взаимодействует с пользователем и отображает информацию. Скрипт View находится в каталоге src/view/.

Controller: Оркестрирование моста

Компонент Controller выступает в качестве промежуточного моста между компонентами Model и View. Он обрабатывает ввод пользователя и запросы. Скрипт Controller находится в каталоге src/controller/.

Диаграмма MVC:

Ниже приведено визуальное представление архитектуры MVC:

Давайте рассмотрим фрагмент кода для прогнозирования задержек в архитектуре MVC:

1) Модель (flight_delay_model.py):

# Импорт необходимых библиотек и модулейimport pandas as pdimport joblibimport xgboost as xgbfrom typing import Dictclass FlightDelayModel:    def __init__(self, model_file="models/best_model.pkl"):        """        Инициализация модели FlightDelay.        Args:            model_file (str): Путь к обученному файлу модели.        """        # Загрузка предварительно обученной модели из предоставленного файла        self.model = joblib.load(model_file)    def predict_delay(self, input_data: Dict):        """        Прогнозирование задержки рейса на основе входных данных.        Args:            input_data (Dict): Словарь, содержащий входные данные для прогнозирования.        Returns:            float: Прогнозируемая задержка рейса.        """        # Преобразование словаря с входными данными в DataFrame для прогнозирования        input_df = pd.DataFrame([input_data])        # Использование предварительно обученной модели для прогнозирования        prediction = self.model.predict(input_df)        return prediction

2) Представление (flight_delay_view.py):

import streamlit as stfrom typing import Dictclass FlightDelayView:    def display_input_form(self):        """        Отображение формы ввода в боковой панели Streamlit для ввода данных пользователем.        """        st.sidebar.write("Входные значения:")        # Часть кода для создания элементов формы будет здесь    def display_selected_inputs(self, selected_data: Dict):        """        Отображение выбранных входных значений на основе ввода пользователя.        Args:            selected_data (Dict): Словарь, содержащий выбранные входные значения.        Returns:            Dict: Тот же словарь с выбранными значениями для ссылки.        """        # Часть кода для отображения выбранных входных значений будет здесь        # Отображение выбранных входных значений, таких как ползунки, числовые входы и выбор из списка        return selected_data    def display_predicted_delay(self, flight_delay):        """        Отображение прогнозируемой задержки рейса пользователю.        Args:            flight_delay: Прогнозируемое значение задержки рейса.        """        # Часть кода для отображения прогнозируемой задержки будет здесь        # Отображение прогнозируемого значения задержки рейса пользователю

3) Контроллер (flight_delay_controller.py):

import streamlit as stfrom src.view.flight_delay_view import FlightDelayViewfrom src.model.flight_delay_model import FlightDelayModelfrom typing import Dictclass FlightDelayController:    def __init__(self):        self.model = FlightDelayModel()        self.view = FlightDelayView()        self.selected_data = self.model.selected_data()    def run_prediction(self):        # Отображение формы ввода, сбор входных данных пользователем и отображение выбранных входных данных        self.view.display_input_form()        input_data = self.get_user_inputs()        self.view.display_selected_inputs(input_data)        if st.button("Прогноз задержки рейса"):            # При нажатии кнопки прогноза прогнозируем задержку рейса и отображаем результат            flight_delay = self.model.predict_delay(input_data)            self.view.display_predicted_delay(flight_delay)    def get_user_inputs(self):        # Часть кода для сбора входных данных пользователя из боковой панели Streamlit будет здесь.        user_inputs = {}        # Создание пустого словаря для хранения входных данных пользователя        # Можно использовать виджеты боковой панели Streamlit для сбора входных данных пользователя, например, st.sidebar.slider, st.sidebar.selectbox и т. д.        # Здесь вы можете добавить код для сбора входных данных пользователя и заполнения словаря 'user_inputs'.        # Пример:        # user_inputs['selected_feature'] = st.sidebar.slider("Выберите функцию", min_value, max_value, default_value)        # Повторите это для каждого ввода пользователя, который вы хотите собрать.        return user_inputs        # Возврат словаря, содержащего входные данные пользователя

Интеграция прогнозирования и мониторинга с помощью Streamlit

Здесь мы будем интегрировать мониторинг с помощью Evidently с прогнозированием с помощью Streamlit, чтобы пользователи могли делать прогнозы и отслеживать данные и модель.

Этот подход позволяет пользователям прогнозировать, мониторить или и то, и другое, плавно переходя от одного к другому согласно архитектурному шаблону MVC.

Код:

Фрагмент кода выбора ссылочного и текущего набора данных, реализованный в src/flight_delay_Controller.py

# Импорт необходимых библиотекimport streamlit as stimport pandas as pdimport timeclass FlightDelayController:    def run_monitoring(self):        # Заголовок и введение приложения Streamlit        st.title("Приложение мониторинга данных и моделей")        st.write("Вы находитесь в приложении мониторинга данных и моделей. Выберите диапазон даты и месяца в боковой панели и нажмите 'Отправить', чтобы начать обучение модели и мониторинг.")        # Позволяет пользователю выбрать предпочитаемый диапазон дат        new_start_month = st.sidebar.selectbox("Месяц начала", range(1, 12), 1)        new_end_month = st.sidebar.selectbox("Месяц окончания", range(1, 12), 1)        new_start_day = st.sidebar.selectbox("День начала", range(1, 32), 1)        new_end_day = st.sidebar.selectbox("День окончания", range(1, 32), 30)        # Если пользователь нажимает кнопку "Отправить"        if st.button("Отправить"):            st.write("Получение текущих данных...")            # Измерение времени, затраченного на получение данных            data_start = time.time()            df = pd.read_csv("data/Monitoring_data.csv")            data_end = time.time()            time_taken = data_end - data_start            st.write(f"Данные получены за {time_taken:.2f} секунд")            # Фильтрация данных на основе выбранного диапазона дат            date_range = (                (df['Месяц'] >= new_start_month) & (df['ДеньМесяца'] >= new_start_day) &                (df['Месяц'] <= new_end_month) & (df['ДеньМесяца'] <= new_end_day)            )            # Создание ссылочного и текущего набора данных из диапазона дат.            reference_data = df[~date_range]            current_data = df[date_range]

После выбора исходных и текущих наборов данных мы сгенерируем отчеты по изменению данных, качеству данных, качеству модели и изменению целевого показателя. Код для генерации отчетов разделен на 3 части в соответствии с архитектурой MVC. Давайте рассмотрим код в этих 3 файлах.

Фрагмент кода для flight_delay_Controller.py

import streamlit as stfrom src.view.flight_delay_view import FlightDelayViewfrom src.model.flight_delay_model import FlightDelayModelimport numpy as npimport pandas as pdfrom scipy import statsimport time# Контроллерclass FlightDelayController:    """    Контроллерный компонент приложения прогнозирования задержки рейсов.    Этот класс оркестрирует взаимодействие между компонентами Модель и Представление.    """    def __init__(self):        # Инициализация контроллера созданием экземпляров Модели и Представления.        self.model = FlightDelayModel()        # Создание экземпляра FlightDelayModel.        self.view = FlightDelayView()        # Создание экземпляра FlightDelayView.        self.selected_data = self.model.selected_data()        # Получение выбранных данных из модели.        self.categorical_options = self.model.categorical_features()        # Получение категориальных признаков из модели.    def run_monitoring(self):        # Функция запуска приложения мониторинга.        # Установка заголовка и краткого описания.        st.title("Приложение мониторинга данных и модели")        st.write("Вы находитесь в приложении мониторинга данных и модели. Выберите диапазон даты и месяца в боковой панели и нажмите 'Отправить', чтобы начать обучение и мониторинг модели.")        # Выбор отчетов для генерации с помощью флажков.        st.subheader("Выберите отчеты для генерации")        generate_model_report = st.checkbox("Сгенерировать отчет о производительности модели")        generate_target_drift = st.checkbox("Сгенерировать отчет об изменении целевого показателя")        generate_data_drift = st.checkbox("Сгенерировать отчет об изменении данных")        generate_data_quality = st.checkbox("Сгенерировать отчет о качестве данных")        if st.button("Отправить"):            # 'date_range' и 'df' здесь не показаны, поскольку они уже показаны в предыдущих фрагментах кода            # Справочные данные без выбранного диапазона дат.            reference_data = df[~date_range]            # Текущие данные в выбранном диапазоне дат.            current_data = df[date_range]            self.view.display_monitoring(reference_data, current_data)  # Отображение данных мониторинга.            self.model.train_model(reference_data, current_data)  # Обучение модели.            # Генерация выбранных отчетов и их отображение.            if generate_model_report:                st.write("### Отчет о производительности модели")                st.write("Генерация отчета о производительности модели...")                performance_report = self.model.performance_report(reference_data, current_data)                self.view.display_report(performance_report, "Отчет о производительности модели")            if generate_target_drift:                st.write("### Отчет об изменении целевого показателя")                st.write("Генерация отчета об изменении целевого показателя...")                target_report = self.model.target_report(reference_data, current_data)                self.view.display_report(target_report, "Отчет об изменении целевого показателя")            if generate_data_drift:                st.write("### Отчет об изменении данных")                st.write("Генерация отчета об изменении данных...")                data_drift_report = self.model.data_drift_report(reference_data, current_data)                self.view.display_report(data_drift_report, "Отчет об изменении данных")            if generate_data_quality:                st.write("### Отчет о качестве данных")                st.write("Генерация отчета о качестве данных...")                data_quality_report = self.model.data_quality_report(reference_data, current_data)                self.view.display_report(data_quality_report, "Отчет о качестве данных")

Код для Flight_delay_model.py

import pandas as pdimport joblibimport xgboost as xgbfrom evidently.pipeline.column_mapping import ColumnMappingfrom evidently.report import Reportfrom evidently.metric_preset import DataDriftPresetfrom evidently.metric_preset import TargetDriftPresetfrom evidently.metric_preset import DataQualityPresetfrom evidently.metric_preset.regression_performance import RegressionPresetimport timeimport streamlit as stfrom typing import Dict# Модельclass FlightDelayModel:    """    Компонент Модель для приложения прогнозирования задержки рейсов.    Этот класс отвечает за загрузку данных, загрузку моделей и прогнозирование задержки.    """    def __init__(self, model_file="models/best_model.pkl"):        # Инициализация класса FlightDelayModel        # Определение сопоставления столбцов для анализа данных        self.column_mapping = ColumnMapping()        self.column_mapping.target = self.target        self.column_mapping.prediction = 'prediction'        self.column_mapping.numerical_features = self.numerical_features    # Отчет о производительности модели    def performance_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Генерирует отчет о производительности модели.        Аргументы:            reference_data (pd.DataFrame): Справочный набор данных для сравнения.            current_data (pd.DataFrame): Текущий набор данных с прогнозами.        Возвращает:            Report: Отчет, содержащий метрики производительности регрессии.        """        regression_performance_report = Report(metrics=[RegressionPreset()])        regression_performance_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return regression_performance_report    def target_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Генерирует отчет для анализа изменения целевого показателя.        Аргументы:            reference_data (pd.DataFrame): Справочный набор данных для сравнения.            current_data (pd.DataFrame): Текущий набор данных с прогнозами.        Возвращает:            Report: Отчет, содержащий метрики изменения целевого показателя.        """        target_drift_report = Report(metrics=[TargetDriftPreset()])        target_drift_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return target_drift_report    def data_drift_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Генерирует отчет для анализа изменения данных.        Аргументы:            reference_data (pd.DataFrame): Справочный набор данных для сравнения.            current_data (pd.DataFrame): Текущий набор данных с прогнозами.        Возвращает:            Report: Отчет, содержащий метрики изменения данных.        """        data_drift_report = Report(metrics=[DataDriftPreset()])        data_drift_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return data_drift_report    def data_quality_report(self, reference_data: pd.DataFrame, current_data: pd.DataFrame):        """        Генерирует отчет о качестве данных (примечание: это может занять около 10 минут).        Аргументы:            reference_data (pd.DataFrame): Справочный набор данных для сравнения.            current_data (pd.DataFrame): Текущий набор данных с прогнозами.        Возвращает:            Report: Отчет, содержащий метрики качества данных.        """        st.write("Генерация отчета о качестве данных займет больше времени, около 10 минут, из-за его тщательного анализа. Вы можете подождать или изучить другие отчеты, если у вас не хватает времени.")        data_quality_report = Report(metrics=[DataQualityPreset()])        data_quality_report.run(            reference_data=reference_data,            current_data=current_data,            column_mapping=self.column_mapping        )        return data_quality_report

 Код Flight_delay_view.py

# Импорт необходимых библиотек
import streamlit as st
import pandas as pd

# Определение класса для представления задержки рейса
class FlightDelayView:
    """
    Компонент представления для приложения прогнозирования задержки рейса.
    Этот класс отвечает за отображение веб-приложения Streamlit.
    """

    @staticmethod
    def display_input_form():
        """
        Отображает форму ввода на веб-приложении Streamlit.
        """
        st.title("Приложение прогнозирования задержки рейса")
        st.write("Это приложение прогнозирует продолжительность задержки рейса в минутах.")
        st.sidebar.header("Вход пользователей")

    @staticmethod
    def display_monitoring(reference_data, current_data):
        """
        Отображает информацию о мониторинге.
        Аргументы:
            reference_data (DataFrame): Ссылочный набор данных.
            current_data (DataFrame): Текущий набор данных.
        """
        st.write("Пожалуйста, прокрутите вниз, чтобы увидеть ваш отчет")
        st.write("Форма набора данных:", reference_data.shape)
        st.write("Текущая форма набора данных:", current_data.shape)
        # Информация о тренировке модели
        st.write("### Модель обучается ...")

    @staticmethod
    def display_selected_inputs(selected_data):
        """
        Отображает выбранные пользователем входные данные.
        Аргументы:
            selected_data (dict): Входные данные, предоставленные пользователем.
        """
        input_data = pd.DataFrame([selected_data])
        st.write("Выбранные входные данные:")
        st.write(input_data)
        return input_data

    @staticmethod
    def display_predicted_delay(flight_delay):
        """
        Отображает предсказанную задержку рейса.
        Аргументы:
            flight_delay (float): Предсказанная задержка рейса в минутах.
        """
        st.write("Предсказанная задержка рейса (в минутах):", round(flight_delay[0], 2))

    @staticmethod
    def display_report(report, report_name: str):
        """
        Отображает отчет, сгенерированный Evidently.
        Аргументы:
            report (Report): Отчет Evidently для отображения.
            report_name (str): Название отчета (например, "Отчет о производительности модели").
        """
        st.write(f"{report_name}")
        # Отображение отчета Evidently в виде HTML с включенной прокруткой
        st.components.v1.html(report.get_html(), height=1000, scrolling=True)

Теперь появляется окончательное приложение Streamlit, в котором мы объединили прогнозирование и мониторинг.

app.py

import streamlit as st
import pandas as pd
from src.controller.flight_delay_controller import FlightDelayController

def main():
    st.set_page_config(page_title="Приложение прогнозирования задержки рейса", layout="wide")
    
    # Инициализация контроллера
    controller = FlightDelayController()
    
    # Создание приложения Streamlit
    st.sidebar.title("Приложение прогнозирования задержки рейса и мониторинга данных и модели")
    choice = st.sidebar.radio("Выберите опцию:", ("Cделать прогнозы", "Мониторинг данных и модели"))
    
    if choice == "Cделать прогнозы":
        controller.run_prediction()
    elif choice == "Мониторинг данных и модели":
        controller.run_monitoring()

if __name__ == '__main__':
    main()

Чтобы запустить приложение Streamlit, выполните

# Чтобы запустить приложение, выполните следующие команды:
streamlit run app.py

Перейдите по адресу http://localhost:8501 в своем веб-браузере, чтобы использовать приложение.

Панель прогнозирования:

Панель мониторинга:

Получение информации с помощью отчетов Evidently

Отчет о смещении данных:

Здесь мы можем видеть отчет о смещении данных для нашего проекта. Мы видим, что здесь смещаются 5 колонок. Поэтому нашим следующим шагом будет анализ потенциальных причин смещения этих характеристик с соответствующими экспертами в этой области.

Смещение цели:

Здесь мы видим сдвиг цели; здесь не обнаружено сдвига.

Отчет об эффективности модели

Здесь мы видим все показатели эффективности нашей модели после анализа нового набора данных. Мы можем принимать нужные решения на основе этих показателей.

Отчет о качестве данных

Обычно мы используем отчеты о качестве данных для необработанных данных. Мы можем видеть все связанные с данными показатели, такие как количество столбцов, корреляция, дублирующиеся значения и т. д. Мы также можем использовать их для проведения исследовательского анализа данных.

Интеграция Docker для развертывания:

Понимание Docker и его значение:

Важно докеризировать наш проект, чтобы он работал в любой среде без проблем зависимости. Docker – это важный инструмент для упаковки и распространения приложений. Вот шаги для установки и использования Docker для этого проекта.

Dockerfile:

Dockerfile в нашем каталоге проекта содержит инструкции для создания образа Docker. Напишите Dockerfile с правильным синтаксисом.

Структура проекта Dockerfile:

Написание эффективного Dockerfile

Важно, чтобы размер образа Docker был минимальным. Мы можем использовать такие техники, как многоэтапная сборка в Docker, чтобы уменьшить размер образа, и использовать очень легкий базовый образ для Python.

Код:

# Использовать официальный рантайм Python в качестве родительского образаFROM python:3.9-slim# Установить рабочий каталог в /appWORKDIR /app# Создать необходимые каталогиRUN mkdir -p /app/data /app/models /app/src/controller /app/src/model /app/src/view# Скопировать файлы из вашей хост-машины в контейнерCOPY app.py /app/COPY src/controller/flight_delay_controller.py /app/src/controller/COPY src/model/flight_delay_model.py /app/src/model/COPY src/view/flight_delay_view.py /app/src/view/COPY requirements.txt /app/# Создать и активировать виртуальное окружениеRUN python -m venv venvRUN /bin/bash -c "source venv/bin/activate"# Установить необходимые пакеты, указанные в requirements.txtRUN pip install -r requirements.txt# Установить wgetRUN apt-get update && apt-get install -y wget# Загрузить файл набора данных с помощью wgetRUN wget -O /app/data/DelayedFlights.csv https://flightdelay.blob.core.windows.net/flight-delayed-dataset/DelayedFlights.csv# Загрузить лучшую модель с помощью wgetRUN wget -O /app/models/best_model.pkl https://flightdelay.blob.core.windows.net/flight-delayed-dataset/best_model.pkl# Сделать порт 8501 доступным во внешнем мире этого контейнераEXPOSE 8501# Задать основную команду для запуска приложения StreamlitCMD ["streamlit", "run", "app.py"]

Построение и запуск Docker-контейнеров

Следуйте этим шагам, чтобы построить образ Docker и запустить контейнер в терминале после написания Dockerfile.

docker build -t flight-delay-prediction .docker run -p 8501:8501 flight-delay-prediction

Предоставление нашего приложения Streamlit с помощью Streamlit Sharing:

После создания нашего приложения Streamlit для совместного использования с другими людьми мы можем использовать Streamlit Sharing для хостинга наших приложений бесплатно. Вот ниже приведены шаги, которые следует выполнить:

1) Организовать проект: Убедитесь, что в корневом каталоге есть файл app.py.

2) Задать зависимости: Создайте файл requirements.txt, чтобы Streamlit sharing установил все необходимые пакеты, указанные в этом файле.

3) Использовать GitHub: Отправьте репозиторий в GitHub, чтобы он интегрировался без проблем с Streamlit sharing.

Затем зарегистрируйтесь в Streamlit sharing, вставьте URL вашего репозитория на GitHub и нажмите «развернуть», теперь будет сгенерирован URL вашего проекта. Вы можете поделиться им с другими.

Заключение

Это руководство научило нас интегрировать такие инструменты, как Streamlit, Azure, Docker и Evidently, чтобы создавать удивительные приложения на основе данных. В заключение этого руководства у вас будет достаточно знаний для создания веб-приложений с помощью Streamlit, портативных приложений с использованием Docker и обеспечения качества данных и модели с помощью Evidently. Однако это руководство предназначено для чтения и применения этих знаний в ваших будущих проектах в области науки о данных. Постарайтесь экспериментировать, инновировать и исследовать дальнейшие улучшения, которые можно сделать. Спасибо за то, что присоединились ко мне до конца этого руководства. Продолжайте учиться, действовать и расти!

Основные положения

  • Необходимо упаковать наш проект по машинному обучению в Docker, чтобы он мог работать в любой среде без проблем зависимости.
  • Для уменьшения размера образа Docker рассмотрите использование многопостроением.
  • Внедрение MVC-архитектуры в коде снижает сложность и повышает читаемость кода для сложных веб-приложений.
  • Интеграция помогает нам контролировать качество данных и модели в производственной среде.
  • После анализа всех данных и отчетов модели необходимо принять соответствующие меры.

Часто задаваемые вопросы

GitHub репозиторий: https://github.com/VishalKumar-S/Flight-Delay-Prediction-and-live-Monitoring-with-Azure-Evidently-and-Streamlit-with-MVC-Architecture