Проект прогнозирования оттока клиентов с использованием MLOps

Проект прогнозирования оттока клиентов с применением MLOps современные методы удержания и анализа

Введение

Когда мы слышим о data science, первое, что приходит на ум, это построение модели на блокнотах и тренировка данных. Но это не ситуация в реальном мире data science. В реальном мире data scientists создают модели и вводят их в продакшн. Продакшн среда имеет разрыв между разработкой, развертыванием и надежностью модели и чтобы обеспечить эффективную и масштабируемую работу, data scientists используют MLOps (Machine Learning Operations), чтобы создавать и разворачивать ML приложения в среде продакшна. В этой статье мы создадим и развернем проект прогнозирования оттока клиентов с помощью MLOps.

Цели обучения

В этой статье вы узнаете:

  • Обзор проекта
  • Мы представим ZenML и основы MLOPS.
  • Узнаете, как развернуть модель локально для прогнозирования
  • Освоите предобработку и инженерию данных, обучение и оценку модели

Эта статья была опубликована в рамках Data Science Blogathon.

Обзор проекта

Прежде всего, нам нужно понять, в чем состоит наш проект. Для этого проекта у нас есть набор данных от телекоммуникационной компании. Теперь для создания модели, которая предсказывает, будет ли пользователь продолжать пользоваться услугами компании или нет. Мы построим это ML приложение с помощью ZenML и MLFlow. Вот рабочий процесс нашего проекта:

Рабочий процесс нашего проекта

  • Сбор данных
  • Предобработка данных
  • Обучение модели
  • Оценка модели
  • Развертывание

Что такое MLOps?

MLOps – это жизненный цикл машинного обучения от начала до развертывания и последующего обслуживания. MLOps – это практика оптимизации и автоматизации всего жизненного цикла моделей машинного обучения с одновременным обеспечением масштабируемости, надежности и эффективности.

Давайте объясним это на простом примере:

Представьте, что вы строите небоскреб в вашем городе. Строительство здания завершено. Но ему не хватает электричества, воды, канализации и т.д. Небоскреб будет неработоспособным и бесполезным.

То же самое относится и к моделям машинного обучения. Если эти модели спроектированы без учета развертывания модели, масштабируемости и долгосрочного обслуживания, они могут стать неэффективными и бесполезными. Это представляет собой серьезное препятствие для data scientists при создании моделей машинного обучения для использования в среде продакшна.

MLOps – это набор лучших практик и стратегий, которые руководят производством, развертыванием и долгосрочным обслуживанием моделей машинного обучения. Он обеспечивает не только достоверные прогнозы этих моделей, но и их надежность, масштабируемость и ценность для компаний. Таким образом, без MLOps все эти задачи становятся непомерно трудоемкими, что вызывает сложности. В этом проекте мы объясним, как работает MLOps, разные этапы и приведем пример конечного проекта в области прогнозирования оттока клиентов.

Представляем ZenML

ZenML – это открытый MLOPS фреймворк, который помогает строить портативные и готовые для продакшна пайплайны. Фреймворк ZenML поможет нам выполнить этот проект, используя MLOps.

⚠️ Если вы пользователь Windows, попробуйте установить wsl на свой компьютер. Zenml не поддерживается в Windows.

Прежде чем мы перейдем к проектам.

Основные понятия MLOPS

  • Шаги: Шаги представляют собой отдельные единицы задач в конвейере или рабочем процессе. Каждый шаг представляет определенное действие или операцию, которую необходимо выполнить для разработки рабочего процесса машинного обучения. Например, очистка данных, предварительная обработка данных, обучение моделей и т. д. – эти шаги необходимы для разработки модели машинного обучения.
  • Конвейеры: Они соединяют несколько шагов вместе, чтобы создать структурированный и автоматизированный процесс для задач машинного обучения. Например, конвейер обработки данных, конвейер оценки модели и конвейер обучения модели.

Начиная с

Создайте виртуальное окружение для проекта:

conda create -n churn_prediction python=3.9

Затем установите эти библиотеки:

pip install numpy pandas matplotlib scikit-learn

После установки установите ZenML:

pip install zenml["server"]

Затем инициализируйте репозиторий ZenML.

zenml init

Если на экране появился зеленый флажок, можно продолжать работу. После инициализации в вашем каталоге будет создана папка .zenml.

Создайте папку для данных в каталоге. Получите данные по этой ссылке:

Создайте папки в соответствии с этой структурой.

Сбор данных

На этом этапе мы собираем данные из нашего файла csv. Эти данные будут использоваться для обучения модели после очистки и кодирования.

Создайте файл ingest_data.py внутри папки steps.

import pandas as pdimport numpy as npimport loggingfrom zenml import stepclass IngestData:    """    Заполнение данных в рабочем процессе.    """    def __init__(self, path:str) -> None:        """        Аргументы:            data_path(str): путь к файлу данных         """        self.path = path        def get_data(self):        df = pd.read_csv(self.path)        logging.info("Чтение csv-файла успешно завершено.")        return df    @step(enable_cache = False)def ingest_df(data_path:str) -> pd.DataFrame:    """    Шаг ZenML для загрузки данных из файла CSV.        """    try:        #Создание экземпляра класса IngestData и заполнение данных        ingest_data = IngestData(data_path)        df = ingest_data.get_data()        logging.info("Заполнение данных завершено")        return df    except Exception as e:        #Регистрация сообщения об ошибке в случае неудачи заполнения данных и возбуждение исключения        logging.error("Ошибка при заполнении данных")        raise e

Вот проект по ссылке.

В этом коде мы сначала создали класс IngestData для инкапсуляции логики заполнения данных. Затем мы создали шаг ZenML, ingest_df, который является отдельной единицей в конвейере сбора данных.

Создание файла training_pipeline.py внутри папки pipeline.

Написать код

from zenml import pipelinefrom steps.ingest_data import ingest_df#Определение конвейера ZenML с названием training_pipeline.@pipeline(enable_cache=False)def train_pipeline(data_path:str):    '''    Конвейер для обучения модели.    Аргументы:        data_path (str): Путь к данным для заполнения.    '''    df = ingest_df(data_path=data_path)

Здесь мы создаем конвейер обучения для тренировки модели машинного обучения с помощью серии шагов.

Затем создайте файл с названием run_pipeline.py в базовом каталоге для запуска конвейера.

из pipelines.training_pipeline импортировать train_pipelineif __name__ == '__main__':    # Запустить пайплайн    train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")

Этот код используется для запуска пайплайна.

Так что мы завершили этап создания пайплайна для обработки данных. Давайте запустим его.

Запустите команду в вашем терминале:

python run_pipeline.py

Затем вы увидите команды, указывающие на успешное завершение обучения пайплайна.

Предварительная обработка данных

На этом шаге мы создадим различные стратегии для очистки данных. Удалятся ненужные столбцы, а категориальные столбцы будут закодированы с использованием метода “Label encoding”. В конце данные будут разделены на обучающие и тестовые данные.

Создайте файл с названием clean_data.py в папке src.

В этом файле мы создадим классы стратегий для очистки данных.

импортировать pandas как pd
импортировать numpy как np
импортировать logging
из sklearn.model_selection импортировать train_test_split
от abc импортировать abstractmethod, ABC
из typing импортировать Union
из sklearn.preprocessing импортировать LabelEncoder
класс DataStrategy(ABC):    @abstractmethod    def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame,pd.Series]:        pass            #Стратегия предварительной обработки данных
класс DataPreprocessing(DataStrategy):    def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:        попробовать:            df['TotalCharges'] = df['TotalCharges'].replace(' ', 0).astype(float)            df.drop('customerID', axis=1, inplace=True)            df['Churn'] = df['Churn'].replace({'Yes': 1, 'No': 0}).astype(int)            service = ['PhoneService', 'MultipleLines', 'InternetService', 'OnlineSecurity',                       'OnlineBackup', 'DeviceProtection', 'TechSupport', 'StreamingTV',                       'StreamingMovies']            for col in service:                df[col] = df[col].replace({'No phone service': 'No', 'No internet service': 'No'})            logging.info("Длина df: ", len(df.columns))            return df        except Exception as e:            logging.error("Ошибка в предварительной обработке", e)            raise e# Стратегия кодирования признаков
класс LabelEncoding(DataStrategy):    def handle_data(self, df: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:        попробовать:            df_cat = ['gender', 'Partner', 'Dependents', 'PhoneService', 'MultipleLines',                      'InternetService', 'OnlineSecurity', 'OnlineBackup', 'DeviceProtection',                      'TechSupport', 'StreamingTV',  'StreamingMovies', 'Contract',                      'PaperlessBilling', 'PaymentMethod']            lencod = LabelEncoder()            for col in df_cat:                df[col] = lencod.fit_transform(df[col])            logging.info(df.head())            return df        except Exception as e:            logging.error(e)            raise e            # Стратегия разделения данных
класс DataDivideStrategy(DataStrategy):    def handle_data(self, df:pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:        попробовать:            X = df.drop('Churn', axis=1)            y = df['Churn']            X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)            return X_train, X_test, y_train, y_test        except Exception as e:            logging.error("Ошибка при разделении данных", e)            raise e

Этот код реализует модульный пайплайн предварительной обработки данных для машинного обучения. Он включает стратегии предварительной обработки данных, кодирования признаков и разделения данных для очистки данных для прогнозной модели.

1. DataPreprocessing: Этот класс отвечает за удаление ненужных столбцов и обработку отсутствующих значений (NA значений) в наборе данных.

2. LabelEncoding: Класс LabelEncoding предназначен для преобразования категориальных переменных в числовой формат, с которым могут эффективно работать алгоритмы машинного обучения. Он преобразует текстовые категории в числовые значения.

3. DataDivideStrategy: Этот класс разделяет набор данных на независимые переменные (X) и зависимые переменные (y). Затем данные разделяются на обучающие и тестовые наборы.

Мы будем поэтапно реализовывать их, чтобы подготовить наши данные для задач машинного обучения.

Эти стратегии гарантируют, что данные структурированы и форматированы правильно для обучения и оценки модели.

Создайте data_cleaning.py в папке steps.

import pandas as pd
import numpy as np
from src.clean_data import DataPreprocessing, DataDivideStrategy, LabelEncoding
import logging
from typing_extensions import Annotated
from typing import Tuple
from zenml import step

# Определите шаг ZenML для очистки и предобработки данных
@step(enable_cache=False)
def cleaning_data(df: pd.DataFrame) -> Tuple[
    Annotated[pd.DataFrame, "X_train"],
    Annotated[pd.DataFrame, "X_test"],
    Annotated[pd.Series, "y_train"],
    Annotated[pd.Series, "y_test"],
]:
    try:
        # Создание объекта стратегии предобработки данных
        data_preprocessing = DataPreprocessing()
        
        # Применение предобработки данных к исходному DataFrame
        data = data_preprocessing.handle_data(df)
        
        # Создание объекта стратегии кодирования меток
        feature_encode = LabelEncoding()
        
        # Применение кодирования меток к предобработанным данным
        df_encoded = feature_encode.handle_data(data)
        
        # Журналирование информации о столбцах DataFrame
        logging.info(df_encoded.columns)
        logging.info("Столбцов:", len(df_encoded))
        
        # Создание объекта стратегии разделения данных
        split_data = DataDivideStrategy()
        
        # Разделение закодированных данных на обучающую и тестовую выборки
        X_train, X_test, y_train, y_test = split_data.handle_data(df_encoded)
        
        # Возврат разделенных данных в виде кортежа
        return X_train, X_test, y_train, y_test
      
    except Exception as e:
        # Обработка и журналирование любых ошибок, возникающих в процессе очистки данных
        logging.error("Ошибка в шаге очистки данных", e)
        raise e

На этом шаге мы реализовали созданные нами стратегии в clean_data.py

Теперь реализуем этот шаг в training_pipeline.py

from zenml import pipeline
from steps.ingest_data import ingest_df
from steps.data_cleaning import cleaning_data
import logging

# Определение конвейера ZenML с названием training_pipeline
@pipeline(enable_cache=False)
def train_pipeline(data_path:str):
    '''
    Конвейер данных для обучения модели.
    '''
    df = ingest_df(data_path=data_path)
    X_train, X_test, y_train, y_test = cleaning_data(df=df)

Мы завершили этап предобработки данных в конвейере обучения модели.

Обучение модели

Теперь мы собираемся создать модель для этого проекта. Здесь мы предсказываем бинарную классификацию. Мы можем использовать логистическую регрессию. Наша основная цель – не точность модели, а MLOps.

Для тех, кто не знаком с логистической регрессией, вы можете прочитать об этом здесь. Мы применим те же шаги, что и на предыдущем этапе предобработки данных. Сначала создадим файл training_model.py в папке src.

import pandas as pd
from sklearn.linear_model import LogisticRegression
from abc import ABC, abstractmethod
import logging

# Абстрактный класс модели
class Model(ABC):
    @abstractmethod
    def train(self,X_train:pd.DataFrame,y_train:pd.Series):
        """
        Обучение модели на данных
        """
        pass

class LogisticReg(Model):
    """
    Реализация модели логистической регрессии.
    """
    def train(self, X_train: pd.DataFrame, y_train: pd.Series):
        """
        Обучение модели
        Args:
            X_train: pd.DataFrame,
            y_train: pd.Series
        """
        logistic_reg = LogisticRegression()
        logistic_reg.fit(X_train,y_train)
        return logistic_reg

Мы определяем абстрактный класс Model с методом ‘train’, который должны реализовывать все модели. Класс LogisticReg является конкретной реализацией с использованием логистической регрессии. Следующим шагом является настройка файла с именем config.py в папке steps. Создайте файл с именем config.py в папке steps.

Настройка параметров модели

from zenml.steps import BaseParameters

"""Этот файл используется для настройки и указания различных параметров, связанных с вашей моделью машинного обучения и процессом обучения"""
class ModelName(BaseParameters):
    """
    Конфигурации модели
    """
    model_name: str = "логистическая регрессия"

В файле с именем config.py, в папке steps, вы настраиваете параметры, связанные с вашей моделью машинного обучения. Вы создаете класс ModelName, который наследует от BaseParameters для указания названия модели. Это упрощает изменение типа модели.

import logging
import pandas as pd
from src.training_model import LogisticReg
from zenml import step
from .config import ModelName

# Определение шага под названием train_model
@step(enable_cache=False)
def train_model(X_train: pd.DataFrame, y_train: pd.Series, config: ModelName):
    """Обучает данные на основе настроенной модели"""
    try:
        model = None
        if config == "логистическая регрессия":
            model = LogisticReg()
        else:
            raise ValueError("Неподдерживаемое название модели")

        trained_model = model.train(X_train=X_train, y_train=y_train)
        return trained_model
    
    except Exception as e:
        logging.error("Ошибка при обучении модели:", e)
        raise e

В файле с названием model_train.py в папке steps определите шаг с названием train_model, используя ZenML. Цель этого шага – обучить модель машинного обучения на основе названия модели в ModelName.

В программе

Проверьте название настроенной модели. Если это “логистическая регрессия”, мы создаем экземпляр модели LogisticReg и обучаем ее с использованием предоставленных данных для обучения (X_train и y_train). Если название модели не поддерживается, вызывается ошибка. Любые ошибки в процессе обработки записываются в журнал, и ошибка вызывается.

После этого мы собираемся реализовать этот шаг в файле training_pipeline.py

from zenml import pipeline
from steps.ingest_data import ingest_df
from steps.data_cleaning import cleaning_data
from steps.model_train import train_model
import logging

# Определение конвейера ZenML с названием training_pipeline.
@pipeline(enable_cache=False)
def train_pipeline(data_path: str):
    '''Пиплайн для обучения модели данных.'''
    # Шаг загрузки данных: возвращает данные.
    df = ingest_df(data_path=data_path)
    
    # Шаг очистки данных.
    X_train, X_test, y_train, y_test = cleaning_data(df=df)
    
    # Обучение модели.
    model = train_model(X_train=X_train, y_train=y_train)

Теперь мы реализовали шаг train_model в конвейере. Таким образом, шаг model_train.py завершен.

Оценка модели

На этом шаге мы будем оценивать эффективность нашей модели. Для этого мы будем проверять оценку точности по предсказаниям на тестовых данных. Итак, сначала мы собираемся создать стратегии, которые мы будем использовать в конвейере.

Создайте файл с названием evaluate_model.py в папке src.

import logging
from sklearn.metrics import confusion_matrix, classification_report, accuracy_score
from abc import ABC, abstractmethod
import numpy as np

# Абстрактный класс для оценки модели
class Evaluate(ABC):
    @abstractmethod
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """Абстрактный метод для оценки производительности модели машинного обучения.
        Аргументы:
            y_true (np.ndarray): Истинные метки.
            y_pred (np.ndarray): Предсказанные метки.
        Возвращает:
            float: Результат оценки.
        """
        pass

#Класс для расчета оценки точности
class Accuracy_score(Evaluate):
    """Рассчитывает и возвращает оценку точности для предсказаний модели.
    """
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        try:
            accuracy_scr = accuracy_score(y_true=y_true, y_pred=y_pred) * 100
            logging.info("Оценка точности:", accuracy_scr)
            return accuracy_scr
        except Exception as e:
            logging.error("Ошибка при оценке точности модели", e)
            raise e

#Класс для расчета оценки точности
class Precision_Score(Evaluate):
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray) -> float:
        """Рассчитывает и возвращает оценку точности для предсказаний модели.
        """
        try:
            precision = precision_score(y_true=y_true, y_pred=y_pred)
            logging.info("Оценка точности:", precision)
            return float(precision)
        except Exception as e:
            logging.error("Ошибка при расчете оценки точности", e)
            raise e

#Класс для расчета оценки F1-меры
class F1_Score(Evaluate):
    def evaluate_model(self, y_true: np.ndarray, y_pred: np.ndarray):
        """Рассчитывает и возвращает оценку F1-меры для предсказаний модели.
        """
        try:
            f1_scr = f1_score(y_pred=y_pred, y_true=y_true)
            logging.info("Оценка F1-меры:", f1_scr)
            return f1_scr
        except Exception as e:
            logging.error("Ошибка при расчете оценки F1-меры", e)
            raise e
        

Теперь, когда мы разработали стратегии оценки, мы будем использовать их для оценки модели. Реализуем код в файле step evaluate_model.py в папке steps. Здесь оценка точности, оценка попадания и оценка точности – это стратегии, которые мы используем в качестве метрик для оценки модели.

Реализуем их в шагах. Создайте файл с именем evaluation.py в steps:

import loggingimport pandas as pdimport numpy as npfrom zenml import stepfrom src.evaluate_model import ClassificationReport, ConfusionMatrix, Accuracy_scorefrom typing import Tuplefrom typing_extensions import Annotatedfrom sklearn.base import ClassifierMixin@step(enable_cache=False)def evaluate_model(    model: ClassifierMixin,    X_test: pd.DataFrame,    y_test: pd.Series) -> Tuple[    Annotated[np.ndarray,"confusion_matix"],    Annotated[str,"classification_report"],    Annotated[float,"accuracy_score"],    Annotated[float,"precision_score"],    Annotated[float,"recall_score"]    ]:    """    Оценить производительность модели машинного обучения с использованием общих метрик.    """    try:        y_pred =  model.predict(X_test)                                precision_score_class = Precision_Score()        precision_score = precision_score_class.evaluate_model(y_pred=y_pred,y_true=y_test)        mlflow.log_metric("Precision_score ",precision_score)                                               accuracy_score_class = Accuracy_score()        accuracy_score = accuracy_score_class.evaluate_model(y_true=y_test, y_pred=y_pred)        logging.info("accuracy_score:",accuracy_score)                         return accuracy_score, precision_score        except Exception as e:        logging.error("Error in evaluating model",e)        raise e

Теперь реализуем этот шаг в конвейере. Обновим training_pipeline.py:

Этот код определяет шаг evaluate_model в конвейере машинного обучения. Он принимает обученную модель классификации (model), независимые тестовые данные (X_test) и истинные метки для тестовых данных (y_test) в качестве входных данных. Затем он оценивает производительность модели с использованием общих метрик классификации и возвращает результаты, такие как precision_score и accuracy_score.

Теперь реализуем этот шаг в конвейере. Обновим training_pipeline.py:

from zenml import pipelinefrom steps.ingest_data import ingest_dffrom steps.data_cleaning import cleaning_datafrom steps.model_train import train_modelfrom steps.evaluation import evaluate_modelimport logging#Определение конвейера ZenML с именем training_pipeline.@pipeline(enable_cache=False)def train_pipeline(data_path:str):    '''    Конвейер данных для обучения модели.    Args:        data_path (str): Путь к данным для ввода в систему.    '''    #шаг ввода данных: возвращает данные.    df = ingest_df(data_path=data_path)    #шаг очистки данных.    X_train, X_test, y_train, y_test = cleaning_data(df=df)    #обучение модели    model = train_model(X_train=X_train,y_train=y_train)    #Оценка метрик данных    accuracy_score, precision_score = evaluate_model(model=model,X_test=X_test, y_test=y_test)

Вот и все. Теперь мы завершили обучение конвейера. Запустить

python run_pipeline.py

В терминале. Если он успешно запустится. Теперь, когда мы завершили запуск конвейера обучения локально, это будет выглядеть так:

Что такое трекер экспериментов?

Трекер экспериментов – это инструмент в машинном обучении, используемый для записи, мониторинга и управления различными экспериментами в процессе разработки машинного обучения.

Ученые-исследователи экспериментируют с разными моделями, чтобы получить лучшие результаты. Поэтому им необходимо вести учет данных и использовать разные модели. Если они делают это вручную, используя таблицу Excel, это будет очень сложно для них.

MLflow

MLflow – это ценный инструмент для эффективного отслеживания и управления экспериментами в машинном обучении. Он автоматизирует отслеживание экспериментов, мониторинг итераций моделей и связанных данных. Это упрощает процесс разработки модели и обеспечивает пользовательский интерфейс для визуализации результатов.

Интеграция MLflow с ZenML повышает надежность и управляемость экспериментов в рамках операционной среды машинного обучения.

Для настройки MLflow с ZenML следуйте этим шагам:

  1. Установите интеграцию MLflow:
    1. Используйте следующую команду для установки интеграции MLflow:
zenml integration install mlflow -y

2. Зарегистрируйте систему отслеживания экспериментов MLflow:

Зарегистрируйте систему отслеживания экспериментов в MLflow с помощью этой команды:

zenml experiment-tracker register mlflow_tracker --flavor=mlflow

3. Зарегистрируйте стек:

В ZenML стек представляет собой набор компонентов, которые определяют задачи в рамках вашего рабочего процесса машинного обучения. Он помогает организовывать и управлять шагами в ML-конвейере эффективно. Зарегистрируйте стек с помощью:

Подробнее вы можете найти в документации.

zenml model-deployer register mlflow --flavor=mlflowzenml stack register mlflow_stack -a default -o default -d mlflow -e mlflow_tracker --set

Это связывает ваш стек с конкретными настройками хранения артефактов, оркестратором, целями развертывания и отслеживанием экспериментов.

4. Просмотрите подробности стека:

Вы можете просмотреть компоненты вашего стека с помощью:

zenml stack describe

Это отображает компоненты, связанные со стеком “mlflow_tracker”.

Теперь давайте реализуем систему отслеживания экспериментов в обучающей модели и проанализируем модель:

Вы можете видеть название компонента как mlflow_tracker.

Настройка системы отслеживания экспериментов ZenML

Сначала начните обновление train_model.py:

import loggingimport mlflowimport pandas as pdfrom src.training_model import LogisticRegfrom sklearn.base import ClassifierMixinfrom zenml import stepfrom .config import ModelName#import from zenml.client import Client# Получение системы отслеживания экспериментов активного стекаexperiment_tracker = Client().active_stack.experiment_tracker# Определение шага с именем train_model@step(experiment_tracker = experiment_tracker.name,enable_cache=False)def train_model(    X_train:pd.DataFrame,    y_train:pd.Series,    config:ModelName    ) -> ClassifierMixin:    """    Обучает данные на основе настроек модели    Args:        X_train: pd.DataFrame = Независимые данные для обучения,        y_train: pd.Series = Зависимые данные для обучения.            """    try:        model = None        if config.model_name == "logistic regression":            # Автоматическое ведение журнала оценок, модели и т.д..            mlflow.sklearn.autolog()            model = LogisticReg()        else:            raise ValueError("Model name is not supported")                trained_model = model.train(X_train=X_train,y_train=y_train)        logging.info("Training model completed.")        return trained_model        except Exception as e:        logging.error("Error in step training model",e)        raise e

В этом коде мы настраиваем систему отслеживания экспериментов с помощью mlflow.sklearn.autolog(), которая автоматически регистрирует все детали о модели, что упрощает отслеживание и анализ экспериментов.

В evaluation.py

from zenml.client import Clientexperiment_tracker = Client().active_stack.experiment_tracker@step(experiment_tracker=experiment_tracker.name, enable_cache = False)

Запуск конвейера

Обновите ваш сценарий run_pipeline.py следующим образом:

from pipelines.training_pipeline import train_pipelinefrom zenml.client import Clientif __name__ == '__main__':    # Вывод uri для отслеживания экспериментов    print(Client().active_stack.experiment_tracker.get_tracking_uri())    # Запуск конвейера    train_pipeline(data_path="/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv")

Скопируйте его и вставьте эту команду.

mlflow ui --backend-store-uri "--uri on the top of "file:/home/ "

“`html

Исследуйте свои эксперименты

Нажмите на ссылку, сгенерированную выше, чтобы открыть интерфейс MLflow. Здесь вы найдете кладезь информации:

  • Пайплайны: Легко получайте доступ ко всем запущенным пайплайнам.

  • Детали модели: Нажмите на пайплайн, чтобы узнать каждую деталь о вашей модели.
  • Метрики: Погрузитесь в раздел метрик, чтобы визуализировать производительность вашей модели.

Теперь вы можете победить в отслеживании экспериментов машинного обучения с ZenML и MLflow!

Развертывание

В следующем разделе мы собираемся развернуть эту модель. Вам нужно знать следующие понятия:

a). Непрерывный развертывающий пайплайн

Этот пайплайн автоматизирует процесс развертывания модели. После того, как модель проходит критерии оценки, она автоматически разворачивается в производственную среду. Например, он начинается с предварительной обработки данных, очистки данных, тренировки данных, оценки модели и т. д.

b). Пайплайн развертывания вывода

Пайплайн развертывания вывода сосредоточен на развертывании моделей машинного обучения для реального времени или пакетного вывода. Пайплайн развертывания вывода специализируется на развертывании моделей для создания прогнозов в производственной среде. Например, он настраивает конечную точку API, на которую пользователи могут отправлять текст. Он обеспечивает доступность и масштабируемость модели и отслеживает ее производительность в реальном времени. Эти пайплайны важны для поддержания эффективности и эффективности систем машинного обучения. Теперь мы собираемся реализовать непрерывный пайплайн.

Создайте файл с названием deployment_pipeline.py в папке пайплайнов.

import numpy as npimport jsonimport loggingimport pandas as pdfrom zenml import pipeline, stepfrom zenml.config import DockerSettingsfrom zenml.constants import DEFAULT_SERVICE_START_STOP_TIMEOUTfrom zenml.integrations.constants import MLFLOWfrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import (    MLFlowModelDeployer,)from zenml.integrations.mlflow.services import MLFlowDeploymentServicefrom zenml.integrations.mlflow.steps import mlflow_model_deployer_stepfrom zenml.steps import BaseParameters, Outputfrom src.clean_data import FeatureEncodingfrom .utils import get_data_for_testfrom steps.data_cleaning import cleaning_datafrom steps.evaluation import evaluate_modelfrom steps.ingest_data import ingest_df# Определите настройки Docker с интеграцией MLflowdocker_settings = DockerSettings(required_integrations = {MLFLOW})# Определите класс для конфигурации развертывающего пайплайнакласс DeploymentTriggerConfig(BaseParameters):    min_accuracy:float = 0.92@step def deployment_trigger(    accuracy: float,    config: DeploymentTriggerConfig,):    """    Он запускает развертывание только в том случае, если точность больше или равна минимальной точности.    Args:        accuracy: точность модели.        config: порог минимальной точности.    """    try:        return accuracy >= config.min_accuracy    except Exception as e:        logging.error("Ошибка в триггере развертывания",e)        raise e# Определите непрерывный пайплайн@pipeline(enable_cache=False,settings={"docker":docker_settings})def continuous_deployment_pipeline(    data_path:str,    min_accuracy:float = 0.92,    workers: int = 1,    timeout: int = DEFAULT_SERVICE_START_STOP_TIMEOUT):      df = ingest_df(data_path=data_path)    X_train, X_test, y_train, y_test = cleaning_data(df=df)    model = train_model(X_train=X_train, y_train=y_train)    accuracy_score, precision_score = evaluate_model(model=model, X_test=X_test, y_test=y_test)    deployment_decision = deployment_trigger(accuracy=accuracy_score)    mlflow_model_deployer_step(        model=model,        deploy_decision = deployment_decision,        workers = workers,        timeout = timeout    )

Фреймворк ZenML для проекта машинного обучения

Этот код определяет непрерывное развертывание проекта машинного обучения с использованием фреймворка ZenML.

1. Импортируйте необходимые библиотеки: Импорт необходимых библиотек для развертывания модели.

“`

2. Настройки Docker: При настройке Docker для использования с MLflow, Docker помогает упаковывать и запускать эти модели последовательно.

3. DeploymentTriggerConfig: Это класс, в котором задается минимальный порог точности для развертывания модели.

4. deployment_trigger: Этот шаг вернет результат, если точность модели превысит минимальную точность.

5. continuous_deployment_pipeline: Этот конвейер состоит из нескольких шагов: ввод данных, очистка данных, обучение модели и оценка модели. И модель развертывается только в том случае, если она удовлетворяет минимальному порогу точности.

Далее мы собираемся реализовать конвейер вывода в файле deployment_pipeline.py

import loggingimport pandas as pdfrom zenml.steps import BaseParameters, Outputfrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import MLFlowModelDeployerfrom zenml.integrations.mlflow.services import MLFlowDeploymentServiceclass MLFlowDeploymentLoaderStepParameters(BaseParameters):    pipeline_name: str    step_name: str    running: bool = True@step(enable_cache=False)def dynamic_importer() -> str:    data = get_data_for_test()    return data@step(enable_cache=False)def prediction_service_loader(    pipeline_name: str,    pipeline_step_name: str,    running: bool = True,    model_name: str = "model",) -> MLFlowDeploymentService:    model_deployer = MLFlowModelDeployer.get_active_model_deployer()    existing_services = model_deployer.find_model_server(        pipeline_name=pipeline_name,        pipeline_step_name=pipeline_step_name,        model_name=model_name,        running=running,    )    if not existing_services:        raise RuntimeError(            f"Служба прогнозирования MLflow, развернутая этим шагом {pipeline_step_name} в конвейере {pipeline_name} для модели '{model_name}', в данный момент не работает."        )    return existing_services[0]@stepdef predictor(service: MLFlowDeploymentService, data: str) -> np.ndarray:    service.start(timeout=10)    data = json.loads(data)    prediction = service.predict(data)    return prediction@pipeline(enable_cache=False, settings={"docker": docker_settings})def inference_pipeline(pipeline_name: str, pipeline_step_name: str):    batch_data = dynamic_importer()    model_deployment_service = prediction_service_loader(        pipeline_name=pipeline_name,        pipeline_step_name=pipeline_step_name,        running=False,    )    prediction = predictor(service=model_deployment_service, data=batch_data)    return prediction

Этот код настраивает конвейер для создания прогнозов с использование развернутой модели машинного обучения через MLflow. Он импортирует данные, загружает развернутую модель и использует ее для создания прогнозов.

Нам нужно создать функцию get_data_for_test() в utils.py в папке конвейеров. Так мы сможем более эффективно управлять нашим кодом.

import loggingimport pandas as pd from src.clean_data import DataPreprocessing, LabelEncoding# Функция для получения данных для тестированияdef get_data_for_test():    try:        df = pd.read_csv('./data/WA_Fn-UseC_-Telco-Customer-Churn.csv')        df = df.sample(n=100)        data_preprocessing = DataPreprocessing()        data = data_preprocessing.handle_data(df)                  # Инициализация стратегии кодирования признаков        label_encode = LabelEncoding()        df_encoded = label_encode.handle_data(data)         df_encoded.drop(['Churn'],axis=1,inplace=True)        logging.info(df_encoded.columns)        result = df_encoded.to_json(orient="split")        return result    except Exception as e:        logging.error("e")        raise e

Теперь давайте реализуем созданный нами конвейер для развертывания модели и прогноза на развернутой модели.

Создайте файл run_deployment.py в проектной папке:

import click  # Для работы с аргументами командной строкиimport logging  from typing import castfrom rich import print  # Для форматирования вывода в консоль# Импорт конвейеров для развертывания и вывода из pipelinefrom pipelines.deployment_pipeline import (continuous_deployment_pipeline, inference_pipeline)# Импорт утилит и компонентов MLflowfrom zenml.integrations.mlflow.mlflow_utils import get_tracking_urifrom zenml.integrations.mlflow.model_deployers.mlflow_model_deployer import ( MLFlowModelDeployer)from zenml.integrations.mlflow.services import MLFlowDeploymentService# Определение констант для различных конфигураций: DEPLOY, PREDICT, DEPLOY_AND_PREDICTDEPLOY = "deploy"PREDICT = "predict"DEPLOY_AND_PREDICT = "deploy_and_predict"# Определение основной функции, использующей Click для обработки аргументов командной строки@click.command()@click.option(    "--config",    "-c",    type=click.Choice([DEPLOY, PREDICT, DEPLOY_AND_PREDICT]),    default=DEPLOY_AND_PREDICT,    help="Optional ты можешь выбрать только запустить конвейер развертывания для обучения и развертывания модели (`deploy`), либо только запустить прогноз с использованием развернутой модели (`predict`). По умолчанию будет запущено и то и другое (`deploy_and_predict`).",)@click.option(    "--min-accuracy",    default=0.92,    help="Минимальная точность, необходимая для развертывания модели",)def run_main(config:str, min_accuracy:float ):    # Получение активного компонента развертывания модели MLFlow    mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer()        # Определение желания пользователя развернуть модель (deploy), делать прогнозы (predict) или и то и другое (deploy_and_predict)    deploy = config == DEPLOY or config == DEPLOY_AND_PREDICT    predict = config == PREDICT or config == DEPLOY_AND_PREDICT        # Если запрашивается развертывание модели:    if deploy:        continuous_deployment_pipeline(            data_path='/mnt/e/Customer_churn/data/WA_Fn-UseC_-Telco-Customer-Churn.csv',            min_accuracy=min_accuracy,            workers=3,            timeout=60        )        # Если запрашивается создание прогнозов:    if predict:        # Инициализация конвейера вывода        inference_pipeline(            pipeline_name="continuous_deployment_pipeline",            pipeline_step_name="mlflow_model_deployer_step",        )        # Вывод инструкций для просмотра запусков эксперимента в MLflow UI    print(        "Вы можете выполнить следующую команду:\n "        f"[italic green]    mlflow ui --backend-store-uri '{get_tracking_uri()}"        "[/italic green]\n ...чтобы просмотреть запуски экспериментов внутри MLflow"        " UI.\n Вы найдете ваши запуски в эксперименте "        "`mlflow_example_pipeline`. Там вы также сможете "        "сравнить два или более запуска.\n\n"    )        # Получение существующих сервисов с тем же именем конвейера, именем шага и именем модели    existing_services = mlflow_model_deployer_component.find_model_server(        pipeline_name = "continuous_deployment_pipeline",        pipeline_step_name = "mlflow_model_deployer_step",    )        # Проверка состояния сервера прогнозирования:    if existing_services:        service = cast(MLFlowDeploymentService, existing_services[0])        if service.is_running:            print(                f"Сервер прогнозирования MLflow работает локально как служба демона, принимающая запросы на прогнозирование по адресу: \n"                f"     {service.prediction_url}\n"                f"Чтобы остановить сервис, выполните"                f"[italic green] zenml model-deployer models delete"                f"{str(service.uuid)}'[/italic green]."            )        elif service.is_failed:            print(                f"Сервер прогнозирования MLflow находится в состоянии 'failed': \n"                f"Последнее состояние: '{service.status.state.value}'\n"                f"Последняя ошибка: '{service.status.last_error}'"            )    else:        print(            "В на

Этот код представляет собой сценарий командной строки для управления и развертывания модели машинного обучения с использованием MLFlow и ZenMl.

Теперь давайте развернем модель.

Выполните эту команду в вашем терминале.

python run_deployment.py --config deploy

Теперь мы развернули нашу модель. Ваш конвейер будет успешно запущен, и вы можете просмотреть его в панели инструментов ZenML.

python run_deployment.py --config predict

Инициация процесса прогнозирования

Теперь наш сервер прогнозирования MLFlow работает.

Нам нужное веб-приложение для ввода данных и просмотра результатов. Возможно, вы задаетесь вопросом, почему нам приходится создавать веб-приложение с нуля.

На самом деле, мы собираемся использовать Streamlit, который является фреймворком с открытым исходным кодом для создания быстрого и простого веб-приложения для нашей модели машинного обучения.

Установите библиотеку

pip install streamlit

Создайте файл с именем streamlit_app.py в вашем проектном каталоге.

import json
import logging 
import numpy as np
import pandas as pd 
import streamlit as st 
from PIL import Image 
from pipelines.deployment_pipeline import prediction_service_loader 
from run_deployment import main 

def main(): 
    st.title("Конвейер от начала до конца для оценки удовлетворенности клиентов с использованием ZenML") 
    st.markdown( 
        """ 
        #### Постановка задачи 
        Здесь целью является прогнозирование оценки удовлетворенности клиента для заданного заказа на основе таких функций, как статус заказа, цена, платежи и т. д. Я буду использовать [ZenML](https://zenml.io/) для создания готового к производству конвейера для прогнозирования оценки удовлетворенности клиента для следующего заказа или покупки. 
        """ 
    ) 
    st.markdown( 
        """ 
        На рисунке показан весь конвейер, мы сначала загружаем данные, очищаем их, обучаем модель, оцениваем модель, и если источник данных изменяется или изменяются значения гиперпараметров, развертывание будет запущено, и (пе)обучит модель, а если модель соответствует требованиям к минимальной точности, модель будет развернута. 
        """ 
    )
    st.markdown( 
        """ 
        #### Описание функций 
        Это приложение предназначено для прогнозирования оценки удовлетворенности клиента для заданного клиента. Вы можете ввести функции, перечисленные ниже, и получить оценку удовлетворенности клиента. 
        | Модели | Описание | 
        | ------------- | - | 
        | Старший гражданин | Указывает, является ли клиент пожилым человеком. | 
        | стаж | Количество месяцев, которое клиент находится в компании. | 
        | Ежемесячные платежи | Ежемесячные расходы клиента. | 
        | Общие расходы | Общие расходы клиента. | 
        | Пол | Пол клиента (Мужчина: 1, Женщина: 0). | 
        | Партнер | Есть ли у клиента партнер (Да: 1, Нет: 0). | 
        | Иждивенцы | Есть ли у клиента иждивенцы (Да: 1, Нет: 0). | 
        | Телефонная связь | Есть ли у клиента телефонная связь (Да: 1, Нет: 0). | 
        | Несколько линий | Есть ли у клиента несколько линий (Да: 1, Нет: 0). | 
        | Интернет-сервис | Тип интернет-сервиса (Нет: 1, Другой: 0). | 
        | Онлайн-безопасность | Есть ли у клиента онлайн-безопасность (Да: 1, Нет: 0). | 
        | Online резервное копирование | Есть ли у клиента резервное копирование в Интернете (Да: 1, Нет: 0). | 
        | Защита устройства | Есть ли у клиента защита устройства (Да: 1, Нет: 0). | 
        | Техническая поддержка | Есть ли у клиента техническая поддержка (Да: 1, Нет: 0). | 
        | Потоковое TV | Есть ли у клиента потоковое телевидение (Да: 1, Нет: 0). | 
        | Потоковые фильмы | Есть ли у клиента потоковые фильмы (Да: 1, Нет: 0). | 
        | Контракт | Тип контракта (Один год: 1, Другое: 0). | 
        | Политика без бумажек | Использует ли клиент безбумажную тарификацию (Да: 1, Нет: 0). | 
        | Способ оплаты | Способ оплаты (Кредитная карта: 1, Другое: 0). | 
        | Отток | Ушел ли клиент (Да: 1, Нет: 0). | 
        """ 
    )
    payment_options = { 
        2: "Электронная проверка", 
        3: "Отправленная проверка", 
        1: "Банковский перевод (автоматический)", 
        0: "Кредитная карта (автоматический)" 
    }
    contract = { 
        0: "Ежемесячный", 
        2: "Двухлетний", 
        1: "Однолетний" 
    }
    def format_func(PaymentMethod): 
        return payment_options[PaymentMethod]
    def format_func_contract(Contract): 
        return contract[Contract]
    
    display = ("мужской", "женский") 
    options = list(range(len(display))) 
    
    # Определение столбцов данных с соответствующими значениями
    SeniorCitizen = st.selectbox("Вы являетесь пожилым человеком?", options=[True, False],) 
    tenure

Этот код определяет StreamLit, который предоставляет интерфейс для прогнозирования оттока клиентов в телекомпании на основе данных о клиентах и демографических сведений.

Пользователи могут вводить свою информацию через удобный интерфейс, а код использует обученную модель машинного обучения (развернутую с помощью ZenML и MLflow) для прогнозирования.

Предсказанный результат затем отображается пользователю.

Теперь выполните эту команду:

⚠️ убедитесь, что ваша модель прогнозирования работает

streamlit run streamlit_app.py

Нажмите на ссылку.

Вот и всё; мы завершили наш проект.

Вот и всё; мы успешно завершили наш проект машинного обучения от начала до конца, показывая, как профессионалы подходят к всему процессу.

Заключение

В этом всестороннем исследовании операций машинного обучения (MLOps) через разработку и развертывание модели прогнозирования оттока клиентов мы увидели трансформационную силу MLOps в оптимизации жизненного цикла машинного обучения. От сбора и предобработки данных до обучения модели, оценки и развертывания, наш проект демонстрирует важную роль MLOps в преодолении разрыва между разработкой и производством. Поскольку организации все больше полагаются на принятие решений на основе данных, эффективные и масштабируемые практики, продемонстрированные здесь, подчеркивают важность MLOps для успешной реализации приложений машинного обучения.

Ключевые моменты

  • MLOps (операции машинного обучения) играют решающую роль в оптимизации всего жизненного цикла машинного обучения, обеспечивая эффективность, надежность и масштабируемость операций.
  • ZenML и MLflow - мощные инструменты, которые облегчают разработку, отслеживание и развертывание моделей машинного обучения в реальных приложениях.
  • Корректная предобработка данных, включая очистку, кодирование и разделение, является основой для создания надежных моделей машинного обучения.
  • Метрики оценки, такие как точность, точность, полнота и F1-оценка, позволяют осознать полную картину производительности модели.
  • Инструменты отслеживания экспериментов, такие как MLflow, улучшают сотрудничество и управление экспериментами в проектах науки о данных.
  • Непрерывность и развертывание пайплайнов обработки критичны для поддержания эффективности и доступности модели в производственных средах.

Часто задаваемые вопросы

Показанный в этой статье материал не принадлежит Analytics Vidhya и используется по усмотрению автора.