文章
FastAPI官方文档学习05
1、SQL(关系型)数据库
安装依赖
pip install sqlmodel
from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, Query
from sqlmodel import Field, Session, SQLModel, create_engine, select
class Hero(SQLModel, table=True):
id: int | None = Field(default=None, primary_key=True)
name: str = Field(index=True)
age: int | None = Field(default=None, index=True)
secret_name: str
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
def get_session():
with Session(engine) as session:
yield session
SessionDep = Annotated[Session, Depends(get_session)]
app = FastAPI()
@app.on_event("startup")
def on_startup():
create_db_and_tables()
@app.post("/heroes/")
def create_hero(hero: Hero, session: SessionDep) -> Hero:
session.add(hero)
session.commit()
session.refresh(hero)
return hero
@app.get("/heroes/")
def read_heroes(
session: SessionDep,
offset: int = 0,
limit: Annotated[int, Query(le=100)] = 100,
) -> list[Hero]:
heroes = session.exec(select(Hero).offset(offset).limit(limit)).all()
return heroes
@app.get("/heroes/{hero_id}")
def read_hero(hero_id: int, session: SessionDep) -> Hero:
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
return hero
@app.delete("/heroes/{hero_id}")
def delete_hero(hero_id: int, session: SessionDep):
hero = session.get(Hero, hero_id)
if not hero:
raise HTTPException(status_code=404, detail="Hero not found")
session.delete(hero)
session.commit()
return {"ok": True}
class Hero(SQLModel, table=True) 类定义的理解
元类(Metaclass)
- 元类是用来创建类的类
SQLModel
本身有一个元类(叫做SQLModelMetaclass
)- 当你继承
SQLModel
时,你的类Hero
也会使用同一个元类
# SQLModel 的定义(简化版)
class SQLModel(metaclass=SQLModelMetaclass):
pass
# 你的 Hero 类
class Hero(SQLModel, table=True):
pass
这相当于:
class Hero(metaclass=SQLModelMetaclass, table=True):
# 继承了 SQLModel 的所有属性和方法
pass
关键结论
SQLModel
是基类,不是元类Hero
是普通类,不是元类Hero
和SQLModel
共享同一个元类SQLModelMetaclass
table=True
被传递给元类SQLModelMetaclass
进行处理
所以 Hero
仍然是一个可以实例化的普通类,只是它的创建过程被特殊的元类控制了。
Hero
类与 Pydantic 模型非常相似(实际上,从底层来看,它确实就是一个 Pydantic 模型)。
有一些区别:
table=True
会告诉 SQLModel 这是一个表模型,它应该表示 SQL 数据库中的一个表,而不仅仅是一个数据模型(就像其他常规的 Pydantic 类一样)。Field(primary_key=True)
会告诉 SQLModelid
是 SQL 数据库中的主键(您可以在 SQLModel 文档中了解更多关于 SQL 主键的信息)。把类型设置为int | None
,SQLModel 就能知道该列在 SQL 数据库中应该是INTEGER
类型,并且应该是NULLABLE
。Field(index=True)
会告诉 SQLModel 应该为此列创建一个 SQL 索引,这样在读取按此列过滤的数据时,程序能在数据库中进行更快的查找。SQLModel 会知道声明为str
的内容将是类型为TEXT
(或VARCHAR
,具体取决于数据库)的 SQL 列。
connect_args = {"check_same_thread": False}
使用 check_same_thread=False
可以让 FastAPI 在不同线程中使用同一个 SQLite 数据库。这很有必要,因为单个请求可能会使用多个线程(例如在依赖项中)。
不用担心,我们会按照代码结构确保每个请求使用一个单独的 SQLModel 会话,这实际上就是 check_same_thread
想要实现的。
def create_db_and_tables():
SQLModel.metadata.create_all(engine)
- 使用
SQLModel.metadata.create_all(engine)
为所有表模型创建表。
创建会话(Session)依赖项¶
Session
会存储内存中的对象并跟踪数据中所需更改的内容,然后它使用 engine
与数据库进行通信。
我们会使用 yield
创建一个 FastAPI 依赖项,为每个请求提供一个新的 Session
。这确保我们每个请求使用一个单独的会话。🤓
然后我们创建一个 Annotated
的依赖项 SessionDep
来简化其他也会用到此依赖的代码。
def get_session():
with Session(engine) as session:
yield session
SessionDep = Annotated[Session, Depends(get_session)]
2、更大的应用 - 多个文件
一个文件结构示例¶
假设你的文件结构如下:
.
├── app # 「app」是一个 Python 包
│ ├── __init__.py # 这个文件使「app」成为一个 Python 包
│ ├── main.py # 「main」模块,例如 import app.main
│ ├── dependencies.py # 「dependencies」模块,例如 import app.dependencies
│ └── routers # 「routers」是一个「Python 子包」
│ │ ├── __init__.py # 使「routers」成为一个「Python 子包」
│ │ ├── items.py # 「items」子模块,例如 import app.routers.items
│ │ └── users.py # 「users」子模块,例如 import app.routers.users
│ └── internal # 「internal」是一个「Python 子包」
│ ├── __init__.py # 使「internal」成为一个「Python 子包」
│ └── admin.py # 「admin」子模块,例如 import app.internal.admin
APIRouter
假设专门用于处理用户逻辑的文件是位于 /app/routers/users.py
的子模块。
你希望将与用户相关的路径操作与其他代码分开,以使其井井有条。
from fastapi import APIRouter
router = APIRouter()
@router.get("/users/", tags=["users"])
async def read_users():
return [{"username": "Rick"}, {"username": "Morty"}]
from fastapi import APIRouter, Depends, HTTPException
from ..dependencies import get_query_token
router = APIRouter(
prefix="/items",
tags=["items"],
dependencies=[Depends(get_query_token)],
responses={404: {"description": "Not found"}}
)
fake_items_db = {"plumbus": {"name": "Plumbus"}, "gun": {"name": "Portal Gun"}}
@router.get("/")
async def read_items():
return fake_items_db
@router.get("/{item_id}")
async def read_item(item_id: str):
if item_id not in fake_items_db:
raise HTTPException(status_code=404, detail="Item not found")
return {"name": fake_items_db[item_id]["name"], "item_id": item_id}
@router.put(
"/{item_id}",
tags=["custom"],
responses={403: {"description": "Operation forbidden"}}
)
async def update_item(item_id: str):
if item_id != "plumbus":
raise HTTPException(
status_code=403, detail="You can only update the item: plumbus"
)
return {"item_id": item_id, "name": "The great Plumbus"}
我们知道此模块中的所有路径操作都有相同的:
- 路径
prefix
:/items
。 tags
:(仅有一个items
标签)。- 额外的
responses
。 dependencies
:它们都需要我们创建的X-Token
依赖项。
我们可以添加一个 tags 列表和额外的 responses 列表,这些参数将应用于此路由器中包含的所有路径操作。
我们可以添加一个 dependencies 列表,这些依赖项将被添加到路由器中的所有路径操作中,并将针对向它们发起的每个请求执行/解决。
3、后台任务
你可以定义在返回响应后运行的后台任务。
这对需要在请求之后执行的操作很有用,但客户端不必在接收响应之前等待操作完成。
包括这些例子:
- 执行操作后发送的电子邮件通知:
- 由于连接到电子邮件服务器并发送电子邮件往往很“慢”(几秒钟),您可以立即返回响应并在后台发送电子邮件通知。
- 处理数据:
- 例如,假设您收到的文件必须经过一个缓慢的过程,您可以返回一个"Accepted"(HTTP 202)响应并在后台处理它。
from fastapi import BackgroundTasks, FastAPI
app = FastAPI()
def write_notification(email: str, message=""):
with open("log.txt", mode="w") as email_file:
content = f"notification for {email}: {message}"
email_file.write(content)
@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_notification, email, message="some notification")
return {"message": "Notification sent in the background"}
FastAPI 会创建一个 BackgroundTasks
类型的对象并作为该参数传入。
from typing import Annotated
from fastapi import BackgroundTasks, Depends, FastAPI
app = FastAPI()
def write_log(message: str):
with open("log.txt", mode="a") as log:
log.write(message)
def get_query(background_tasks: BackgroundTasks, q: str | None = None):
if q:
message = f"found query: {q}\n"
background_tasks.add_task(write_log, message)
return q
@app.post("/send-notification/{email}")
async def send_notification(
email: str, background_tasks: BackgroundTasks, q: Annotated[str, Depends(get_query)]
):
message = f"message to {email}\n"
background_tasks.add_task(write_log, message)
return {"message": "Message sent"}
该示例中,信息会在响应发出 之后 被写到 log.txt
文件。
如果请求中有查询,它将在后台任务中写入日志。
然后另一个在 路径操作函数 生成的后台任务会使用路径参数 email
写入一条信息。