February 27, 2025 • By Chaoyu Yang
Machine learning projects involve many moving parts, from experimentation to production deployment. Two tools that work well together to streamline this process are MLflow and BentoML. In this tutorial, we'll demonstrate how to use MLflow for experiment tracking and BentoML for model serving and production deployment.
Specifically, you will learn how to train a model and track experiments with MLflow, register the model with BentoML and serve it behind a REST API, validate and batch inference requests, and package everything for production deployment.
You can find all the source code in the BentoMLflow repository.
Let's get started!
Install the required packages:
```bash
pip install bentoml mlflow scikit-learn
```
Note: While we use scikit-learn for demonstration purposes, both MLflow and BentoML support a wide variety of frameworks, such as PyTorch, TensorFlow, and XGBoost.
Start your MLflow tracking server:
```bash
mlflow server --host 127.0.0.1 --port 8080
```
This server will track our experiments and store our model artifacts.
Let's train a simple classification model on the Iris dataset and log the results with MLflow:
```python
import mlflow
from mlflow.models import infer_signature

from sklearn import datasets
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Load the Iris dataset
X, y = datasets.load_iris(return_X_y=True)

# Split the data into training and test sets
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Define the model hyperparameters
params = {
    "solver": "lbfgs",
    "max_iter": 1000,
    "random_state": 8888,
}

# Train the model
lr = LogisticRegression(**params)
lr.fit(X_train, y_train)

# Predict on the test set
y_pred = lr.predict(X_test)

# Calculate metrics
accuracy = accuracy_score(y_test, y_pred)
print(accuracy)
```
Here, we log the model parameters, track metrics, and save the model artifact to MLflow:
```python
from datetime import date

# Set our tracking server uri for logging
mlflow.set_tracking_uri(uri="http://127.0.0.1:8080")

# Create a new MLflow Experiment
mlflow.set_experiment("MLflow+BentoML Quickstart")

# Start an MLflow run
with mlflow.start_run():
    # Log the hyperparameters
    mlflow.log_params(params)

    # Log the loss metric
    mlflow.log_metric("accuracy", accuracy)

    # Set a tag that we can use to remind ourselves what this run was for
    mlflow.set_tag("Training Info", "Basic LR model for iris data")

    # Infer the model signature
    signature = infer_signature(X_train, lr.predict(X_train))

    # Log the model
    model_info = mlflow.sklearn.log_model(
        sk_model=lr,
        artifact_path="iris_model",
        signature=signature,
        input_example=X_train,
        registered_model_name="iris_demo",
    )

    model_uri = mlflow.get_artifact_uri("iris_model")
```
At this point, MLflow has logged the run's parameters, metrics, and model artifact, and registered the model under the name iris_demo. You can view all of this information in the MLflow UI at http://127.0.0.1:8080.
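If you'd rather verify the run from code than the UI, here's a minimal sketch using MLflow's `search_runs` API (the column names assume the params and metrics logged above):

```python
import mlflow

mlflow.set_tracking_uri("http://127.0.0.1:8080")

# search_runs returns a pandas DataFrame with one row per run;
# logged params and metrics are flattened into columns
runs = mlflow.search_runs(experiment_names=["MLflow+BentoML Quickstart"])
print(runs[["run_id", "params.solver", "metrics.accuracy"]])
```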
Once you're happy with the model's performance, register it with the BentoML Model Store for deployment.
```python
import bentoml

bento_model = bentoml.mlflow.import_model(
    'iris',
    model_uri=model_uri,
    labels={
        "team": "bento",
        "stage": "dev",
        "accuracy": accuracy,
        "training_date": str(date.today())
    }
)
```
Note that we attach labels to the imported model, such as the owning team, stage, accuracy, and training date; they make it easier to organize and filter models in the store.

Verify that the model has been saved to the Model Store:
```bash
$ bentoml models list

Tag                    Module          Size       Creation Time
iris:hu5d7xxs3oxmnuqj  bentoml.mlflow  11.75 KiB  2025-02-24 10:14:51
```
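You can also inspect the imported model and the labels we attached from Python; a small sketch using BentoML's model store API:

```python
import bentoml

# Look up the stored model by tag ("latest" resolves to the newest version)
model = bentoml.models.get("iris:latest")

print(model.tag)          # e.g. iris:hu5d7xxs3oxmnuqj
print(model.info.labels)  # the labels set during import_model
print(model.path)         # local path of the stored artifacts
```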
You can test loading the model from the Model Store:
```python
import numpy as np
import bentoml

# Load the latest version of iris model:
iris_model = bentoml.mlflow.load_model("iris:latest")

# Alternatively, load the model by specifying the model tag
# iris_model = bentoml.mlflow.load_model("iris:hu5d7xxs3oxmnuqj")

input_data = np.array([[5.9, 3, 5.1, 1.8]])
res = iris_model.predict(input_data)
print(res)
```
Now that the model is ready, create a BentoML Service to deploy it. By convention, you define a file called service.py to implement the model serving logic:
```python
import bentoml
import numpy as np
from bentoml.models import BentoModel

# Define the runtime environment for your Bento
demo_image = bentoml.images.PythonImage(python_version="3.11") \
    .python_packages("mlflow", "scikit-learn")

target_names = ['setosa', 'versicolor', 'virginica']

@bentoml.service(
    image=demo_image,
    resources={"cpu": "2"},
    traffic={"timeout": 10},
)
class IrisClassifier:
    # Declare the model as a class attribute
    bento_model = BentoModel("iris:latest")

    def __init__(self):
        self.model = bentoml.mlflow.load_model(self.bento_model)

    # Define an API endpoint
    @bentoml.api
    def predict(self, input_data: np.ndarray) -> list[str]:
        preds = self.model.predict(input_data)
        return [target_names[i] for i in preds]
```
Note that:

- PythonImage defines the runtime environment of your Bento, the unified distribution format in BentoML. You can customize the build by setting the required Python version, dependencies, run commands, and more.
- @bentoml.service marks a Python class as a BentoML Service. It lets you specify configurations such as request timeouts and resource requirements.

Serve the model with the BentoML CLI:
```bash
$ bentoml serve service.py:IrisClassifier

[INFO] [cli] Starting production HTTP BentoServer from "service:IrisClassifier" listening on http://localhost:3000 (Press CTRL+C to quit)
[INFO] [entry_service:IrisClassifier:1] Service IrisClassifier initialized
```
The model is now running at http://localhost:3000. Query the endpoint:
```bash
curl -X 'POST' \
  'http://localhost:3000/predict' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "input_data": [[
    0.1, 0.2, 0.1, 0.1
  ]]
}'

# ["setosa"]
```
Alternatively, use the BentoML Python client:
```python
import bentoml
import numpy as np

client = bentoml.SyncHTTPClient("http://localhost:3000")
client.predict(np.array([[5.9, 3, 5.1, 1.8]]))

# ['virginica']
```
A common problem is handling unexpected data formats or types from clients. For example, a client might send integer values instead of floats:
```bash
curl -X 'POST' \
  'http://localhost:3000/predict' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "input_data": [[
    5, 3, 5, 2
  ]]
}'
```
This results in an error:
```bash
# client side error:
# {"error":"An unexpected error has occurred, please check the server log."}

# server side log:
# mlflow.exceptions.MlflowException: Failed to enforce schema of data '[[5 3 5 2]]' with schema '[Tensor('float64', (-1, 4))]'. Error: dtype of input int64 does not match expected dtype float64
```
Specifying a float dtype with the BentoML Python client works around the issue:
```python
import bentoml
import numpy as np

client = bentoml.SyncHTTPClient("http://localhost:3000")
client.predict(np.array([[1, 1, 1, 1]], dtype='float64'))
```
However, this creates challenges when integrating the ML service with downstream services. To further help with input validation, BentoML extends Pydantic to handle common ML data types (e.g., images, text streams, and floats). You can define a strict schema in your BentoML Service:
```python
import bentoml
import numpy as np
import numpy.typing as npt
from bentoml.models import BentoModel
from pydantic import Field
from bentoml.validators import Shape, DType
from typing import Annotated

demo_image = bentoml.images.PythonImage(python_version="3.11") \
    .python_packages("mlflow", "scikit-learn")

target_names = ['setosa', 'versicolor', 'virginica']

@bentoml.service(
    image=demo_image,
    resources={"cpu": "2"},
    traffic={"timeout": 10},
)
class IrisClassifier:
    bento_model = BentoModel("iris:latest")

    def __init__(self):
        self.model = bentoml.mlflow.load_model(self.bento_model)

    # Enforce and validate input schemas for the API
    @bentoml.api
    def predict(
        self,
        input_data: Annotated[npt.NDArray[np.float64], Shape((-1, 4)), DType("float64")]
            = Field(default=[[0.1, 0.4, 0.2, 1.0]])
    ) -> list[str]:
        preds = self.model.predict(input_data)
        return [target_names[i] for i in preds]
```
Now, integer inputs are automatically validated (and converted where possible). You can try it with a generic HTTP client:
```bash
curl -X 'POST' \
  'http://localhost:3000/predict' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/json' \
  -d '{
  "input_data": [[
    5, 3, 5, 2
  ]]
}'

# ["virginica"]
```
You can also check the OpenAPI documentation to see the schema your Service expects:
```bash
curl localhost:3000/docs.json
```
This returns a JSON schema that describes the API's input and output formats:
"paths": { ... "/predict": { "post": { "responses": { "200": { "description": "Successful Response", "content": { "application/json": { "schema": { "type": "array", "items": { "type": "number" } } } } }, "400": { "description": "Bad Request", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/InvalidArgument" } } } }, "500": { "description": "Internal Server Error", "content": { "application/json": { "schema": { "$ref": "#/components/schemas/InternalServerError" } } } } }, "requestBody": { "content": { "application/json": { "schema": { "type": "object", "title": "Input", "properties": { "input_data": { "default": [ [ 0.1, 0.4, 0.2, 1 ] ], "items": { "items": { "type": "number" }, "type": "array" }, "title": "Input Data", "type": "array" } } } } } }, "operationId": "IrisClassifier__predict" } } }, ... "components": { "schemas": { "predict__Input": { "type": "object", "title": "predict__Input", "properties": { "input_data": { "default": [ [ 0.1, 0.4, 0.2, 1 ] ], "dim": -4, "dtype": "float64", "format": "numpy-array", "shape": [ -1, 4 ], "title": "Input Data", "type": "tensor" } } }, ...
BentoML can optimize performance with adaptive batching, which combines multiple independent requests into a single batch for more efficient processing.
Let's update our Service to support batching:
```python
import bentoml
import numpy as np
from bentoml.models import BentoModel

demo_image = bentoml.images.PythonImage(python_version="3.11") \
    .python_packages("mlflow", "scikit-learn")

target_names = ['setosa', 'versicolor', 'virginica']

@bentoml.service(
    image=demo_image,
    resources={"cpu": "2"},
    traffic={"timeout": 10},
)
class IrisClassifier:
    bento_model = BentoModel("iris:latest")

    def __init__(self):
        self.model = bentoml.mlflow.load_model(self.bento_model)

    # Enable adaptive batching
    @bentoml.api(batchable=True)
    def predict(self, input_data: np.ndarray) -> list[str]:
        print(f"batch_size: {len(input_data)}")
        preds = self.model.predict(input_data)
        return [target_names[i] for i in preds]
```
You can test it with a script that simulates multiple concurrent clients:
```python
import random
import time
from concurrent.futures import ThreadPoolExecutor

import bentoml
import numpy as np
from sklearn.datasets import load_iris

CONCURRENCY = 20       # Number of threads (concurrent requests)
TOTAL_REQUESTS = 1000  # Total number of requests to send

client = bentoml.SyncHTTPClient("http://localhost:3000")

# Sample random rows from the Iris dataset as request payloads
iris = load_iris()
data_samples = iris.data.tolist()
payloads = [random.choice(data_samples) for _ in range(TOTAL_REQUESTS)]

def send_request(index, data):
    """Send a single HTTP request and print the result."""
    try:
        start_time = time.time()
        response = client.predict(np.array([data]))
        duration = time.time() - start_time
        print(f"Request {index}: {response} ({duration:.3f}s)")
    except Exception as e:
        print(f"Request {index}: Error -> {e}")

print(f"Sending {TOTAL_REQUESTS} requests to {client.url} with concurrency {CONCURRENCY}...")
with ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
    for i, data in enumerate(payloads, start=1):
        executor.submit(send_request, i, data)
print("Done.")
```
Although each client sends a single data point, you will notice in the server logs that BentoML dynamically batches multiple requests together. This increases throughput and improves compute efficiency.
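Adaptive batching also exposes tuning knobs. As a sketch, the same `predict` method could bound the batch size and queueing latency; the parameter values below are illustrative only, and you should check the adaptive batching docs for your BentoML version:

```python
# Sketch: tuning adaptive batching on the predict method shown above.
@bentoml.api(
    batchable=True,
    batch_dim=0,         # concatenate request inputs along the first axis
    max_batch_size=100,  # never group more than 100 requests into one batch
    max_latency_ms=500,  # don't hold a request in the queue longer than this
)
def predict(self, input_data: np.ndarray) -> list[str]:
    preds = self.model.predict(input_data)
    return [target_names[i] for i in preds]
```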
You can also monitor batch-size metrics at http://localhost:3000/metrics. Here are some sample metrics after running the script above:
```
# HELP bentoml_service_adaptive_batch_size Service adaptive batch size
# TYPE bentoml_service_adaptive_batch_size histogram
bentoml_service_adaptive_batch_size_sum{method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 1000.0
bentoml_service_adaptive_batch_size_bucket{le="1.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 66.0
bentoml_service_adaptive_batch_size_bucket{le="2.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 111.0
bentoml_service_adaptive_batch_size_bucket{le="4.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 171.0
bentoml_service_adaptive_batch_size_bucket{le="8.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 227.0
bentoml_service_adaptive_batch_size_bucket{le="16.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 249.0
bentoml_service_adaptive_batch_size_bucket{le="32.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0
bentoml_service_adaptive_batch_size_bucket{le="64.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0
bentoml_service_adaptive_batch_size_bucket{le="100.0",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0
bentoml_service_adaptive_batch_size_bucket{le="+Inf",method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0
bentoml_service_adaptive_batch_size_count{method_name="predict",runner_name="IrisClassifier",service_name="IrisClassifier",service_version="not available",worker_index="1"} 251.0
```
For larger teams collaborating on multiple models and projects, BentoML provides tools to standardize ML service development.
First, create a common.py file that defines shared components:
```python
# common.py
import bentoml
import numpy as np
import numpy.typing as npt
from pydantic import Field
from bentoml.validators import Shape, DType
from typing import Annotated

my_image = bentoml.images.PythonImage(python_version="3.11") \
    .python_packages("mlflow", "scikit-learn")

class MyInputParams(bentoml.IODescriptor):
    input_data: Annotated[npt.NDArray[np.float64], Shape((-1, 4)), DType("float64")] = Field(default=[[0.1, 0.4, 0.2, 1.0]])
    client_id: str
```
Then, use those components in your Service:
```python
import bentoml
import numpy as np
import numpy.typing as npt
from bentoml.models import BentoModel

from common import MyInputParams, my_image

@bentoml.service(
    image=my_image,
    resources={"cpu": "2"},
    traffic={"timeout": 10},
)
class IrisClassifier:
    bento_model = BentoModel("iris:latest")

    def __init__(self):
        self.model = bentoml.mlflow.load_model(self.bento_model)

    @bentoml.api(input_spec=MyInputParams)
    def predict(
        self,
        input_data,
        client_id,
    ) -> np.ndarray:
        print(f"processing request from user {client_id}")
        rv = self.model.predict(input_data)
        return np.asarray(rv)
```
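With `input_spec` in place, every request must supply both fields. For example, a request might look like this (assuming the Service runs locally on port 3000; the `client_id` value is made up):

```bash
curl -X 'POST' \
  'http://localhost:3000/predict' \
  -H 'Content-Type: application/json' \
  -d '{
  "input_data": [[5.9, 3.0, 5.1, 1.8]],
  "client_id": "demo-user"
}'
```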
BentoML makes it easy to deploy multiple models in a single Service (or across distributed Services):
```python
import bentoml
import numpy as np
import numpy.typing as npt
from bentoml.models import BentoModel

from common import MyInputParams, my_image

@bentoml.service(
    image=my_image,
    resources={"cpu": "2"},
    traffic={"timeout": 10},
)
class IrisClassifier:
    bento_model_1 = BentoModel("iris:v1")
    bento_model_2 = BentoModel("iris:v2")

    def __init__(self):
        self.model_1 = bentoml.mlflow.load_model(self.bento_model_1)
        self.model_2 = bentoml.mlflow.load_model(self.bento_model_2)

    @bentoml.api(route="/v1/predict", input_spec=MyInputParams)
    def predict_1(
        self,
        input_data,
        client_id,
    ) -> np.ndarray:
        rv = self.model_1.predict(input_data)
        return np.asarray(rv)

    @bentoml.api(route="/v2/predict", input_spec=MyInputParams)
    def predict_2(
        self,
        input_data,
        client_id,
    ) -> np.ndarray:
        rv = self.model_2.predict(input_data)
        return np.asarray(rv)

    # Combine predictions
    @bentoml.api(input_spec=MyInputParams)
    def predict_combined(
        self,
        input_data,
        client_id,
    ) -> np.ndarray:
        rv_a = self.model_1.predict(input_data)
        rv_b = self.model_2.predict(input_data)
        return np.asarray([rv_a, rv_b])
```
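Each model is now exposed under its own route, so clients can target a version explicitly. A sketch of querying both routes (this assumes models tagged `iris:v1` and `iris:v2` exist in your Model Store):

```bash
# Target each model version through its dedicated route
curl -X 'POST' 'http://localhost:3000/v1/predict' \
  -H 'Content-Type: application/json' \
  -d '{"input_data": [[5.9, 3.0, 5.1, 1.8]], "client_id": "demo-user"}'

curl -X 'POST' 'http://localhost:3000/v2/predict' \
  -H 'Content-Type: application/json' \
  -d '{"input_data": [[5.9, 3.0, 5.1, 1.8]], "client_id": "demo-user"}'
```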
This approach lets you serve each model behind its own versioned route, combine predictions from multiple models in a single endpoint, and evolve each model independently.
For more information, see the BentoML documentation on multi-model composition.
BentoML offers several options for production deployment.
Containerization: Build an OCI-compliant image of your ML service for deployment on any container platform:
```bash
bentoml build
bentoml containerize iris_classifier:latest
```
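Once built, the resulting image runs like any other container; for example (the exact image tag is printed by `bentoml containerize`):

```bash
docker run --rm -p 3000:3000 iris_classifier:latest
```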
See the containerization guide for more details.
BentoCloud: Sign up for BentoCloud and deploy directly to a unified inference platform, with easy management, fast autoscaling, enterprise-grade security, and comprehensive observability:
```bash
bentoml deploy
```
See the cloud deployment guide for more details.
In this tutorial, we saw how MLflow and BentoML work together to create a seamless workflow from experimentation to production.
This integration lets data scientists focus on model development while ensuring their models can be reliably deployed to production. To learn more, check out the BentoML and MLflow documentation.