从实验到生产:使用 MLflow 和 BentoML 构建 ML 流水线

2025年2月27日 • 作者:Chaoyu Yang

机器学习项目涉及许多移动部分 - 从实验到生产部署。两个可以很好地协同工作以简化此过程的工具是 MLflowBentoML。在本教程中,我们将演示如何使用 MLflow 进行实验跟踪以及使用 BentoML 进行模型部署和生产部署。

具体来说,您将学习如何

  • 使用 MLflow 记录和跟踪您的模型和实验。
  • 将这些模型保存到 BentoML 中,以便进行统一管理和部署。
  • 将 MLflow 模型转化为生产就绪的 API。
  • 使用 BentoML 强制执行输入数据验证并利用自适应批量处理。
  • 标准化大型团队的部署工作流程。

您可以在 BentoMLflow 仓库 中找到所有源代码。

让我们开始吧!

环境设置

安装必要的软件包

pip install bentoml mlflow scikit-learn

注意:虽然我们出于演示目的使用 scikit-learn,但 MLflow 和 BentoML 都支持各种框架,例如 PyTorch、TensorFlow 和 XGBoost。

启动您的 MLflow 跟踪服务器

mlflow server --host 127.0.0.1 --port 8080

此服务器将跟踪我们的实验并存储我们的模型工件。

使用 MLflow 训练模型

让我们使用 Iris 数据集训练一个简单的分类模型,并使用 MLflow 记录结果

import mlflow from mlflow.models import infer_signature import pandas as pd from sklearn import datasets from sklearn.model_selection import train_test_split from sklearn.linear_model import LogisticRegression from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score # Load the Iris dataset X, y = datasets.load_iris(return_X_y=True) # Split the data into training and test sets X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42 ) # Define the model hyperparameters params = { "solver": "lbfgs", "max_iter": 1000, "random_state": 8888, } # Train the model lr = LogisticRegression(**params) lr.fit(X_train, y_train) # Predict on the test set y_pred = lr.predict(X_test) # Calculate metrics accuracy = accuracy_score(y_test, y_pred) print(accuracy)

使用 MLflow 跟踪实验

在这里,我们记录模型参数,跟踪指标,并将模型工件保存到 MLflow。

from datetime import date # Set our tracking server uri for logging mlflow.set_tracking_uri(uri="http://127.0.0.1:8080") # Create a new MLflow Experiment mlflow.set_experiment("MLflow+BentoML Quickstart") # Start an MLflow run with mlflow.start_run(): # Log the hyperparameters mlflow.log_params(params) # Log the loss metric mlflow.log_metric("accuracy", accuracy) # Set a tag that we can use to remind ourselves what this run was for mlflow.set_tag("Training Info", "Basic LR model for iris data") # Infer the model signature signature = infer_signature(X_train, lr.predict(X_train)) # Log the model model_info = mlflow.sklearn.log_model( sk_model=lr, artifact_path="iris_model", signature=signature, input_example=X_train, registered_model_name="iris_demo", ) model_uri = mlflow.get_artifact_uri("iris_model")

至此,MLflow 已经

  • 记录了模型超参数
  • 跟踪了模型准确性
  • 保存了模型工件
  • 创建了定义模型输入和输出格式的签名

您可以通过访问 http://127.0.0.1:8080 在 MLflow UI 中查看所有信息。

使用 BentoML 保存和版本化模型

一旦您对性能满意,就可以将模型注册到 BentoML 模型库中进行部署。

import bentoml bento_model = bentoml.mlflow.import_model( 'iris', model_uri=model_uri, labels={ "team": "bento", "stage": "dev", "accuracy": accuracy, "training_date": str(date.today()) } )

请注意

  • MLflow 模型注册表 旨在跟踪和存储实验期间创建的模型工件。它有助于选择和比较模型、重现训练运行或持续训练模型。
  • BentoML 模型库 管理用于应用程序开发和生产部署的已批准模型。它专注于简化生产化工作流程,提高模型分发和加载的效率。模型库还支持版本控制,并将模型链接到其 MLflow 实验元数据。

验证模型是否已保存到模型库

$ bentoml models list Tag                           Module           Size        Creation Time iris:hu5d7xxs3oxmnuqj         bentoml.mlflow   11.75 KiB   2025-02-24 10:14:51

您可以测试从模型库加载模型

import numpy as np import bentoml # Load the latest version of iris model: iris_model = bentoml.mlflow.load_model("iris:latest") # Alternatively, load the model by specifying the model tag # iris_model = bentoml.mlflow.load_model("iris:hu5d7xxs3oxmnuqj") input_data = np.array([[5.9, 3, 5.1, 1.8]]) res = iris_model.predict(input_data) print(res)

使用 BentoML 部署 MLflow 模型

现在模型已准备好,创建 BentoML Service 来部署它。按照惯例,您定义一个名为 service.py 的文件来实现模型部署逻辑。

import bentoml import numpy as np from bentoml.models import BentoModel # Define the runtime environment for your Bento demo_image = bentoml.images.PythonImage(python_version="3.11") \ .python_packages("mlflow", "scikit-learn") target_names = ['setosa', 'versicolor', 'virginica'] @bentoml.service( image=demo_image, resources={"cpu": "2"}, traffic={"timeout": 10}, ) class IrisClassifier: # Declare the model as a class attribute bento_model = BentoModel("iris:latest") def __init__(self): self.model = bentoml.mlflow.load_model(self.bento_model) # Define an API endpoint @bentoml.api def predict(self, input_data: np.ndarray) -> list[str]: preds = self.model.predict(input_data) return [target_names[i] for i in preds]

请注意

  • PythonImage 用于定义 Bento 的运行时环境,BentoML 中的统一分发格式。您可以通过设置所需的 Python 版本、依赖项、运行命令等来自定义构建。
  • @bentoml.service 标记一个 Python 类为 BentoML Service。它允许您指定 配置,例如请求超时和资源需求。

使用 BentoML CLI 部署模型

$ bentoml serve service.py:IrisClassifier [INFO] [cli] Starting production HTTP BentoServer from "service:IrisClassifier" listening on https://:3000 (Press CTRL+C to quit) [INFO] [entry_service:IrisClassifier:1] Service IrisClassifier initialized

模型现在正在 https://:3000 运行。查询端点

curl -X 'POST' \ 'https://:3000/predict' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "input_data": [[ 0.1, 0.2, 0.1, 0.1 ]] }' # ["setosa"]

或者,使用 BentoML Python 客户端

import bentoml import numpy as np client = bentoml.SyncHTTPClient("https://:3000") client.predict(np.array([[5.9, 3, 5.1, 1.8]])) # ['virginica']

使用 BentoML 验证输入数据

一个常见问题是处理来自客户端的意外数据格式或类型。例如,如果客户端发送整数值而不是浮点数

curl -X 'POST' \ 'https://:3000/predict' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "input_data": [[ 5,3,5,2 ]] }'

这将导致错误

# client side error: # {"error":"An unexpected error has occurred, please check the server log."} # server side log: # mlflow.exceptions.MlflowException: Failed to enforce schema of data '[[5 3 5 2]]' with schema '[Tensor('float64', (-1, 4))]'. Error: dtype of input int64 does not match expected dtype float64

使用指定 float dtype 的 BentoML Python 客户端可以解决此问题

import bentoml import numpy as np client = bentoml.SyncHTTPClient("https://:3000") client.predict(np.array([[1,1,1,1]], dtype='float64'))

然而,这在将 ML 服务与下游服务集成时带来了挑战。为了进一步帮助进行输入验证,BentoML 扩展了 Pydantic 以处理常见的 ML 数据类型(例如,图像、文本流、浮点数)。您可以在 BentoML Service 中定义严格的模式

import bentoml import numpy as np import numpy.typing as npt from bentoml.models import BentoModel from pydantic import Field from bentoml.validators import Shape, DType from typing import Annotated demo_image = bentoml.images.PythonImage(python_version="3.11") \ .python_packages("mlflow", "scikit-learn") target_names = ['setosa', 'versicolor', 'virginica'] @bentoml.service( image=demo_image, resources={"cpu": "2"}, traffic={"timeout": 10}, ) class IrisClassifier: bento_model = BentoModel("iris:latest") def __init__(self): self.model = bentoml.mlflow.load_model(self.bento_model) # Enforce and validate input schemas for the API @bentoml.api def predict( self, input_data: Annotated[npt.NDArray[np.float64], Shape((-1, 4)), DType("float64")] = Field(default=[[0.1, 0.4, 0.2, 1.0]]) ) -> list[str]: preds = self.model.predict(input_data) return [target_names[i] for i in preds]

现在,任何整数输入都可以自动进行验证(如果可能,还会进行转换)。您可以使用通用 HTTP 客户端尝试一下

curl -X 'POST' \ 'https://:3000/predict' \ -H 'accept: application/json' \ -H 'Content-Type: application/json' \ -d '{ "input_data": [[ 5,3,5,2 ]] }' # ["virginica"]

您还可以检查 OpenAPI 文档以查看您的服务所需的模式

curl localhost:3000/docs.json

这会返回一个 JSON 模式,用于描述 API 的输入和输出格式。

"paths": { ... "/predict": { "post": { "responses": { "200": { "description": "Successful Response", "content": { "application/json": { "schema": { "type": "array", "items": { "type": "number" } } } } }, "400": { "description": "Bad Request", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/InvalidArgument" } } } }, "500": { "description": "Internal Server Error", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/InternalServerError" } } } } }, "requestBody": { "content": { "application/json": { "schema": { "type": "object", "title": "Input", "properties": { "input_data": { "default": [ [ 0.1, 0.4, 0.2, 1 ] ], "items": { "items": { "type": "number" }, "type": "array" }, "title": "Input Data", "type": "array" } } } } } }, "operationId": "IrisClassifier__predict" } } }, ... "components": { "schemas": { "predict__Input": { "type": "object", "title": "predict__Input", "properties": { "input_data": { "default": [ [ 0.1, 0.4, 0.2, 1 ] ], "dim": -4, "dtype": "float64", "format": "numpy-array", "shape": [ -1, 4 ], "title": "Input Data", "type": "tensor" } } }, ...

高级用例:启用自适应批量处理

BentoML 可以通过自适应批量处理来优化性能,它将多个独立请求组合成一个批次,以提高处理效率。

让我们更新我们的 Service 以支持批量处理

import bentoml import numpy as np from bentoml.models import BentoModel demo_image = bentoml.images.PythonImage(python_version="3.11") \ .python_packages("mlflow", "scikit-learn") target_names = ['setosa', 'versicolor', 'virginica'] @bentoml.service( image=demo_image, resources={"cpu": "2"}, traffic={"timeout": 10}, ) class IrisClassifier: bento_model = BentoModel("iris:latest") def __init__(self): self.model = bentoml.mlflow.load_model(self.bento_model) # Enable adaptive batching @bentoml.api(batchable=True) def predict( self, input_data: np.ndarray ) -> list[str]: print(f"batch_size: {len(input_data)}") preds = self.model.predict(input_data) return [target_names[i] for i in preds]

您可以使用一个模拟多个并发客户端的脚本来测试它

import requests from concurrent.futures import ThreadPoolExecutor import time import random CONCURRENCY = 20 # Number of threads (concurrent requests) TOTAL_REQUESTS = 1000 # Total number of requests to send client = bentoml.SyncHTTPClient("https://:3000") from sklearn.datasets import load_iris iris = load_iris() data_samples = iris.data.tolist() payloads = [random.choice(data_samples) for _ in range(TOTAL_REQUESTS)] def send_request(index, data): """Send a single HTTP request and print the result.""" try: start_time = time.time() response = client.predict(np.array([data])) duration = time.time() - start_time except Exception as e: print(f"Request {index}: Error -> {e}") print(f"Sending {TOTAL_REQUESTS} requests to {client.url} with concurrency {CONCURRENCY}...") with ThreadPoolExecutor(max_workers=CONCURRENCY) as executor: for i, data in enumerate(payloads, start=1): executor.submit(send_request, i, data) print("Done.")

尽管每个客户端发送一个数据点,但您将从服务器日志中注意到 BentoML 会动态地将多个请求批量处理在一起。这提高了吞吐量并提高了计算效率。

您还可以在 https://:3000/metrics 监控批量大小指标。以下是运行上述脚本后的一些示例指标

# HELP bentoml_service_adaptive_batch_size Service adaptive batch size # TYPE bentoml_service_adaptive_batch_size histogram bentoml_service_adaptive_batch_size_sum{method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 1000.0 bentoml_service_adaptive_batch_size_bucket{le="1.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 66.0 bentoml_service_adaptive_batch_size_bucket{le="2.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 111.0 bentoml_service_adaptive_batch_size_bucket{le="4.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 171.0 bentoml_service_adaptive_batch_size_bucket{le="8.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 227.0 bentoml_service_adaptive_batch_size_bucket{le="16.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 249.0 bentoml_service_adaptive_batch_size_bucket{le="32.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0 bentoml_service_adaptive_batch_size_bucket{le="64.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0 bentoml_service_adaptive_batch_size_bucket{le="100.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0 bentoml_service_adaptive_batch_size_bucket{le="+Inf",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0 bentoml_service_adaptive_batch_size_count{method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0

标准化您的部署工作流程

对于协作开发多个模型和项目的较大团队,BentoML 提供了标准化 ML 服务开发的工具。

示例:跨多个项目强制执行环境依赖和 API 规范

首先,创建一个定义共享组件的 common.py 文件

# common.py import bentoml import numpy as np import numpy.typing as npt from pydantic import Field from bentoml.validators import Shape, DType from typing import Annotated my_image = bentoml.images.PythonImage(python_version="3.11") \ .python_packages("mlflow", "scikit-learn") class MyInputParams(bentoml.IODescriptor): input_data: Annotated[npt.NDArray[np.float64], Shape((-1, 4)), DType("float64")] = Field(default=[[0.1, 0.4, 0.2, 1.0]]) client_id: str

然后,在您的 Service 中使用这些组件

import bentoml import numpy as np import numpy.typing as npt from bentoml.models import BentoModel from common import MyInputParams, my_image @bentoml.service( image=my_image, resources={"cpu": "2"}, traffic={"timeout": 10}, ) class IrisClassifier: bento_model = BentoModel("iris:latest") def __init__(self): self.model = bentoml.mlflow.load_model(self.bento_model) @bentoml.api(input_spec=MyInputParams) def predict( self, input_data, client_id, ) -> list[str]: print(f"processing request form user {client_id}") rv = self.model.predict(input_data) return np.asarray(rv)

示例:部署多个模型

BentoML 可以轻松地在单个 Service(或分布式 Services)中部署多个模型。

import bentoml import numpy as np import numpy.typing as npt from bentoml.models import BentoModel from common import MyInputParams, my_image @bentoml.service( image=my_image, resources={"cpu": "2"}, traffic={"timeout": 10}, ) class IrisClassifier: bento_model_1 = BentoModel("iris:v1") bento_model_2 = BentoModel("iris:v2") def __init__(self): self.model_1 = bentoml.mlflow.load_model(self.bento_model_1) self.model_2 = bentoml.mlflow.load_model(self.bento_model_2) @bentoml.api(route="/v1/predict", input_spec=MyInputParams) def predict_1( self, input_data, client_id, ) -> np.ndarray: rv = self.model_1.predict(input_data) return np.asarray(rv) @bentoml.api(route="/v2/predict", input_spec=MyInputParams) def predict_2( self, input_data, client_id, ) -> np.ndarray: rv = self.model_2.predict(input_data) return np.asarray(rv) # Combine predictions @bentoml.api(input_spec=MyInputParams) def predict_combined( self, input_data, client_id, ) -> np.ndarray: rv_a = self.model_1.predict(input_data) rv_b = self.model_2.predict(input_data) return np.asarray([rv_a, rv_b])

这种方法允许您

  • 在不同端点下部署多个模型版本
  • 创建结合多个模型预测结果的集成模型
  • 在模型版本之间实施 A/B 测试

有关更多信息,请参阅BentoML 关于多模型组合的文档

部署到生产环境

BentoML 为生产部署提供了多种选项

  1. 容器化:为您的 ML 服务构建符合 OCI 规范的镜像,以便在任何容器平台上部署

    bentoml build bentoml containerize iris_classifier:latest

    有关更多详细信息,请参阅容器化指南

  2. BentoCloud:注册 BentoCloud,直接部署到统一推理平台,享受便捷管理、快速自动扩缩容、企业级安全和全面的可观测性

    bentoml deploy

    有关更多详细信息,请参阅云部署指南

结论

在本教程中,我们了解了 MLflow 和 BentoML 如何协同工作,创建从实验到生产的无缝工作流程

  • MLflow 在开发阶段处理实验跟踪、模型指标和工件存储
  • BentoML 负责生产方面的事务:模型部署、验证、批量处理和部署

这种集成使得数据科学家能够专注于模型开发,同时确保他们的模型能够可靠地部署到生产环境。查看以下内容了解更多信息

  • 注册 BentoCloud,将您的第一个 MLflow 模型部署到云端。
  • 我们的专家交流,获得关于实施 AI 解决方案的定制指导。
  • 加入我们的 Slack 社区,这里汇聚了 AI 从业者和工程师。分享经验,获得实时支持,并及时了解 AI 部署的最佳实践。