全栈开发

FastAPI官方文档学习05

1、SQL(关系型)数据库

安装依赖

pip install sqlmodel
from typing import Annotated

from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select


class Hero(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    name: str = Field(index=True)
    age: int | None = Field(default=None, index=True)
    secret_name: str


sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)


def create_db_and_tables():
    SQLModel.metadata.create_all(engine)


def get_session():
    with Session(engine) as session:
        yield session


SessionDep = Annotated[Session, Depends(get_session)]

app = FastAPI()


@app.on_event("startup")
def on_startup():
    create_db_and_tables()


@app.post("/heroes/")
def create_hero(hero: Hero, session: SessionDep) -> Hero:
    session.add(hero)
    session.commit()
    session.refresh(hero)
    return hero


@app.get("/heroes/")
def read_heroes(
    session: SessionDep,
    offset: int = 0,
    limit: Annotated[int, Query(le=100)] = 100,
) -> list[Hero]:
    heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
    return heroes


@app.get("/heroes/{hero_id}")
def read_hero(hero_id: int, session: SessionDep) -> Hero:
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    return hero


@app.delete("/heroes/{hero_id}")
def delete_hero(hero_id: int, session: SessionDep):
    hero = session.get(Hero, hero_id)
    if not hero:
        raise HTTPException(status_code=404, detail="Hero not found")
    session.delete(hero)
    session.commit()
    return {"ok": True}

class Hero(SQLModel, table=True) 类定义的理解

元类(Metaclass)

  • 元类是用来创建类的类
  • SQLModel 本身有一个元类(叫做 SQLModelMetaclass
  • 当你继承 SQLModel 时,你的类 Hero 也会使用同一个元类
# SQLModel 的定义(简化版)
class SQLModel(metaclass=SQLModelMetaclass):
    pass

# 你的 Hero 类
class Hero(SQLModel, table=True):
    pass

这相当于:

class Hero(metaclass=SQLModelMetaclass, table=True):
    # 继承了 SQLModel 的所有属性和方法
    pass

关键结论

  1. SQLModel 是基类,不是元类
  2. Hero 是普通类,不是元类
  3. HeroSQLModel 共享同一个元类 SQLModelMetaclass
  4. table=True 被传递给元类 SQLModelMetaclass 进行处理

所以 Hero 仍然是一个可以实例化的普通类,只是它的创建过程被特殊的元类控制了。

Hero 类与 Pydantic 模型非常相似(实际上,从底层来看,它确实就是一个 Pydantic 模型)。

有一些区别:

  • table=True 会告诉 SQLModel 这是一个表模型,它应该表示 SQL 数据库中的一个,而不仅仅是一个数据模型(就像其他常规的 Pydantic 类一样)。
  • Field(primary_key=True) 会告诉 SQLModel id 是 SQL 数据库中的主键(您可以在 SQLModel 文档中了解更多关于 SQL 主键的信息)。把类型设置为 int | None ,SQLModel 就能知道该列在 SQL 数据库中应该是 INTEGER 类型,并且应该是 NULLABLE 。
  • Field(index=True) 会告诉 SQLModel 应该为此列创建一个 SQL 索引,这样在读取按此列过滤的数据时,程序能在数据库中进行更快的查找。SQLModel 会知道声明为 str 的内容将是类型为 TEXT (或 VARCHAR ,具体取决于数据库)的 SQL 列。
connect_args = {"check_same_thread": False}

使用 check_same_thread=False 可以让 FastAPI 在不同线程中使用同一个 SQLite 数据库。这很有必要,因为单个请求可能会使用多个线程(例如在依赖项中)。

不用担心,我们会按照代码结构确保每个请求使用一个单独的 SQLModel 会话,这实际上就是 check_same_thread 想要实现的。

def create_db_and_tables():
    SQLModel.metadata.create_all(engine)
  • 使用 SQLModel.metadata.create_all(engine) 为所有表模型创建表

创建会话(Session)依赖项

Session 会存储内存中的对象并跟踪数据中所需更改的内容,然后它使用 engine 与数据库进行通信。

我们会使用 yield 创建一个 FastAPI 依赖项,为每个请求提供一个新的 Session 。这确保我们每个请求使用一个单独的会话。🤓

然后我们创建一个 Annotated 的依赖项 SessionDep 来简化其他也会用到此依赖的代码。

def get_session():
    with Session(engine) as session:
        yield session


SessionDep = Annotated[Session, Depends(get_session)]

2、更大的应用 - 多个文件

一个文件结构示例¶
假设你的文件结构如下:

.
├── app                  # 「app」是一个 Python 包
│   ├── __init__.py      # 这个文件使「app」成为一个 Python 包
│   ├── main.py          # 「main」模块,例如 import app.main
│   ├── dependencies.py  # 「dependencies」模块,例如 import app.dependencies
│   └── routers          # 「routers」是一个「Python 子包」
│   │   ├── __init__.py  # 使「routers」成为一个「Python 子包」
│   │   ├── items.py     # 「items」子模块,例如 import app.routers.items
│   │   └── users.py     # 「users」子模块,例如 import app.routers.users
│   └── internal         # 「internal」是一个「Python 子包」
│       ├── __init__.py  # 使「internal」成为一个「Python 子包」
│       └── admin.py     # 「admin」子模块,例如 import app.internal.admin

APIRouter

假设专门用于处理用户逻辑的文件是位于 /app/routers/users.py 的子模块。

你希望将与用户相关的路径操作与其他代码分开,以使其井井有条。

from fastapi import APIRouter

router = APIRouter()


@router.get("/users/", tags=["users"])
async def read_users():
    return [{"username": "Rick"}, {"username": "Morty"}]
from fastapi import APIRouter, Depends, HTTPException
from ..dependencies import get_query_token

router = APIRouter(
    prefix="/items",
    tags=["items"],
    dependencies=[Depends(get_query_token)],
    responses={404: {"description": "Not found"}}
)

fake_items_db = {"plumbus": {"name": "Plumbus"}, "gun": {"name": "Portal Gun"}}

@router.get("/")
async def read_items():
    return fake_items_db

@router.get("/{item_id}")
async def read_item(item_id: str):
    if item_id not in fake_items_db:
        raise HTTPException(status_code=404, detail="Item not found")
    return {"name": fake_items_db[item_id]["name"], "item_id": item_id}

@router.put(
    "/{item_id}",
    tags=["custom"],
    responses={403: {"description": "Operation forbidden"}}
)
async def update_item(item_id: str):
    if item_id != "plumbus":
        raise HTTPException(
            status_code=403, detail="You can only update the item: plumbus"
        )
    return {"item_id": item_id, "name": "The great Plumbus"}

我们知道此模块中的所有路径操作都有相同的:

  • 路径 prefix/items
  • tags:(仅有一个 items 标签)。
  • 额外的 responses
  • dependencies:它们都需要我们创建的 X-Token 依赖项。

我们可以添加一个 tags 列表和额外的 responses 列表,这些参数将应用于此路由器中包含的所有路径操作。

我们可以添加一个 dependencies 列表,这些依赖项将被添加到路由器中的所有路径操作中,并将针对向它们发起的每个请求执行/解决。

3、后台任务

你可以定义在返回响应后运行的后台任务。

这对需要在请求之后执行的操作很有用,但客户端不必在接收响应之前等待操作完成。

包括这些例子:

  • 执行操作后发送的电子邮件通知:
    • 由于连接到电子邮件服务器并发送电子邮件往往很“慢”(几秒钟),您可以立即返回响应并在后台发送电子邮件通知。
  • 处理数据:
    • 例如,假设您收到的文件必须经过一个缓慢的过程,您可以返回一个"Accepted"(HTTP 202)响应并在后台处理它。
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}

FastAPI 会创建一个 BackgroundTasks 类型的对象并作为该参数传入。

from typing import Annotated

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: str | None = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}

该示例中,信息会在响应发出 之后 被写到 log.txt 文件。

如果请求中有查询,它将在后台任务中写入日志。

然后另一个在 路径操作函数 生成的后台任务会使用路径参数 email 写入一条信息。