自学内容网 自学内容网

fastapi入门

好的,我将为您生成一个更详细、易于理解的 FastAPI 入门学习文档,特别是对复杂概念(如依赖注入)进行深入解释,帮助您在没有太多基础的情况下也能顺利学习和掌握 FastAPI。


FastAPI 入门学习文档

目录

  1. 简介
  2. 环境搭建
  3. FastAPI 基础
  4. 深入理解 FastAPI
  5. 与数据库集成
  6. 安全与认证
  7. 部署 FastAPI 应用
  8. 附加资源
  9. 总结

1. 简介

FastAPI 是一个用于构建 API 的现代、快速(高性能)的 Web 框架,基于 Python 3.6+ 的类型提示(type hints)。它具有以下特点:

  • 高性能:性能与 Node.js 和 Go 等语言的框架相当,适合高并发、高性能的应用。
  • 快速开发:减少代码量,提高开发效率,支持自动生成文档。
  • 类型安全:利用 Python 的类型提示,提供更好的代码补全和类型检查,减少错误。
  • 自动文档:自动生成交互式 API 文档,基于 OpenAPI(Swagger)和 JSON Schema。
  • 异步支持:原生支持异步编程(async/await),提升高并发场景下的性能。

2. 环境搭建

在开始使用 FastAPI 之前,需要进行环境的搭建。

2.1 安装 Python

首先,确保已安装 Python 3.6 及以上版本。

  • 检查 Python 版本:

    python --version
    

    如果输出的版本号小于 3.6,请前往 Python 官网 下载并安装最新版本。

2.2 创建虚拟环境

使用虚拟环境可以隔离项目的依赖,避免与其他项目的库发生冲突。

  • 创建虚拟环境:

    # 创建名为 venv 的虚拟环境
    python -m venv venv
    
  • 激活虚拟环境:

    • Windows:

      venv\Scripts\activate
      
    • macOS/Linux:

      source venv/bin/activate
      

    激活后,命令行前会出现 (venv),表示已进入虚拟环境。

2.3 安装 FastAPI 和 Uvicorn

  • 安装 FastAPI:

    pip install fastapi
    
  • 安装 Uvicorn:

    Uvicorn 是一个用于运行 ASGI 应用的高性能服务器。

    pip install "uvicorn[standard]"
    

3. FastAPI 基础

3.1 创建第一个 FastAPI 应用

  • 创建 main.py 文件:

    from fastapi import FastAPI
    
    app = FastAPI()
    
    @app.get("/")
    async def read_root():
        return {"message": "Hello, FastAPI!"}
    

    解释:

    • 导入 FastAPI 类:

      from fastapi import FastAPI
      
    • 创建 FastAPI 应用实例:

      app = FastAPI()
      
    • 定义路径操作(路由):

      @app.get("/")
      async def read_root():
          return {"message": "Hello, FastAPI!"}
      
      • @app.get("/") 是一个装饰器,表示当用户访问根路径 / 时,执行下面的 read_root 函数。
      • async def 定义了一个异步函数,后续会详细介绍异步编程。
      • 函数返回一个字典,FastAPI 会自动将其转换为 JSON 格式的响应。
  • 运行应用:

    在终端中,运行以下命令启动应用:

    uvicorn main:app --reload
    

    解释:

    • uvicorn 是运行 ASGI 应用的服务器。
    • main:app 指的是 main.py 文件中的 app 对象。
    • --reload 选项使得应用在代码更改时自动重新加载,方便开发。
  • 访问应用:

    打开浏览器,访问 http://127.0.0.1:8000/,应看到以下 JSON 响应:

    {
      "message": "Hello, FastAPI!"
    }
    
  • 查看自动生成的文档:

    FastAPI 提供了自动生成的 API 文档:

    • Swagger UI:http://127.0.0.1:8000/docs
    • ReDoc:http://127.0.0.1:8000/redoc

    在这些页面上,你可以看到应用的 API 结构,并进行交互式测试。

3.2 路径操作(路由)

路径操作用于定义应用响应特定路径和 HTTP 方法的函数。

  • 定义路径参数:

    @app.get("/items/{item_id}")
    async def read_item(item_id: int):
        return {"item_id": item_id}
    

    解释:

    • {item_id} 表示路径参数,item_id 的值将从请求的 URL 中提取。
    • 在函数参数中,声明 item_id: int,表示 item_id 的类型为整数。
    • FastAPI 会自动进行类型转换和验证,如果传入的 item_id 不能转换为整数,将返回一个 422 错误(Unprocessable Entity)。
  • 访问示例:

    访问 http://127.0.0.1:8000/items/5,将返回:

    {
      "item_id": 5
    }
    
  • 指定请求方法:

    FastAPI 支持以下 HTTP 方法:

    • @app.get():处理 GET 请求
    • @app.post():处理 POST 请求
    • @app.put():处理 PUT 请求
    • @app.delete():处理 DELETE 请求
    • @app.patch():处理 PATCH 请求

3.3 请求参数

请求参数包括路径参数、查询参数和请求体。

  • 查询参数:

    @app.get("/items/")
    async def read_items(skip: int = 0, limit: int = 10):
        return {"skip": skip, "limit": limit}
    

    解释:

    • skip: int = 0limit: int = 10 是查询参数,具有默认值。
    • 如果在 URL 中提供了 skiplimit,将覆盖默认值。
    • 查询参数是可选的,如果未提供,则使用默认值。
  • 访问示例:

    • 访问 http://127.0.0.1:8000/items/?skip=5&limit=20,返回:

      {
        "skip": 5,
        "limit": 20
      }
      
  • 路径参数和查询参数组合:

    @app.get("/users/{user_id}/items/{item_id}")
    async def read_user_item(user_id: int, item_id: str, q: str = None):
        item = {"user_id": user_id, "item_id": item_id}
        if q:
            item["q"] = q
        return item
    

    解释:

    • user_iditem_id 是路径参数,从 URL 中提取。
    • q 是查询参数,默认为 None,表示可选。
    • 根据是否提供查询参数 q,返回的数据会有所不同。

3.4 数据模型(Pydantic 模型)

FastAPI 使用 Pydantic 来定义数据模型和进行数据验证。

  • 安装 Pydantic:

    Pydantic 是 FastAPI 的依赖,一般情况下已经安装,无需单独安装。

  • 定义数据模型:

    from pydantic import BaseModel
    
    class Item(BaseModel):
        name: str
        description: str = None
        price: float
        tax: float = None
    

    解释:

    • 继承自 BaseModel

      Pydantic 的所有模型都需要继承自 BaseModel

    • 定义字段和类型:

      • name: str:必须的字符串字段。
      • description: str = None:可选的字符串字段,默认为 None
      • price: float:必须的浮点数字段。
      • tax: float = None:可选的浮点数字段。
    • 类型提示:

      使用类型提示(Type Hints)定义字段的类型,Pydantic 会根据这些类型进行数据验证和解析。

  • 接收请求体:

    @app.post("/items/")
    async def create_item(item: Item):
        return item
    

    解释:

    • 函数参数 item: Item 指定请求体将被解析为 Item 类型的对象。
    • FastAPI 会自动从请求体中读取 JSON 数据,并转换为 Item 对象。
    • 如果请求体的数据不符合 Item 模型的定义(如缺少必须字段、类型错误等),FastAPI 会返回 422 错误,指出具体的验证错误。
  • 请求示例:

    发送 POST 请求到 /items/,请求体为:

    {
      "name": "Apple",
      "description": "A juicy fruit",
      "price": 1.2,
      "tax": 0.1
    }
    

    服务器将返回相同的数据,表示成功接收。

3.5 响应模型

可以使用响应模型来控制返回的数据结构。

  • 指定响应模型:

    @app.post("/items/", response_model=Item)
    async def create_item(item: Item):
        return item
    

    解释:

    • response_model=Item 指定了响应的数据模型为 Item
    • 即使函数内部返回了更多数据,FastAPI 也只会根据 Item 模型来过滤和返回字段。
  • 示例:

    @app.post("/items/", response_model=Item)
    async def create_item(item: Item):
        return {"name": item.name, "description": item.description, "price": item.price, "tax": item.tax, "id": 123}
    

    即使返回的字典中包含了 id 字段,响应中也不会包含该字段,因为 Item 模型中未定义 id


4. 深入理解 FastAPI

4.1 依赖注入(Dependency Injection)

依赖注入(Dependency Injection)是一种设计模式,用于将组件的依赖关系从内部移到外部,使得组件更加灵活和可测试。

在 FastAPI 中,依赖注入用于复用代码、共享逻辑,如验证、数据库连接等。

4.1.1 什么是依赖注入
  • 传统方式:

    在函数内部直接调用其他函数或创建对象。

    def process():
        db = get_db()
        data = db.fetch_data()
        return data
    

    缺点:

    • 难以复用代码。
    • 不易于测试和维护。
  • 依赖注入方式:

    将依赖关系作为参数传入函数,由外部提供。

    def process(db = Depends(get_db)):
        data = db.fetch_data()
        return data
    

    优点:

    • 提高代码复用性。
    • 方便测试和替换依赖项。
    • FastAPI 会自动管理依赖项的创建和销毁。
4.1.2 FastAPI 中的依赖注入
  • 使用 Depends 声明依赖项:

    from fastapi import Depends
    
    def get_db():
        db = DatabaseConnection()
        try:
            yield db
        finally:
            db.close()
    
    @app.get("/items/")
    async def read_items(db = Depends(get_db)):
        items = db.fetch_items()
        return items
    

    解释:

    • get_db 函数是一个依赖项,返回一个数据库连接对象。
    • read_items 函数中,使用 db = Depends(get_db) 声明了对 get_db 的依赖。
    • FastAPI 会在调用 read_items 时,自动执行 get_db,将返回的 db 对象作为参数传入 read_items
  • 依赖项可以是同步或异步的函数:

    • 同步函数:普通的 def 函数。
    • 异步函数:使用 async def 定义。
  • 依赖项中的 yield

    • 当依赖项函数使用 yield 时,表示这是一个 上下文管理器,在请求结束后会执行 finally 部分。
    • 这对于需要在请求结束后进行清理的操作(如关闭数据库连接)非常有用。
  • 多级依赖:

    依赖项本身也可以依赖其他依赖项,形成依赖树。

    def get_token_header(x_token: str = Header(...)):
        if x_token != "fake-super-secret-token":
            raise HTTPException(status_code=400, detail="Invalid X-Token header")
    
    def get_db(token = Depends(get_token_header)):
        db = DatabaseConnection()
        yield db
        db.close()
    
    @app.get("/items/")
    async def read_items(db = Depends(get_db)):
        items = db.fetch_items()
        return items
    

    解释:

    • get_token_header 依赖从请求头中获取 X-Token,并进行验证。
    • get_db 依赖 get_token_header,只有在验证通过后才会获取数据库连接。
    • read_items 依赖 get_db,最终形成依赖链。
4.1.3 示例:公共参数
  • 定义一个依赖项,提供公共查询参数:

    from typing import Optional
    
    def common_parameters(q: Optional[str] = None, skip: int = 0, limit: int = 10):
        return {"q": q, "skip": skip, "limit": limit}
    
    @app.get("/items/")
    async def read_items(commons: dict = Depends(common_parameters)):
        return commons
    
    @app.get("/users/")
    async def read_users(commons: dict = Depends(common_parameters)):
        return commons
    

    解释:

    • common_parameters 函数定义了常用的查询参数 qskiplimit
    • 在路径操作函数中,使用 commons: dict = Depends(common_parameters) 声明依赖。
    • FastAPI 会自动执行 common_parameters,并将返回值(字典)传递给路径操作函数。
    • 这样,多个路径操作函数可以共享相同的查询参数逻辑,避免重复代码。
4.1.4 依赖注入的应用场景
  • 数据库连接和会话管理
  • 验证和授权
  • 配置和环境变量
  • 日志记录
  • 缓存

理解中间件(Middleware)对于构建功能丰富且高效的 FastAPI 应用至关重要。为了帮助您更好地掌握这一概念,我将重新整理并详细解释 4.2 中间件 部分,同时保持整体文档的平衡和简洁。


4.2 中间件

中间件(Middleware)是在 请求(Request)和 响应(Response)之间执行的函数。它们可以用于执行一些在每个请求或响应中重复的任务,如日志记录、认证、修改请求或响应等。可以将中间件类比为**“中间人”**,它们在请求到达路径操作函数之前或响应返回给客户端之前进行处理。

4.2.1 什么是中间件?

  • 定义: 中间件是在请求和响应过程中拦截并处理这些请求和响应的组件。
  • 作用:
    • 日志记录: 记录每个请求的详细信息,如请求路径、方法、处理时间等。
    • 认证与授权: 检查请求是否具有访问特定资源的权限。
    • 修改请求或响应: 如添加自定义头部、处理跨域请求(CORS)等。
    • 错误处理: 捕获并处理全局错误,统一返回错误响应。

4.2.2 FastAPI 中的中间件工作原理

在 FastAPI 中,中间件是使用装饰器 @app.middleware("http") 注册的异步函数。当一个请求到达 FastAPI 应用时,所有注册的中间件会按注册顺序依次执行,处理请求并在响应返回时再次按相反的顺序处理响应。

中间件的执行流程:
  1. 请求到达中间件: 请求首先由第一个中间件接收。
  2. 中间件预处理: 中间件可以在调用下一个处理程序之前,对请求进行处理(如记录日志、验证认证)。
  3. 调用下一个处理程序: 使用 call_next(request) 将请求传递给下一个中间件或最终的路径操作函数。
  4. 响应返回中间件: 响应通过中间件链返回,中间件可以在返回响应之前进行后处理(如修改响应头)。
  5. 最终响应返回客户端: 经过所有中间件处理后的响应被发送给客户端。

4.2.3 中间件函数的组成部分

一个典型的 FastAPI 中间件函数包含以下部分:

  1. 装饰器 @app.middleware("http") 用于注册中间件函数,并指定其作用于 HTTP 请求。
  2. 请求对象(Request): 包含了请求的所有信息,如路径、方法、头部、体等。
  3. call_next 函数: 一个特殊的函数,用于将请求传递给下一个中间件或最终的路径操作函数。
  4. 响应对象(Response): 由路径操作函数或下一个中间件生成的响应。

4.2.4 详细示例:记录请求处理时间

让我们通过一个具体的示例来深入理解中间件的工作原理。

示例代码:
from fastapi import FastAPI, Request
import time

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """
    中间件函数:记录请求的处理时间,并将其添加到响应头中。

    参数:
    - request: Request 对象,包含请求的详细信息。
    - call_next: 函数,用于调用下一个中间件或路径操作函数。

    返回:
    - response: 经过处理的响应对象。
    """
    start_time = time.time()  # 记录请求开始时间
    response = await call_next(request)  # 将请求传递给下一个处理程序
    process_time = time.time() - start_time  # 计算处理时间
    response.headers["X-Process-Time"] = f"{process_time:.4f} seconds"  # 添加处理时间到响应头
    return response  # 返回修改后的响应
解释:
  1. 注册中间件:

    @app.middleware("http")
    async def add_process_time_header(request: Request, call_next):
        ...
    
    • @app.middleware("http") 装饰器告诉 FastAPI 这是一个 HTTP 中间件。
    • 中间件函数必须接收两个参数:
      • request:当前的请求对象。
      • call_next:用于将请求传递给下一个中间件或路径操作函数的函数。
  2. 记录开始时间:

    start_time = time.time()
    
    • 使用 time.time() 记录当前时间,以计算处理请求所需的时间。
  3. 调用下一个处理程序:

    response = await call_next(request)
    
    • call_next(request) 将请求传递给下一个中间件或最终的路径操作函数。
    • 使用 await 确保异步调用,防止阻塞。
  4. 计算处理时间:

    process_time = time.time() - start_time
    
    • 计算从记录开始时间到收到响应的时间差。
  5. 添加处理时间到响应头:

    response.headers["X-Process-Time"] = f"{process_time:.4f} seconds"
    
    • 将处理时间作为自定义头部 X-Process-Time 添加到响应中。
  6. 返回响应:

    return response
    
    • 返回修改后的响应对象。
路径操作函数示例:

为了完整演示中间件的作用,我们创建一个简单的路径操作函数:

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    """
    路径操作函数:返回指定 item_id 的项目信息。
    """
    return {"item_id": item_id, "name": f"Item {item_id}"}

完整示例代码:

from fastapi import FastAPI, Request
import time

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """
    中间件函数:记录请求的处理时间,并将其添加到响应头中。
    """
    start_time = time.time()
    response = await call_next(request)
    process_time = time.time() - start_time
    response.headers["X-Process-Time"] = f"{process_time:.4f} seconds"
    return response

@app.get("/items/{item_id}")
async def read_item(item_id: int):
    """
    路径操作函数:返回指定 item_id 的项目信息。
    """
    return {"item_id": item_id, "name": f"Item {item_id}"}

运行应用并测试:

  1. 启动应用:

    uvicorn main:app --reload
    
  2. 访问路径操作函数:

    打开浏览器,访问 http://127.0.0.1:8000/items/5,您将看到以下 JSON 响应:

    {
      "item_id": 5,
      "name": "Item 5"
    }
    
  3. 查看响应头:

    使用浏览器的开发者工具或工具如 Postman,查看响应头,您会看到 X-Process-Time 头部,显示请求处理所需的时间。例如:

    X-Process-Time: 0.0002 seconds
    

4.2.5 中间件的执行顺序

理解中间件的执行顺序有助于正确设计和调试应用。中间件按照注册顺序依次执行,响应则按相反的顺序返回。

示例:

假设我们有两个中间件 middleware_onemiddleware_two,它们按以下顺序注册:

@app.middleware("http")
async def middleware_one(request: Request, call_next):
    print("Middleware One: Before")
    response = await call_next(request)
    print("Middleware One: After")
    return response

@app.middleware("http")
async def middleware_two(request: Request, call_next):
    print("Middleware Two: Before")
    response = await call_next(request)
    print("Middleware Two: After")
    return response

执行流程:

  1. 请求到达 middleware_one

    Middleware One: Before
    
  2. 请求传递到 middleware_two

    Middleware Two: Before
    
  3. 请求传递到路径操作函数:

    # 执行路径操作函数
    
  4. 响应返回到 middleware_two

    Middleware Two: After
    
  5. 响应返回到 middleware_one

    Middleware One: After
    

控制台输出:

Middleware One: Before
Middleware Two: Before
Middleware Two: After
Middleware One: After
客户端请求
      |
      v
+------------------+
|   中间件1 (请求)   |
+------------------+
          |
          v
+------------------+
|   中间件2 (请求)   |
+------------------+
          |
          v
+------------------+
| 路径操作函数 (处理) |
+------------------+
          |
          v
+------------------+
|   中间件2 (响应)   |
+------------------+
          |
          v
+------------------+
|   中间件1 (响应)   |
+------------------+
          |
          v
    返回响应给客户端

4.2.6 详细示例:身份认证中间件

让我们创建一个中间件,检查每个请求是否包含特定的认证头部 X-API-KEY,如果没有则返回 401 未授权错误。

示例代码:
from fastapi import FastAPI, Request, HTTPException
from starlette.status import HTTP_401_UNAUTHORIZED

app = FastAPI()

API_KEY = "mysecretapikey"

@app.middleware("http")
async def verify_api_key(request: Request, call_next):
    """
    中间件函数:验证请求是否包含正确的 X-API-KEY 头部。
    """
    api_key = request.headers.get("X-API-KEY")
    if api_key != API_KEY:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API Key",
        )
    response = await call_next(request)
    return response

@app.get("/protected")
async def protected_route():
    """
    受保护的路径操作函数,只有通过中间件验证的请求才能访问。
    """
    return {"message": "You have access to this protected route!"}
解释:
  1. 定义 API 密钥:

    API_KEY = "mysecretapikey"
    
    • 这是预定义的密钥,客户端必须在请求头中提供此密钥才能访问受保护的资源。
  2. 注册中间件:

    @app.middleware("http")
    async def verify_api_key(request: Request, call_next):
        ...
    
    • 中间件函数 verify_api_key 将验证每个请求的 X-API-KEY 头部。
  3. 获取并验证 API 密钥:

    api_key = request.headers.get("X-API-KEY")
    if api_key != API_KEY:
        raise HTTPException(
            status_code=HTTP_401_UNAUTHORIZED,
            detail="Invalid or missing API Key",
        )
    
    • 从请求头中获取 X-API-KEY
    • 如果密钥不匹配,抛出一个 HTTP 401 异常,表示未授权。
  4. 传递请求并返回响应:

    response = await call_next(request)
    return response
    
    • 如果密钥验证通过,将请求传递给下一个中间件或路径操作函数。
    • 最终返回响应。
  5. 定义受保护的路径操作函数:

    @app.get("/protected")
    async def protected_route():
        return {"message": "You have access to this protected route!"}
    
    • 只有在中间件验证通过后,才能访问此路径。
测试:
  1. 未提供 API 密钥:

    • 请求:

      curl -X GET "http://127.0.0.1:8000/protected"
      
    • 响应:

      {
        "detail": "Invalid or missing API Key"
      }
      
  2. 提供正确的 API 密钥:

    • 请求:

      curl -X GET "http://127.0.0.1:8000/protected" -H "X-API-KEY: mysecretapikey"
      
    • 响应:

      {
        "message": "You have access to this protected route!"
      }
      

4.2.7 中间件的最佳实践

  1. 保持中间件简洁: 中间件应专注于一个特定的任务,避免过于复杂。
  2. 正确处理异步: 使用 asyncawait 关键字确保中间件不会阻塞事件循环,特别是在处理 I/O 密集型任务时。
  3. 错误处理: 在中间件中捕获可能的异常,并返回适当的错误响应,避免应用崩溃。
  4. 顺序注册: 中间件的注册顺序影响它们的执行顺序。确保按照逻辑顺序注册中间件,例如先进行认证,再进行日志记录。
  5. 避免过多依赖: 尽量减少中间件之间的依赖关系,保持它们的独立性,便于维护和测试。

4.3 错误处理

FastAPI 提供了简洁的错误处理方式,可以自定义异常和响应。

4.3.1 抛出 HTTPException
  • 示例:

    from fastapi import HTTPException
    
    @app.get("/items/{item_id}")
    async def read_item(item_id: int):
        if item_id not in items_db:
            raise HTTPException(status_code=404, detail="Item not found")
        return items_db[item_id]
    

    解释:

    • HTTPException 用于在路径操作函数中抛出 HTTP 错误。
    • status_code 指定 HTTP 状态码。
    • detail 提供错误的详细信息。
4.3.2 自定义异常处理器
  • 示例:处理自定义异常

    from fastapi import Request
    from fastapi.responses import JSONResponse
    
    class UnicornException(Exception):
        def __init__(self, name: str):
            self.name = name
    
    @app.exception_handler(UnicornException)
    async def unicorn_exception_handler(request: Request, exc: UnicornException):
        return JSONResponse(
            status_code=418,
            content={"message": f"Oh no! {exc.name} did something wrong."},
        )
    
    @app.get("/unicorns/{name}")
    async def read_unicorn(name: str):
        if name == "yolo":
            raise UnicornException(name=name)
        return {"unicorn_name": name}
    

    解释:

    • 定义了一个自定义异常 UnicornException
    • 使用 @app.exception_handler 注册异常处理器,当抛出 UnicornException 时,执行处理器函数。
    • 在处理器中,返回自定义的响应。

4.4 异步支持

FastAPI 原生支持异步编程,允许使用 asyncawait 关键字定义异步函数。

4.4.1 什么是异步编程
  • 同步编程:
    • 代码按顺序执行,一个任务完成后才能执行下一个任务。
    • 在 I/O 操作(如网络请求、文件读写)时,程序会等待操作完成,导致阻塞。
  • 异步编程:
    • 通过事件循环(Event Loop)调度任务,允许在等待 I/O 操作时执行其他任务。
    • 提高程序的并发能力,适合 I/O 密集型的应用。
4.4.2 在 FastAPI 中使用异步函数
  • 定义异步路径操作函数:

    @app.get("/async-items/")
    async def read_async_items():
        await some_async_task()
        return [{"item_id": "Foo"}]
    

    解释:

    • 使用 async def 定义异步函数。
    • 可以使用 await 调用其他异步函数。
  • 注意事项:

    • 如果函数不涉及 I/O 操作,可以使用普通的同步函数(def)。
    • 不要在同步函数中调用异步函数(即不要在同步函数中使用 await)。
4.4.3 异步库的使用
  • 异步数据库驱动:

    • 使用支持异步的数据库驱动,如 asyncpgdatabases 等。
  • 示例:

    from databases import Database
    
    database = Database("sqlite:///./test.db")
    
    @app.on_event("startup")
    async def startup():
        await database.connect()
    
    @app.on_event("shutdown")
    async def shutdown():
        await database.disconnect()
    
    @app.get("/items/")
    async def read_items():
        query = "SELECT * FROM items"
        return await database.fetch_all(query)
    

    解释:

    • 在应用启动和关闭时,连接和断开数据库。
    • 使用异步的数据库查询,提高性能。
  • @app.on_event的作用

    • @app.on_event是FastAPI提供的一个装饰器,用于定义在特定事件发生时执行的函数。
    • FastAPI的声明周期时间包括:
      • startup:在应用启动时触发,仅执行一次。
      • shutdown:在应用关闭时触发,仅执行一次。
    • startup事件
      • 当FastAPI应用启动时会触发startup事件。
      • 可以在这个时间中执行一些初始化操作,例如:
        • 连接数据库。
        • 初始化缓存。
        • 加载配置文件。
        • 初始化全局变量或状态。
    • shutdown事件
      • 当FastAPI应用关闭时会触发shutdown事件。
      • 通常用于清理资源,例如:
        • 关闭数据库连接。
        • 清理缓存。
        • 停止后台任务。

示例操作:

from fastapi import FastAPI
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

app = FastAPI()

# 配置异步数据库引擎
DATABASE_URL = "sqlite+aiosqlite:///./test.db"
engine = create_async_engine(DATABASE_URL, echo=True)
SessionLocal = sessionmaker(bind=engine, class_=AsyncSession, expire_on_commit=False)

# 应用启动时连接数据库
@app.on_event("startup")
async def startup():
    print("Connecting to the database...")
    # 在这里可以进行一些额外的初始化逻辑,例如检查数据库状态等

# 应用关闭时断开数据库连接
@app.on_event("shutdown")
async def shutdown():
    print("Disconnecting from the database...")
    await engine.dispose()

解释:

  1. @app.on_event("startup")
    • 在应用启动时,打印 “Connecting to the database…”,并可添加其他逻辑(如测试数据库连接)。
  2. @app.on_event("shutdown")
    • 在应用关闭时,打印 “Disconnecting from the database…”,并调用 engine.dispose() 释放数据库资源。

5. 与数据库集成

5.1 使用 SQLAlchemy

SQLAlchemy 是 Python 中广泛使用的 ORM(对象关系映射)库,可以将数据库表映射为 Python 类。

5.1.1 安装依赖
pip install sqlalchemy
5.1.2 创建数据库引擎
from sqlalchemy import create_engine

DATABASE_URL = "sqlite:///./test.db"

engine = create_engine(
    DATABASE_URL, connect_args={"check_same_thread": False}
)

解释:

  • DATABASE_URL 指定了数据库的连接 URL,此处使用 SQLite 数据库,文件名为 test.db
  • connect_args={"check_same_thread": False} 是 SQLite 特有的参数,允许多个线程访问同一个数据库连接。
5.1.3 创建数据库会话
from sqlalchemy.orm import sessionmaker

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

解释:

  • SessionLocal 是一个本地的数据库会话类,每个请求都应创建一个新的会话实例。
5.1.4 定义模型
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Float

Base = declarative_base()

class Item(Base):
    __tablename__ = "items"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    description = Column(String, nullable=True)
    price = Column(Float)
    tax = Column(Float, nullable=True)

解释:

  • Base 是所有模型类的基类,使用 declarative_base() 创建。
  • Item 类定义了数据库中的 items 表,每个属性对应一个字段。
5.1.5 创建数据库表
Base.metadata.create_all(bind=engine)

解释:

  • 这行代码会在数据库中创建所有继承自 Base 的模型对应的表。
5.1.6 依赖注入数据库会话
from fastapi import Depends

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

解释:

  • get_db 是一个依赖项函数,返回一个数据库会话。
  • 使用 yielddb 对象提供给依赖它的函数,在请求结束后自动关闭会话。
5.1.7 在路径操作中使用数据库
from sqlalchemy.orm import Session
from fastapi import Depends

@app.post("/items/", response_model=schemas.Item)
async def create_item(item: schemas.ItemCreate, db: Session = Depends(get_db)):
    db_item = models.Item(name=item.name, description=item.description, price=item.price, tax=item.tax)
    db.add(db_item)
    db.commit()
    db.refresh(db_item)
    return db_item

解释:

  • db: Session = Depends(get_db) 声明了对数据库会话的依赖。
  • 在函数中使用 db 进行数据库操作,如添加、提交、刷新。

5.2 使用 SQLModel(推荐)

SQLModel 是一个基于 Pydantic 和 SQLAlchemy 的库,简化了模型定义,统一了数据模型和 ORM 模型。

5.2.1 安装 SQLModel
pip install sqlmodel
5.2.2 定义模型
from typing import Optional
from sqlmodel import SQLModel, Field

class Item(SQLModel, table=True):
    id: Optional[int] = Field(default=None, primary_key=True)
    name: str
    description: Optional[str] = None
    price: float
    tax: Optional[float] = None

解释:

  • 继承自 SQLModel,并设置 table=True,表示这是一个数据库表模型。
  • 使用类型提示定义字段,Field 用于设置额外的元数据,如 primary_key
5.2.3 创建数据库引擎
from sqlmodel import create_engine

engine = create_engine("sqlite:///database.db")
5.2.4 创建数据库表
SQLModel.metadata.create_all(engine)
5.2.5 使用数据库会话
from sqlmodel import Session

@app.post("/items/", response_model=Item)
def create_item(item: Item):
    with Session(engine) as session:
        session.add(item)
        session.commit()
        session.refresh(item)
        return item

解释:

  • 使用 with Session(engine) as session 创建一个会话。
  • 直接使用 item 对象进行数据库操作,无需手动转换。

5.3 数据库迁移

使用 Alembic 管理数据库迁移,跟踪数据库模式的变化。

5.3.1 安装 Alembic
pip install alembic
5.3.2 初始化 Alembic
alembic init alembic
5.3.3 配置 Alembic
  • 编辑 alembic.ini 文件,设置数据库连接字符串。

    sqlalchemy.url = sqlite:///./test.db
    
  • 编辑 alembic/env.py,引入模型的元数据。

    from models import Base
    target_metadata = Base.metadata
    
5.3.4 创建迁移脚本
alembic revision --autogenerate -m "Initial migration"
5.3.5 应用迁移
alembic upgrade head

6. 安全与认证

6.1 OAuth2 与 JWT

使用 OAuth2JWT(JSON Web Tokens)实现基于令牌的认证。

6.1.1 安装依赖
pip install python-multipart
pip install python-jose[cryptography]
pip install passlib[bcrypt]
  • python-multipart:处理表单数据。
  • python-jose:生成和验证 JWT。
  • passlib[bcrypt]:处理密码哈希。
6.1.2 配置安全依赖
from fastapi.security import OAuth2PasswordBearer

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

解释:

  • OAuth2PasswordBearer 指定了获取令牌的 URL(/token)。
  • oauth2_scheme 是一个依赖项,用于从请求中获取 Authorization 头中的 Bearer 令牌。
6.1.3 创建令牌
from datetime import datetime, timedelta
from jose import JWTError, jwt

SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def create_access_token(data: dict):
    to_encode = data.copy()
    expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

解释:

  • SECRET_KEY:用于加密和解密 JWT 的密钥,应设置为随机字符串并保密。
  • ALGORITHM:加密算法,通常使用 HS256
  • ACCESS_TOKEN_EXPIRE_MINUTES:令牌的有效期。
  • create_access_token:创建 JWT,包含传入的数据和过期时间。
6.1.4 验证令牌
from fastapi import Depends, HTTPException, status
from jose import JWTError, jwt

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        # 从数据库中获取用户对象
        user = get_user(username)
        if user is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    return user

解释:

  • 使用 jwt.decode 解码令牌,获取负载数据。
  • 验证令牌是否有效,是否包含 username
  • 根据 username 获取用户对象,若不存在则抛出异常。
6.1.5 创建登录接口
from fastapi.security import OAuth2PasswordRequestForm

@app.post("/token")
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    access_token = create_access_token(data={"sub": user.username})
    return {"access_token": access_token, "token_type": "bearer"}

解释:

  • 使用 OAuth2PasswordRequestForm 处理表单数据,包含 usernamepassword
  • 验证用户名和密码,成功后创建并返回令牌。

6.2 基于表单的认证

  • 处理表单数据:

    from fastapi import Form
    
    @app.post("/login")
    async def login(username: str = Form(...), password: str = Form(...)):
        # 验证用户名和密码
        return {"message": "Login successful"}
    

    解释:

    • 使用 Form(...) 指定参数从表单数据中获取。
    • 处理登录逻辑,返回响应。

7. 部署 FastAPI 应用

7.1 使用 Uvicorn

  • 运行应用:

    uvicorn main:app --host 0.0.0.0 --port 80
    

    解释:

    • --host 0.0.0.0:监听所有网络接口。
    • --port 80:使用 80 端口。

7.2 使用 Gunicorn 和 Uvicorn Workers

  • 安装 Gunicorn:

    pip install gunicorn
    
  • 运行应用:

    gunicorn -w 4 -k uvicorn.workers.UvicornWorker main:app
    

    解释:

    • -w 4:启动 4 个工作进程。
    • -k uvicorn.workers.UvicornWorker:使用 Uvicorn 的工作进程。

7.3 Docker 部署

7.3.1 创建 Dockerfile
FROM python:3.9-slim

WORKDIR /app

COPY requirements.txt .

RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "80"]

解释:

  • 基于官方的 Python 3.9 精简版镜像。
  • 复制项目文件,安装依赖。
  • 使用 Uvicorn 启动应用。
7.3.2 构建 Docker 镜像
docker build -t myfastapiapp .
7.3.3 运行 Docker 容器
docker run -d -p 80:80 myfastapiapp

简单版本

以下是一个简单的 FastAPI 演示代码,帮助你理解服务器和路由的基本概念:

from fastapi import FastAPI

# 创建 FastAPI 应用实例
app = FastAPI()

# 定义一个根路由,返回欢迎信息
@app.get("/")
async def read_root():
    return {"message": "欢迎来到我的 FastAPI 应用!"}

# 定义一个用户信息的路由,接收用户 ID
@app.get("/users/{user_id}")
async def read_user(user_id: int):
    return {"user_id": user_id, "name": "用户名称"}

# 定义一个创建用户的路由,使用 POST 方法
@app.post("/users/")
async def create_user(user: dict):
    return {"message": "用户已创建", "user": user}

# 运行应用
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

代码解析:

  1. 创建应用: app = FastAPI() 创建一个 FastAPI 应用实例。
  2. 路由定义:
    • @app.get("/"): 定义一个 GET 请求的路由,访问根 URL 返回欢迎信息。
    • @app.get("/users/{user_id}"): 定义一个动态路由,通过 URL 参数接收 user_id,返回该用户的 ID 和名称。
    • @app.post("/users/"): 定义一个 POST 请求的路由,接收用户信息并返回创建结果。
  3. 运行应用: 使用 uvicorn.run() 方法运行应用,监听所有地址的 8000 端口。

运行方式:

  1. 将代码保存为 main.py
  2. 在终端中运行 python main.py
  3. 在浏览器访问 http://localhost:8000/docs,查看自动生成的 API 文档和测试接口。

这个简单的示例展示了 FastAPI 如何定义路由并处理请求。希望这能帮助你更好地理解相关概念!

复杂版本

好的,下面是一个简化的 FastAPI 工程示例,结构和功能与您提供的代码类似,旨在帮助您理解 FastAPI 的路由和服务器相关的知识。

项目结构

my_fastapi_app/
│
├─ main.py                     # 主应用文件
├─ requirements.txt            # 依赖项文件
└─ routers/
    ├─ __init__.py             # 初始化模块
    ├─ user.py                  # 用户相关路由
    └─ item.py                  # 项目相关路由

1. main.py

from fastapi import FastAPI
from routers import user, item

app = FastAPI(title="My FastAPI App", version="1.0")

# 将路由挂载到应用上
app.include_router(user.router, prefix="/users", tags=["Users"])
app.include_router(item.router, prefix="/items", tags=["Items"])

@app.get("/", tags=["Root"])
async def read_root():
    return {"message": "Welcome to My FastAPI App"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

讲解:

  • 导入库: 引入 FastAPI 和路由模块 useritem
  • 创建应用: app = FastAPI(...) 创建 FastAPI 应用实例,设定标题和版本。
  • 挂载路由
    • app.include_router(...) 将用户和项目路由挂载到应用上,设置前缀和标签以便分类。
  • 根路由
    • @app.get("/") 定义根路由,访问时返回一个欢迎消息。
  • 运行应用: 使用 uvicorn.run(...) 启动应用,监听所有 IP 地址 (0.0.0.0) 和 8000 端口。

2. routers/user.py

from fastapi import APIRouter, Body
from typing import List

router = APIRouter()

fake_users_db = []

@router.get("/", response_model=List[dict], summary="Get all users")
async def get_users():
    return fake_users_db

@router.post("/", response_model=dict, summary="Create a new user")
async def create_user(user: dict = Body(...)):
    fake_users_db.append(user)
    return {"message": "User created", "user": user}

@router.get("/{user_id}", summary="Get a specific user")
async def read_user(user_id: int):
    if user_id < len(fake_users_db):
        return fake_users_db[user_id]
    return {"error": "User not found"}

讲解:

  • 路由器实例: router = APIRouter() 创建一个路由器实例。
  • 模拟数据库: fake_users_db 是一个列表,用于存储用户数据。
  • 获取所有用户
    • @router.get("/") 定义获取所有用户的 GET 请求。"/" 表示这个路由的路径是根路径。也就是说,访问 /users/(假设该路由挂载在 /users 下)时,会触发这个函数。
    • 响应模型: response_model=List[dict] 指定了该路由的响应体应该是一个字典列表。具体来说,返回的 JSON 数据格式应该是一个数组,其中每个元素都是一个字典。
    • 类型提示: 使用类型提示可以帮助 FastAPI 自动生成 API 文档,并在请求时提供数据验证。这里表示返回的数据结构是一个包含字典的列表,字典的键值对可以根据具体实现自行定义。
    • 摘要: summary 参数是一个可选的描述,用于提供该路由的简短说明。在自动生成的 API 文档(如 Swagger UI)中,这个摘要会显示在相应的路由下,帮助用户快速理解该路由的功能。
    • 返回 fake_users_db 列表。
  • 创建新用户
    • @router.post("/") 定义创建新用户的 POST 请求。
    • 使用 Body(...) 解析请求体中的用户数据,并添加到 fake_users_db
  • 获取特定用户
    • @router.get("/{user_id}") 通过用户 ID 获取用户。
    • 检查用户 ID 是否有效,若有效返回用户数据,否则返回错误消息。

3. routers/item.py

from fastapi import APIRouter, Body
from typing import List

router = APIRouter()

fake_items_db = []

@router.get("/", response_model=List[dict], summary="Get all items")
async def get_items():
    return fake_items_db

@router.post("/", response_model=dict, summary="Create a new item")
async def create_item(item: dict = Body(...)):
    fake_items_db.append(item)
    return {"message": "Item created", "item": item}

@router.get("/{item_id}", summary="Get a specific item")
async def read_item(item_id: int):
    if item_id < len(fake_items_db):
        return fake_items_db[item_id]
    return {"error": "Item not found"}

讲解:

  • 路由器实例: 同样使用 APIRouter() 创建路由器。
  • 模拟数据库: fake_items_db 用于存储项目数据。
  • 获取所有项目
    • @router.get("/") 定义获取所有项目的 GET 请求,返回 fake_items_db 列表。
  • 创建新项目
    • @router.post("/") 定义创建新项目的 POST 请求,使用 Body(...) 解析请求体。
  • 获取特定项目
    • @router.get("/{item_id}") 通过项目 ID 获取项目数据,检查有效性。

4. requirements.txt

fastapi
uvicorn

说明:

  1. 目录结构: 项目分为主要应用 (main.py) 和路由模块 (routers/),使得代码更易于维护。
  2. 路由:
    • user.py: 定义用户相关的 API,包括获取用户、创建用户和获取特定用户。
    • item.py: 定义项目相关的 API,功能与用户类似。
  3. 挂载路由: 在 main.py 中,使用 include_router 将用户和项目路由挂载到主应用上,定义了前缀 /users/items
  4. 运行: 使用 uvicorn 启动应用,访问 http://localhost:8000/docs 查看 API 文档。

如何运行:

  1. 创建一个名为 my_fastapi_app 的文件夹,并在其中创建上述文件和文件夹。

  2. 在终端中,导航到该文件夹,并运行以下命令安装依赖项:

    pip install -r requirements.txt
    
  3. 然后,运行应用:

    python main.py
    

希望这个示例能帮助您理解 FastAPI 的基本用法!如果有任何问题,请随时问我。

chatchat的api.py

import nltk
import sys
import os

sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from configs import VERSION
from configs.model_config import NLTK_DATA_PATH
from configs.server_config import OPEN_CROSS_DOMAIN
import argparse
import uvicorn
from fastapi import Body
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from server.chat.chat import chat
from server.chat.search_engine_chat import search_engine_chat
from server.chat.completion import completion
from server.chat.feedback import chat_feedback
from server.embeddings_api import embed_texts_endpoint
from server.llm_api import (list_running_models, list_config_models,
                            change_llm_model, stop_llm_model,
                            get_model_config, list_search_engines)
from server.utils import (BaseResponse, ListResponse, FastAPI, MakeFastAPIOffline,
                          get_server_configs, get_prompt_template)
from typing import List, Literal

nltk.data.path = [NLTK_DATA_PATH] + nltk.data.path


async def document():
    return RedirectResponse(url="/docs")


def create_app(run_mode: str = None):
    app = FastAPI(
        title="Langchain-Chatchat API Server",
        version=VERSION
    )
    MakeFastAPIOffline(app)
    # Add CORS middleware to allow all origins
    # 在config.py中设置OPEN_DOMAIN=True,允许跨域
    # set OPEN_DOMAIN=True in config.py to allow cross-domain
    if OPEN_CROSS_DOMAIN:
        app.add_middleware(
            CORSMiddleware,
            allow_origins=["*"],
            allow_credentials=True,
            allow_methods=["*"],
            allow_headers=["*"],
        )
    mount_app_routes(app, run_mode=run_mode)
    return app


def mount_app_routes(app: FastAPI, run_mode: str = None):
    app.get("/",
            response_model=BaseResponse,
            summary="swagger 文档")(document)

    # Tag: Chat
    app.post("/chat/chat",
             tags=["Chat"],
             summary="与llm模型对话(通过LLMChain)",
             )(chat)

    app.post("/chat/search_engine_chat",
             tags=["Chat"],
             summary="与搜索引擎对话",
             )(search_engine_chat)

    app.post("/chat/feedback",
             tags=["Chat"],
             summary="返回llm模型对话评分",
             )(chat_feedback)

    # 知识库相关接口
    mount_knowledge_routes(app)
    # 摘要相关接口
    mount_filename_summary_routes(app)

    # LLM模型相关接口
    app.post("/llm_model/list_running_models",
             tags=["LLM Model Management"],
             summary="列出当前已加载的模型",
             )(list_running_models)

    app.post("/llm_model/list_config_models",
             tags=["LLM Model Management"],
             summary="列出configs已配置的模型",
             )(list_config_models)

    app.post("/llm_model/get_model_config",
             tags=["LLM Model Management"],
             summary="获取模型配置(合并后)",
             )(get_model_config)

    app.post("/llm_model/stop",
             tags=["LLM Model Management"],
             summary="停止指定的LLM模型(Model Worker)",
             )(stop_llm_model)

    app.post("/llm_model/change",
             tags=["LLM Model Management"],
             summary="切换指定的LLM模型(Model Worker)",
             )(change_llm_model)

    # 服务器相关接口
    app.post("/server/configs",
             tags=["Server State"],
             summary="获取服务器原始配置信息",
             )(get_server_configs)

    app.post("/server/list_search_engines",
             tags=["Server State"],
             summary="获取服务器支持的搜索引擎",
             )(list_search_engines)

    @app.post("/server/get_prompt_template",
             tags=["Server State"],
             summary="获取服务区配置的 prompt 模板")
    def get_server_prompt_template(
        type: Literal["llm_chat", "knowledge_base_chat", "search_engine_chat", "agent_chat"]=Body("llm_chat", description="模板类型,可选值:llm_chat,knowledge_base_chat,search_engine_chat,agent_chat"),
        name: str = Body("default", description="模板名称"),
    ) -> str:
        return get_prompt_template(type=type, name=name)

    # 其它接口
    app.post("/other/completion",
             tags=["Other"],
             summary="要求llm模型补全(通过LLMChain)",
             )(completion)

    app.post("/other/embed_texts",
            tags=["Other"],
            summary="将文本向量化,支持本地模型和在线模型",
            )(embed_texts_endpoint)


def mount_knowledge_routes(app: FastAPI):
    from server.chat.knowledge_base_chat import knowledge_base_chat
    from server.chat.file_chat import upload_temp_docs, file_chat
    from server.chat.agent_chat import agent_chat
    from server.knowledge_base.kb_api import list_kbs, create_kb, delete_kb
    from server.knowledge_base.kb_doc_api import (list_files, upload_docs, delete_docs,
                                                update_docs, download_doc, recreate_vector_store,
                                                search_docs, DocumentWithVSId, update_info,
                                                update_docs_by_id,)

    app.post("/chat/knowledge_base_chat",
             tags=["Chat"],
             summary="与知识库对话")(knowledge_base_chat)

    app.post("/chat/file_chat",
             tags=["Knowledge Base Management"],
             summary="文件对话"
             )(file_chat)

    app.post("/chat/agent_chat",
             tags=["Chat"],
             summary="与agent对话")(agent_chat)

    # Tag: Knowledge Base Management
    app.get("/knowledge_base/list_knowledge_bases",
            tags=["Knowledge Base Management"],
            response_model=ListResponse,
            summary="获取知识库列表")(list_kbs)

    app.post("/knowledge_base/create_knowledge_base",
             tags=["Knowledge Base Management"],
             response_model=BaseResponse,
             summary="创建知识库"
             )(create_kb)

    app.post("/knowledge_base/delete_knowledge_base",
             tags=["Knowledge Base Management"],
             response_model=BaseResponse,
             summary="删除知识库"
             )(delete_kb)

    app.get("/knowledge_base/list_files",
            tags=["Knowledge Base Management"],
            response_model=ListResponse,
            summary="获取知识库内的文件列表"
            )(list_files)

    app.post("/knowledge_base/search_docs",
             tags=["Knowledge Base Management"],
             response_model=List[DocumentWithVSId],
             summary="搜索知识库"
             )(search_docs)

    app.post("/knowledge_base/update_docs_by_id",
             tags=["Knowledge Base Management"],
             response_model=BaseResponse,
             summary="直接更新知识库文档"
             )(update_docs_by_id)


    app.post("/knowledge_base/upload_docs",
             tags=["Knowledge Base Management"],
             response_model=BaseResponse,
             summary="上传文件到知识库,并/或进行向量化"
             )(upload_docs)

    app.post("/knowledge_base/delete_docs",
             tags=["Knowledge Base Management"],
             response_model=BaseResponse,
             summary="删除知识库内指定文件"
             )(delete_docs)

    app.post("/knowledge_base/update_info",
             tags=["Knowledge Base Management"],
             response_model=BaseResponse,
             summary="更新知识库介绍"
             )(update_info)
    app.post("/knowledge_base/update_docs",
             tags=["Knowledge Base Management"],
             response_model=BaseResponse,
             summary="更新现有文件到知识库"
             )(update_docs)

    app.get("/knowledge_base/download_doc",
            tags=["Knowledge Base Management"],
            summary="下载对应的知识文件")(download_doc)

    app.post("/knowledge_base/recreate_vector_store",
             tags=["Knowledge Base Management"],
             summary="根据content中文档重建向量库,流式输出处理进度。"
             )(recreate_vector_store)

    app.post("/knowledge_base/upload_temp_docs",
             tags=["Knowledge Base Management"],
             summary="上传文件到临时目录,用于文件对话。"
             )(upload_temp_docs)


def mount_filename_summary_routes(app: FastAPI):
    from server.knowledge_base.kb_summary_api import (summary_file_to_vector_store, recreate_summary_vector_store,
                                                      summary_doc_ids_to_vector_store)

    app.post("/knowledge_base/kb_summary_api/summary_file_to_vector_store",
             tags=["Knowledge kb_summary_api Management"],
             summary="单个知识库根据文件名称摘要"
             )(summary_file_to_vector_store)
    app.post("/knowledge_base/kb_summary_api/summary_doc_ids_to_vector_store",
             tags=["Knowledge kb_summary_api Management"],
             summary="单个知识库根据doc_ids摘要",
             response_model=BaseResponse,
             )(summary_doc_ids_to_vector_store)
    app.post("/knowledge_base/kb_summary_api/recreate_summary_vector_store",
             tags=["Knowledge kb_summary_api Management"],
             summary="重建单个知识库文件摘要"
             )(recreate_summary_vector_store)



def run_api(host, port, **kwargs):
    if kwargs.get("ssl_keyfile") and kwargs.get("ssl_certfile"):
        uvicorn.run(app,
                    host=host,
                    port=port,
                    ssl_keyfile=kwargs.get("ssl_keyfile"),
                    ssl_certfile=kwargs.get("ssl_certfile"),
                    )
    else:
        uvicorn.run(app, host=host, port=port)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog='langchain-ChatGLM',
                                     description='About langchain-ChatGLM, local knowledge based ChatGLM with langchain'
                                                 ' | 基于本地知识库的 ChatGLM 问答')
    parser.add_argument("--host", type=str, default="0.0.0.0")
    parser.add_argument("--port", type=int, default=7861)
    parser.add_argument("--ssl_keyfile", type=str)
    parser.add_argument("--ssl_certfile", type=str)
    # 初始化消息
    args = parser.parse_args()
    args_dict = vars(args)

    app = create_app()

    run_api(host=args.host,
            port=args.port,
            ssl_keyfile=args.ssl_keyfile,
            ssl_certfile=args.ssl_certfile,
            )

这段代码是一个 FastAPI 应用的完整实现,包含了聊天、知识库管理和模型管理等功能。以下是对代码各部分的详细讲解:

1. 导入模块

import nltk
import sys
import os
import argparse
import uvicorn
from fastapi import Body
from fastapi.middleware.cors import CORSMiddleware
from starlette.responses import RedirectResponse
from server.chat.chat import chat
from server.chat.search_engine_chat import search_engine_chat
# ... 省略其他导入
  • nltk: 用于自然语言处理。
  • sys和os: 用于系统路径管理。
  • argparse: 用于解析命令行参数。
  • uvicorn: 用于运行 FastAPI 应用的 ASGI 服务器。
  • FastAPI 和相关模块: 提供构建 web 应用的功能。
  • 从其他模块导入的函数: 这些函数实现了具体的业务逻辑。

2. 设置 NLTK 数据路径

nltk.data.path = [NLTK_DATA_PATH] + nltk.data.path
  • 将 NLTK 数据的路径添加到 NLTK 的数据搜索路径中,确保应用能找到所需的 NLP 数据。

3. 创建 FastAPI 应用

def create_app(run_mode: str = None):
    app = FastAPI(
        title="Langchain-Chatchat API Server",
        version=VERSION
    )
    MakeFastAPIOffline(app)
  • create_app: 创建并配置一个 FastAPI 应用实例。
  • 标题和版本: 设置应用的标题和版本信息。
  • MakeFastAPIOffline: 可能用于将 FastAPI 应用配置为离线模式(具体实现需查看该函数的定义)。

4. CORS 中间件

if OPEN_CROSS_DOMAIN:
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
  • CORS 中间件: 允许跨域请求。OPEN_CROSS_DOMAIN 变量决定是否启用跨域功能。
  • 设置: 允许所有来源、凭证、HTTP 方法和请求头。

5. 注册路由

mount_app_routes(app, run_mode=run_mode)
  • 调用 mount_app_routes 函数,将多个路由注册到 FastAPI 应用中。

6. mount_app_routes 函数

此函数负责将各种路由挂载到 FastAPI 应用上:

  • 根路由:

    app.get("/", response_model=BaseResponse, summary="swagger 文档")(document)
    
    • 指定根路径返回 Swagger 文档的重定向。
  • 聊天相关路由:

    app.post("/chat/chat", tags=["Chat"], summary="与llm模型对话(通过LLMChain)")(chat)
    
    • 注册与 LLM 模型对话的路由,使用 chat 函数处理请求。
  • 知识库和模型管理路由: 包含创建、更新、删除知识库的功能,以及列出和管理 LLM 模型的接口。

7. 知识库和摘要相关接口

def mount_knowledge_routes(app: FastAPI):
    from server.chat.knowledge_base_chat import knowledge_base_chat
    # 省略导入...
    app.post("/knowledge_base/upload_docs", tags=["Knowledge Base Management"], response_model=BaseResponse, summary="上传文件到知识库,并/或进行向量化")(upload_docs)
  • 该函数集中管理知识库相关的路由,包括文件上传、搜索和更新操作。

8. 启动 API

def run_api(host, port, **kwargs):
    if kwargs.get("ssl_keyfile") and kwargs.get("ssl_certfile"):
        uvicorn.run(app, host=host, port=port, ssl_keyfile=kwargs.get("ssl_keyfile"), ssl_certfile=kwargs.get("ssl_certfile"))
    else:
        uvicorn.run(app, host=host, port=port)
  • run_api: 启动 FastAPI 应用,可以选择使用 SSL。

9. 主程序入口

if __name__ == "__main__":
    parser = argparse.ArgumentParser(prog='langchain-ChatGLM', description='About langchain-ChatGLM, local knowledge based ChatGLM with langchain')
    # 省略命令行参数解析
    app = create_app()
    run_api(host=args.host, port=args.port, ssl_keyfile=args.ssl_keyfile, ssl_certfile=args.ssl_certfile)
  • 命令行参数: 允许用户通过命令行指定主机、端口及 SSL 证书。
  • 创建和启动应用: 调用 create_app 函数创建 FastAPI 应用,并启动 API 服务。

总结

这段代码展示了一个功能全面的 FastAPI 应用框架,集成了聊天功能、知识库管理和模型控制。通过模块化设计,清晰地划分了不同的业务逻辑,并提供了丰富的接口供前端或其他服务调用。代码也展示了如何处理跨域请求、路由注册和命令行参数解析等常见的 web 应用开发任务。

Uvicorn

当然可以!Uvicorn 是 Python 生态系统中一个高性能的 ASGI(Asynchronous Server Gateway Interface)服务器,广泛用于运行异步Web框架,如 FastAPI 和 Starlette。作为一个刚开始 Python 后端开发的人员,理解 Uvicorn 的概念、功能、使用方法以及如何优化它,对于构建高效、可扩展的后端服务至关重要。以下是关于 Uvicorn 的全面讲解。


1. 什么是 Uvicorn?

Uvicorn 是一个基于 uvloophttptools 的轻量级、快速的 ASGI 服务器。它专为运行异步应用程序而设计,能够充分利用 Python 的异步特性,实现高并发和低延迟的性能表现。

ASGI 简介

  • WSGI(Web Server Gateway Interface):传统的 Python Web 服务器接口,支持同步请求处理。
  • ASGI(Asynchronous Server Gateway Interface):现代的 Python Web 服务器接口,支持异步请求处理,能够处理长连接(如 WebSocket)和高并发。

Uvicorn 实现了 ASGI 规范,使其成为支持异步功能的现代 Web 框架(如 FastAPI)的理想选择。


2. Uvicorn 的主要特性

  • 高性能:基于 uvloop(一个快速的事件循环)和 httptools(一个快速的 HTTP 解析库),提供卓越的性能表现。
  • 异步支持:原生支持异步编程,适用于需要高并发和实时通信的应用。
  • 轻量级:简洁的设计和低资源消耗,适合微服务和小型应用。
  • 多协议支持:支持 HTTP/1.1、HTTP/2 和 WebSocket 协议。
  • 易于集成:与 FastAPI、Starlette 等现代 Python Web 框架无缝集成。
  • 热重载:开发模式下支持代码更改时自动重启,提升开发效率。
  • 灵活的配置:提供多种启动参数和配置选项,满足不同的部署需求。

3. 安装 Uvicorn

使用 pip 安装

pip install uvicorn

安装特定版本

如果需要安装特定版本,可以指定版本号:

pip install uvicorn==0.20.0

安装额外依赖

Uvicorn 支持不同的工作模式和功能,您可以根据需要安装额外的依赖:

  • 标准安装

    pip install uvicorn
    
  • 安装全功能版本(包含 HTTP/2 和 WebSocket 支持)

    pip install uvicorn[standard]
    

    这将安装 uvloophttptoolsgunicornpython-dotenv 等额外依赖,提升性能和功能。


4. 基本使用

启动一个简单的 ASGI 应用

假设您有一个简单的 ASGI 应用 app.py

# app.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def read_root():
    return {"message": "Hello, Uvicorn!"}

使用 Uvicorn 启动应用

在命令行中运行以下命令:

uvicorn app:app --reload
  • app:app:指的是 app.py 文件中的 app 对象。
  • --reload:启用热重载,开发阶段使用,代码更改时自动重启服务器。

访问应用

在浏览器中访问 http://127.0.0.1:8000,您将看到:

{
  "message": "Hello, Uvicorn!"
}

5. 主要命令行参数

Uvicorn 提供了丰富的命令行参数,用于控制服务器的行为。以下是一些常用参数:

参数描述默认值
appASGI 应用路径(如 module:app必须指定
--host / -h服务器绑定的主机地址127.0.0.1
--port / -p服务器绑定的端口号8000
--reload启用自动重载(开发模式)False
--workers / -w工作进程数量(建议在生产环境中使用 Gunicorn 管理多个 Uvicorn 进程)1
--loop选择事件循环实现(autoasynciouvloopauto
--http选择 HTTP 实现(autohttptoolsh11auto
--ws选择 WebSocket 实现(autowebsocketsnoneauto
--log-level设置日志级别(criticalerrorwarninginfodebuginfo
--access-log启用访问日志True
--proxy-headers启用代理头部支持(如 X-Forwarded-ForFalse
--root-path设置应用的根路径,用于部署在子路径下的应用/
--ssl-keyfileSSL 密钥文件路径,启用 HTTPS(如 --ssl-keyfile=key.pem --ssl-certfile=cert.pemNone
--ssl-certfileSSL 证书文件路径,启用 HTTPSNone
--timeout-keep-alive设置连接保持活动状态的超时时间(秒)5

示例:指定主机和端口

uvicorn app:app --host 0.0.0.0 --port 9000

这将启动服务器,监听所有可用IP地址的9000端口。

示例:启用 HTTPS

uvicorn app:app --ssl-keyfile=path/to/key.pem --ssl-certfile=path/to/cert.pem

这将在安全的 HTTPS 上运行服务器。


6. 集成 FastAPI 与 Uvicorn

FastAPI 是一个现代、快速(高性能)的 Web 框架,基于标准的 Python 类型提示,使用 Uvicorn 作为 ASGI 服务器可以充分发挥其性能优势。

创建一个 FastAPI 应用

# main.py
from fastapi import FastAPI

app = FastAPI()

@app.get("/")
async def read_root():
    return {"message": "Hello, FastAPI with Uvicorn!"}

启动应用

uvicorn main:app --reload

访问文档

FastAPI 自动生成交互式 API 文档,访问:

  • Swagger UI:http://127.0.0.1:8000/docs
  • ReDoc:http://127.0.0.1:8000/redoc

7. 生产环境中的部署

虽然 Uvicorn 可以直接在生产环境中使用,但结合 Gunicorn 管理多个工作进程,可以进一步提升性能和可靠性。

安装 Gunicorn 和 Uvicorn Workers

pip install gunicorn
pip install uvicorn

使用 Gunicorn 启动 Uvicorn Workers

gunicorn app.main:app -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000 --workers 4
  • -k uvicorn.workers.UvicornWorker:指定 Gunicorn 使用 Uvicorn 的工作进程。
  • --workers 4:启动4个工作进程,根据服务器的 CPU 核心数进行调整。

使用配置文件

为了更好地管理 Gunicorn 配置,可以使用配置文件。例如,创建 gunicorn_conf.py

# gunicorn_conf.py
bind = "0.0.0.0:8000"
workers = 4
worker_class = "uvicorn.workers.UvicornWorker"
loglevel = "info"

然后启动 Gunicorn:

gunicorn app.main:app -c gunicorn_conf.py

使用 Process Manager

在生产环境中,推荐使用 Process Manager(如 systemdsupervisordpm2)来管理 Gunicorn 进程,确保在服务器重启或应用崩溃时自动重启。

示例:使用 systemd 管理 Gunicorn
  1. 创建 systemd 服务文件

    创建 /etc/systemd/system/gunicorn.service

    [Unit]
    Description=Gunicorn instance to serve Auto Build API
    After=network.target
    
    [Service]
    User=your_username
    Group=www-data
    WorkingDirectory=/path/to/your/project
    ExecStart=/path/to/your/venv/bin/gunicorn app.main:app -c gunicorn_conf.py
    
    [Install]
    WantedBy=multi-user.target
    
  2. 启动并启用服务

    sudo systemctl start gunicorn
    sudo systemctl enable gunicorn
    
  3. 检查服务状态

    sudo systemctl status gunicorn
    

8. Uvicorn 的高级功能与优化

8.1 热重载

在开发阶段,使用 --reload 参数可以自动重启服务器,方便快速迭代开发。

uvicorn app.main:app --reload

注意:不要在生产环境中使用 --reload,因为它会增加服务器的开销。

8.2 SSL 支持

为了确保数据传输的安全性,可以启用 SSL,使用 --ssl-keyfile--ssl-certfile 参数。

uvicorn app.main:app --host 0.0.0.0 --port 443 --ssl-keyfile=path/to/key.pem --ssl-certfile=path/to/cert.pem

注意:在生产环境中,通常建议使用反向代理(如 NGINX)来处理 SSL,而不是直接在 Uvicorn 上启用 SSL。

8.3 环境变量与配置

使用环境变量和配置文件管理应用配置,提升安全性和灵活性。

示例:使用 .env 文件
  1. 安装 python-dotenv

    pip install python-dotenv
    
  2. 创建 .env 文件

    HOST=0.0.0.0
    PORT=8000
    LOG_LEVEL=info
    
  3. 加载环境变量

    main.py 中:

    import os
    from dotenv import load_dotenv
    from fastapi import FastAPI
    
    load_dotenv()  # 加载 .env 文件
    
    app = FastAPI()
    
    @app.get("/")
    async def read_root():
        return {"message": "Hello, Uvicorn with Env Variables!"}
    
  4. 使用环境变量启动 Uvicorn

    uvicorn app.main:app --host $HOST --port $PORT --log-level $LOG_LEVEL
    

8.4 事件循环与并发

Uvicorn 使用异步编程模型,通过事件循环实现高并发处理。确保在编写应用程序时充分利用异步特性,避免阻塞操作。

示例:异步数据库操作
from fastapi import FastAPI
from databases import Database

app = FastAPI()
database = Database("sqlite:///test.db")

@app.on_event("startup")
async def startup():
    await database.connect()

@app.on_event("shutdown")
async def shutdown():
    await database.disconnect()

@app.get("/users")
async def get_users():
    query = "SELECT * FROM users"
    results = await database.fetch_all(query)
    return results

8.5 中间件(Middleware)

使用中间件对请求和响应进行预处理和后处理,如日志记录、身份验证、跨域资源共享(CORS)等。

示例:添加 CORS 中间件
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

# 配置允许的来源
origins = [
    "http://localhost",
    "http://localhost:3000",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.get("/")
async def read_root():
    return {"message": "Hello, Uvicorn with CORS!"}

8.6 性能监控与优化

使用 Gunicorn 的预热

Gunicorn 支持预热工作进程,减少启动延迟,提高响应速度。

gunicorn app.main:app -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000 --workers 4 --preload
启用 HTTP/2

通过使用支持 HTTP/2 的服务器或代理(如 NGINX)来提升性能,尤其是在高延迟网络环境下。

负载均衡

在多台服务器之间分配请求,使用 NGINX、HAProxy 等工具实现负载均衡,提升应用的可扩展性和可靠性。


9. 常见问题与故障排除

9.1 连接无法建立

可能原因

  • Uvicorn 未启动或未正确绑定到指定的主机和端口。
  • 防火墙或安全组阻止了连接。
  • 服务器地址或端口错误。

解决方法

  • 确认 Uvicorn 是否正在运行,并监听正确的主机和端口。
  • 检查防火墙设置,确保允许指定端口的流量。
  • 使用 curl 或其他工具测试连接。

9.2 热重载不起作用

可能原因

  • 使用了不支持热重载的部署方式。
  • 代码未保存,导致监控不到文件更改。

解决方法

  • 确保在开发环境中使用 --reload 参数启动 Uvicorn。
  • 确保代码文件已保存,并且文件系统支持监控(部分虚拟文件系统可能不支持)。

9.3 性能低下

可能原因

  • 应用中存在阻塞操作,未充分利用异步特性。
  • 工作进程数量不足,无法处理高并发请求。
  • 资源(CPU、内存)不足。

解决方法

  • 确保所有 I/O 操作(如数据库、网络请求)使用异步库。
  • 根据服务器的 CPU 核心数调整工作进程数量。
  • 监控服务器资源使用情况,必要时升级硬件或优化代码。

9.4 SSL 证书错误

可能原因

  • SSL 密钥或证书文件路径错误。
  • 证书格式不正确或损坏。
  • 使用自签名证书时,客户端未信任证书。

解决方法

  • 确认密钥和证书文件路径正确。
  • 检查证书文件的格式和完整性。
  • 在开发阶段,可以使用自签名证书并在客户端信任。

10. 最佳实践与建议

10.1 使用虚拟环境

始终在虚拟环境中开发和运行项目,避免依赖冲突并确保项目可移植性。

python3 -m venv venv
source venv/bin/activate  # Unix/macOS
# 或
venv\Scripts\activate     # Windows

10.2 管理依赖

使用 requirements.txtPipfile 管理项目依赖,确保环境一致性。

# 生成 requirements.txt
pip freeze > requirements.txt

# 安装依赖
pip install -r requirements.txt

10.3 日志记录

配置适当的日志记录,便于监控和调试。

import logging
from fastapi import FastAPI

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()

@app.get("/")
async def read_root():
    logger.info("Root endpoint called")
    return {"message": "Hello, Uvicorn with Logging!"}

10.4 安全性

  • 使用 HTTPS:确保数据传输的安全性。
  • 认证与授权:保护 API 端点,确保只有授权用户可以访问。
  • 输入验证:使用 Pydantic 模型验证输入数据,防止注入攻击等。

10.5 监控与报警

集成监控工具,如 PrometheusGrafana,实时监控应用性能和健康状态,并设置报警机制。

10.6 编写测试

编写单元测试和集成测试,确保应用功能的正确性和稳定性。使用 pytest 等测试框架进行测试。

# test_app.py
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_read_root():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Hello, Uvicorn!"}

运行测试:

pytest

10.7 文档与代码质量

  • 自动生成文档:利用 FastAPI 的自动文档生成功能,提供详细的 API 文档。
  • 代码格式化:使用工具如 Blackisort 统一代码格式。
  • 代码静态检查:使用 flake8mypy 等工具检查代码质量和类型错误。

10.8 持续集成与部署(CI/CD)

配置 CI/CD 流水线(如 GitHub Actions、GitLab CI),实现代码的自动测试、构建和部署,提高开发效率和代码质量。

示例:GitHub Actions 工作流

创建 .github/workflows/ci.yml

name: CI

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  build:
    runs-on: ubuntu-latest

    steps:
    - uses: actions/checkout@v2
    - name: Set up Python
      uses: actions/setup-python@v2
      with:
        python-version: '3.9'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Lint with flake8
      run: |
        pip install flake8
        flake8 .
    - name: Test with pytest
      run: |
        pip install pytest
        pytest

11. 示例项目:综合应用 Uvicorn 与 FastAPI

为了更好地理解 Uvicorn 的应用,以下是一个综合示例项目,涵盖基本的 API 创建、WebSocket 通信、日志记录和部署。

项目结构

auto_build_project/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── http.py
│   │   └── websocket.py
│   ├── modules/
│   │   ├── __init__.py
│   │   ├── config_checker.py
│   │   ├── info_collector.py
│   │   ├── workflow_generator.py
│   │   ├── config_generator.py
│   │   ├── workflow_adjuster.py
│   │   └── workflow_debugger.py
│   └── schemas/
│       ├── __init__.py
│       ├── initiate.py
│       ├── workflow.py
│       ├── compile.py
│       ├── adjustment.py
│       └── debug.py
├── requirements.txt
├── README.md
└── .gitignore

1. app/main.py

from fastapi import FastAPI
from app.routers import http, websocket

app = FastAPI()

app.include_router(http.router)
app.include_router(websocket.router)

@app.get("/")
def read_root():
    return {"message": "Auto Build API is running."}

2. app/routers/websocket.py

实现 WebSocket 端点,包括 /auto_build/initiate/auto_build/workflow/adjustment/auto_build/workflow/debug

from fastapi import APIRouter, WebSocket, WebSocketDisconnect
from app.schemas.initiate import InitiateResponse
from app.schemas.adjustment import AdjustmentResponse
from app.schemas.debug import DebugStepResult
from app.modules.config_checker import check_configurations
from app.modules.info_collector import collect_information
from app.modules.workflow_adjuster import adjust_workflow
from app.modules.workflow_debugger import debug_workflow
import json

router = APIRouter()

@router.websocket("/auto_build/initiate")
async def initiate_build(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            request = json.loads(data)

            user_id = request.get("user_id")
            autobuild_id = request.get("autobuild_id")
            user_input = request.get("user_input")

            print(f"Received build request from user_id={user_id}, autobuild_id={autobuild_id}")

            # 前置配置检查
            missing_configs = check_configurations()
            if missing_configs:
                response = InitiateResponse(
                    status="failure",
                    missing_configurations=missing_configs,
                    message="配置检查未通过。"
                )
                await websocket.send_text(json.dumps(response.dict()))
                continue  # 继续等待下一个请求

            # 前置配置通过,进行信息收集
            collected_info = collect_information(user_input)
            response = {
                "status": "success",
                "collected_info": collected_info,
                "message": "信息收集完毕,构建准备状态。"
            }
            await websocket.send_text(json.dumps(response))
    except WebSocketDisconnect:
        print(f"WebSocket连接断开: {websocket.client}")
    except Exception as e:
        print(f"发生错误: {e}")
        await websocket.close(code=1003)

@router.websocket("/auto_build/workflow/adjustment")
async def adjust_workflow_ws(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            request = json.loads(data)

            user_id = request.get("user_id")
            autobuild_id = request.get("autobuild_id")
            workflow = request.get("workflow")
            user_commands = request.get("user_commands", [])  # 用户调整指令

            # 调整工作流
            adjusted_workflow = adjust_workflow(workflow, user_commands)

            response = {
                "status": "success",
                "workflow": adjusted_workflow,
                "message": "工作流调整成功。"
            }
            await websocket.send_text(json.dumps(response))
    except WebSocketDisconnect:
        print(f"WebSocket连接断开: {websocket.client}")
    except Exception as e:
        print(f"发生错误: {e}")
        await websocket.close(code=1003)

@router.websocket("/auto_build/workflow/debug")
async def debug_workflow_ws(websocket: WebSocket):
    await websocket.accept()
    try:
        data = await websocket.receive_text()
        request = json.loads(data)

        user_id = request.get("user_id")
        autobuild_id = request.get("autobuild_id")
        workflow = request.get("workflow")

        # 调试工作流
        async for step_result in debug_workflow(workflow):
            await websocket.send_text(json.dumps(step_result))

        await websocket.send_text(json.dumps({"status": "completed", "message": "工作流调试完成。"}))
    except WebSocketDisconnect:
        print(f"WebSocket连接断开: {websocket.client}")
    except Exception as e:
        print(f"发生错误: {e}")
        await websocket.close(code=1003)

3. app/routers/http.py

实现 HTTP 端点,包括 /auto_build/workflow/auto_build/workflow/compile

from fastapi import APIRouter, HTTPException
from app.schemas.workflow import WorkflowRequest, WorkflowResponse
from app.schemas.compile import CompileRequest, CompileResponse
from app.modules.workflow_generator import generate_workflow
from app.modules.config_generator import generate_config

router = APIRouter()

@router.post("/auto_build/workflow", response_model=WorkflowResponse)
def create_workflow(request: WorkflowRequest):
    try:
        workflow = generate_workflow(request.collected_info)
        return WorkflowResponse(
            status="success",
            workflow=workflow,
            message="工作流生成成功。"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@router.post("/auto_build/workflow/compile", response_model=CompileResponse)
def compile_workflow(request: CompileRequest):
    try:
        compiled_workflow = generate_config(request.workflow)
        return CompileResponse(
            status="success",
            workflow=compiled_workflow,
            message="工作流配置补全成功。"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

4. 其他模块实现

根据需求,实现各个模块的功能。

4.1 app/modules/config_checker.py
def check_configurations():
    missing = []

    # 检查大模型配置是否存在
    model_config_exists = False  # 实际检查逻辑
    if not model_config_exists:
        missing.append("大模型配置不存在")

    # 检查必要的资源和服务是否可用
    resources_available = False  # 实际检查逻辑
    if not resources_available:
        missing.append("必要的资源和服务不可用")

    return missing
4.2 app/modules/info_collector.py
def collect_information(user_input):
    # 根据 user_input 收集必要的信息
    app_name = user_input.get("app_name")
    app_function = user_input.get("app_function")

    collected_info = {
        "app_name": app_name,
        "app_function": app_function
    }
    return collected_info
4.3 app/modules/workflow_generator.py
def match_template(collected_info):
    # 假设有预定义的模板
    templates = {
        "Data Processing": {"steps": ["fetch_data", "process_data", "store_results"]},
        "Machine Learning": {"steps": ["data_preprocessing", "train_model", "evaluate_model"]}
    }
    app_function = collected_info.get("app_function")
    if app_function in templates:
        return templates[app_function]
    return None

def generate_workflow(collected_info):
    template = match_template(collected_info)
    if template:
        # 使用模板生成工作流
        workflow = {
            "autobuild_id": collected_info.get("autobuild_id"),
            "steps": template["steps"]
        }
    else:
        # 模板匹配失败,生成自定义工作流
        workflow = {
            "autobuild_id": collected_info.get("autobuild_id"),
            "steps": ["custom_step1", "custom_step2"]
        }
    return workflow
4.4 app/modules/config_generator.py
def generate_config(workflow):
    # 检查每个步骤是否有配置信息,若无则补全
    for step in workflow.get("steps", []):
        if isinstance(step, dict) and "config" not in step:
            step["config"] = {"default_param": "value"}  # 补全配置
    return workflow
4.5 app/modules/workflow_adjuster.py
def adjust_workflow(workflow, user_commands):
    # 根据 user_commands 调整工作流
    for command in user_commands:
        action = command.get("action")
        step = command.get("step")
        if action == "add":
            workflow["steps"].append(step)
        elif action == "delete":
            workflow["steps"] = [s for s in workflow["steps"] if s["name"] != step["name"]]
        elif action == "modify":
            for s in workflow["steps"]:
                if s["name"] == step["name"]:
                    s.update(step.get("changes", {}))
    return workflow
4.6 app/modules/workflow_debugger.py
import asyncio

def execute_step(step):
    # 模拟步骤执行,实际应包含具体逻辑
    if step["name"] == "fetch_data":
        return {"step": "fetch_data", "status": "success", "message": "数据获取成功。"}
    elif step["name"] == "process_data":
        return {"step": "process_data", "status": "success", "message": "数据处理成功。"}
    elif step["name"] == "validate_data":
        return {"step": "validate_data", "status": "failure", "message": "数据验证失败。"}
    else:
        return {"step": step["name"], "status": "success", "message": "步骤执行成功。"}

async def debug_workflow(workflow):
    for step in workflow.get("steps", []):
        # 模拟执行时间
        await asyncio.sleep(1)
        result = execute_step(step)
        yield result

5. app/schemas/

使用 Pydantic 定义请求和响应的模型,确保数据验证和类型检查。

5.1 app/schemas/initiate.py
from pydantic import BaseModel
from typing import List

class InitiateRequest(BaseModel):
    user_id: str
    autobuild_id: str
    user_input: dict

class InitiateResponse(BaseModel):
    status: str
    missing_configurations: List[str] = []
    message: str = ""
5.2 app/schemas/workflow.py
from pydantic import BaseModel

class WorkflowRequest(BaseModel):
    user_id: str
    autobuild_id: str
    collected_info: dict

class WorkflowResponse(BaseModel):
    status: str
    workflow: dict = {}
    message: str = ""
5.3 app/schemas/compile.py
from pydantic import BaseModel

class CompileRequest(BaseModel):
    user_id: str
    autobuild_id: str
    workflow: dict

class CompileResponse(BaseModel):
    status: str
    workflow: dict = {}
    message: str = ""
5.4 app/schemas/adjustment.py
from pydantic import BaseModel

class AdjustmentRequest(BaseModel):
    user_id: str
    autobuild_id: str
    workflow: dict
    user_commands: list = []

class AdjustmentResponse(BaseModel):
    status: str
    workflow: dict = {}
    message: str = ""
5.5 app/schemas/debug.py
from pydantic import BaseModel

class DebugRequest(BaseModel):
    user_id: str
    autobuild_id: str
    workflow: dict

class DebugStepResult(BaseModel):
    step: str
    status: str
    message: str = ""

6. 运行和测试项目

6.1 启动服务器

在项目根目录下运行以下命令:

uvicorn app.main:app --reload
6.2 测试 HTTP 端点

使用 PostmancURL 发送 POST 请求到 /auto_build/workflow/auto_build/workflow/compile

示例:生成工作流

curl -X POST "http://127.0.0.1:8000/auto_build/workflow" \
-H "Content-Type: application/json" \
-d '{
    "user_id": "user123",
    "autobuild_id": "build456",
    "collected_info": {
        "app_name": "MyApp",
        "app_function": "Data Processing"
    }
}'

预期响应

{
    "status": "success",
    "workflow": {
        "autobuild_id": "build456",
        "steps": ["fetch_data", "process_data", "store_results"]
    },
    "message": "工作流生成成功。"
}

示例:补全配置

curl -X POST "http://127.0.0.1:8000/auto_build/workflow/compile" \
-H "Content-Type: application/json" \
-d '{
    "user_id": "user123",
    "autobuild_id": "build456",
    "workflow": {
        "autobuild_id": "build456",
        "steps": [
            {"name": "fetch_data"},
            {"name": "process_data"},
            {"name": "store_results"}
        ]
    }
}'

预期响应

{
    "status": "success",
    "workflow": {
        "autobuild_id": "build456",
        "steps": [
            {"name": "fetch_data", "config": {"default_param": "value"}},
            {"name": "process_data", "config": {"default_param": "value"}},
            {"name": "store_results", "config": {"default_param": "value"}}
        ]
    },
    "message": "工作流配置补全成功。"
}
6.3 测试 WebSocket 端点

使用浏览器控制台或 WebSocket 客户端工具发送构建请求和调整请求。

示例:使用浏览器控制台

  1. 打开浏览器,访问 http://127.0.0.1:8000
  2. 打开开发者工具(通常按 F12),切换到“Console”标签。
  3. 输入以下 JavaScript 代码
// 创建 WebSocket 连接到 initiate 端点
const ws_initiate = new WebSocket("ws://127.0.0.1:8000/auto_build/initiate");

ws_initiate.onopen = () => {
    console.log("WebSocket (initiate) 连接已打开");
    const request = {
        user_id: "user123",
        autobuild_id: "build456",
        user_input: {
            app_name: "MyApp",
            app_function: "Data Processing"
        }
    };
    ws_initiate.send(JSON.stringify(request));
};

ws_initiate.onmessage = (event) => {
    const response = JSON.parse(event.data);
    console.log("收到 initiate 响应:", response);
};

ws_initiate.onerror = (error) => {
    console.error("WebSocket (initiate) 错误:", error);
};

ws_initiate.onclose = () => {
    console.log("WebSocket (initiate) 连接已关闭");
};

// 创建 WebSocket 连接到 debug 端点
const ws_debug = new WebSocket("ws://127.0.0.1:8000/auto_build/workflow/debug");

ws_debug.onopen = () => {
    console.log("WebSocket (debug) 连接已打开");
    const request = {
        user_id: "user123",
        autobuild_id: "build456",
        workflow: {
            "autobuild_id": "build456",
            "steps": [
                {"name": "fetch_data"},
                {"name": "process_data"},
                {"name": "validate_data"}
            ]
        }
    };
    ws_debug.send(JSON.stringify(request));
};

ws_debug.onmessage = (event) => {
    const response = JSON.parse(event.data);
    console.log("收到 debug 响应:", response);
};

ws_debug.onerror = (error) => {
    console.error("WebSocket (debug) 错误:", error);
};

ws_debug.onclose = () => {
    console.log("WebSocket (debug) 连接已关闭");
};

预期输出

  • Initiate 端点

    • 如果

      config_checker
      

      返回缺少配置:

      {
          "status": "failure",
          "missing_configurations": ["大模型配置不存在", "必要的资源和服务不可用"],
          "message": "配置检查未通过。"
      }
      
    • 如果配置检查通过:

      {
          "status": "success",
          "collected_info": {
              "app_name": "MyApp",
              "app_function": "Data Processing"
          },
          "message": "信息收集完毕,构建准备状态。"
      }
      
  • Debug 端点

    • 分步骤返回调试结果:

      {
          "step": "fetch_data",
          "status": "success",
          "message": "数据获取成功。"
      }
      {
          "step": "process_data",
          "status": "success",
          "message": "数据处理成功。"
      }
      {
          "step": "validate_data",
          "status": "failure",
          "message": "数据验证失败。"
      }
      {
          "status": "completed",
          "message": "工作流调试完成。"
      }
      

12. 性能优化与调优

12.1 使用 uvloop 提升性能

uvloop 是一个基于 libuv 的高性能事件循环,能够显著提升异步应用的性能。Uvicorn 默认在支持的平台上使用 uvloop

安装 uvloop
pip install uvloop
确认使用 uvloop

在 Uvicorn 启动日志中,您可以看到事件循环的实现。例如:

INFO:     Started server process [12345]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)

12.2 使用 httptools 提升 HTTP 解析速度

httptools 是一个快速的 HTTP 解析库,Uvicorn 使用它来处理 HTTP 请求。它提供了比纯 Python 实现更高的性能。

安装 httptools
pip install httptools
确认使用 httptools

默认情况下,Uvicorn 会自动检测并使用 httptools,除非通过 --http 参数指定其他实现。

12.3 合理配置工作进程和线程

根据服务器的硬件资源和应用的需求,合理配置 Uvicorn 的工作进程和线程数量。

  • 工作进程数量:通常与服务器的 CPU 核心数相匹配。例如,4 核 CPU 建议使用 4 个工作进程。
  • 线程数量:默认使用单线程工作进程,适合 I/O 密集型应用。对于计算密集型任务,可以增加线程数量,但需要谨慎。

12.4 使用缓存

使用缓存技术(如 Redis、Memcached)减少重复计算和数据库访问,提升应用性能。

12.5 数据库优化

  • 使用异步数据库驱动:如 DatabasesSQLModelTortoise-ORM 等,充分利用异步特性。
  • 优化查询:确保数据库查询高效,使用索引和优化查询语句。

12.6 静态文件处理

对于静态文件(如图片、CSS、JavaScript),推荐使用专门的静态文件服务器(如 NGINX)处理,减轻 Uvicorn 的负担。


13. 安全性考虑

13.1 使用 HTTPS

在生产环境中,确保所有通信使用 HTTPS,保护数据的机密性和完整性。

推荐做法:使用反向代理服务器(如 NGINX、HAProxy)处理 SSL,转发请求到 Uvicorn。

13.2 认证与授权

保护 API 端点,确保只有授权用户可以访问。常见的认证方式包括:

  • JWT(JSON Web Tokens)
  • OAuth2
  • API Key
示例:使用 FastAPI 的 OAuth2
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def fake_decode_token(token):
    return {"sub": "user123"}

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="无效的认证凭证",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

@app.get("/protected")
async def protected_route(current_user: dict = Depends(get_current_user)):
    return {"message": f"Hello, {current_user['sub']}!"}

13.3 输入验证

使用 Pydantic 模型验证所有输入数据,防止恶意数据和注入攻击。

from pydantic import BaseModel, Field

class UserInput(BaseModel):
    app_name: str = Field(..., min_length=1)
    app_function: str = Field(..., min_length=1)

@app.post("/submit")
async def submit(input: UserInput):
    return {"message": f"Received {input.app_name} with function {input.app_function}"}

13.4 防止跨站请求伪造(CSRF)

对于需要保护的端点,实施 CSRF 保护机制,尤其是涉及状态更改的操作。


14. 实践中的建议

14.1 持续学习与实践

  • 官方文档:深入阅读 Uvicorn 官方文档FastAPI 官方文档
  • 开源项目:参与开源项目,学习实际应用中的 Uvicorn 和 FastAPI 使用方式。
  • 社区支持:加入相关社区和论坛,如 Stack Overflow、Reddit 的 r/FastAPI 等,获取帮助和交流经验。

14.2 编写清晰的代码与文档

  • 代码风格:遵循 PEP 8 代码规范,保持代码整洁和可读。
  • 注释与文档:为复杂的逻辑添加注释,使用 docstring 记录函数和类的用途。
  • 自动化文档:利用 FastAPI 的自动文档生成功能,保持 API 文档的最新和准确。

14.3 自动化测试

编写单元测试和集成测试,确保应用功能的正确性和稳定性。使用工具如 pytestTestClient 进行测试。

# test_main.py
from fastapi.testclient import TestClient
from app.main import app

client = TestClient(app)

def test_read_root():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Auto Build API is running."}

14.4 性能监控与优化

集成性能监控工具,如 PrometheusGrafana,实时监控应用的性能指标,并根据监控数据进行优化。

14.5 持续集成与部署(CI/CD)

配置 CI/CD 流水线,实现代码的自动化测试、构建和部署,提高开发效率和代码质量。


15. 总结

Uvicorn 是一个强大且高性能的 ASGI 服务器,适用于现代异步 Python Web 框架(如 FastAPI)的部署。通过理解其工作原理、配置选项和最佳实践,您可以构建高效、可扩展且安全的后端服务。以下是关键点的回顾:

  1. Uvicorn 介绍
    • 高性能的 ASGI 服务器,基于 uvloop 和 httptools。
    • 支持异步编程和多协议通信。
  2. 安装与基本使用
    • 使用 pip 安装。
    • 快速启动 ASGI 应用,并通过命令行参数进行配置。
  3. 集成 FastAPI
    • 无缝与 FastAPI 集成,充分利用其性能和功能优势。
    • 自动生成 API 文档,简化开发流程。
  4. 生产环境部署
    • 结合 Gunicorn 管理多个 Uvicorn 工作进程,提升性能和可靠性。
    • 使用反向代理服务器(如 NGINX)处理 SSL 和负载均衡。
  5. 高级功能与优化
    • 使用异步编程和优化事件循环,提升性能。
    • 配置日志记录、监控和安全机制,确保应用的稳定性和安全性。
  6. 最佳实践
    • 采用虚拟环境和依赖管理,保持项目的可移植性。
    • 编写清晰、可维护的代码,进行充分的测试。
    • 持续学习和实践,跟进行业最佳实践和新技术。

通过系统学习和实践这些内容,您将能够充分利用 Uvicorn 的强大功能,构建高效、可靠的 Python 后端服务。如果在开发过程中遇到任何具体问题或需要进一步的示例,欢迎随时提问!


原文地址:https://blog.csdn.net/xnuscd/article/details/144015848

免责声明:本站文章内容转载自网络资源,如本站内容侵犯了原著者的合法权益,可联系本站删除。更多内容请关注自学内容网(zxcms.com)!