文章
fastapi 入门
路径参数、查询参数
from fastapi import FastAPI, Path, Query
app = FastAPI()
# 路由上的:路径参数
@app.get("/hello/{name}")
async def say_hello(name: str = Path(max_length=10, description="这里填写用户名")):
return {"name": f"Hello {name}"}
# 路由上的:查询参数
@app.get("/books")
async def book_list(
page: int = Query(default=1, ge=1, description="页码"),
size: int = Query(default=10, description="每页数据")
):
return {"page": page, "size": size}
body参数、自定义参数验证规则
from fastapi import FastAPI, Body
from pydantic import BaseModel, field_validator, Field
from typing import Annotated
app = FastAPI()
# body参数 需要使用pydantic先定义一个包含所有字段的模型
class Item(BaseModel):
# name: str = Field(max_length=10) # 定义验证规则
name: Annotated[str, Field(max_length=10)]
description: str | None = None
price: float | None = 0
# 可以通过使用@field_validator装饰器来指定某个字段的自定义验证逻辑
@field_validator("name")
def validate_name(cls, value: str):
if " " in value:
raise ValueError("name中不能包含空格!")
return value
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item = Body(description="JSON格式")): # item_id来自路径 item来自body传参
print("item_id", item_id)
print(item.name, item.description)
return {"message": "update success"}
Cookie参数
# 获取header里面的Cookie中username=xxx的值
@app.get("/cookie/get")
async def get_cookie(username: str | None = Cookie(default=None)):
print("username", username)
return "success"
from fastapi.responses import JSONResponse
@app.get("/cookie/set")
async def set_cookie():
response = JSONResponse({"message": "set cookie"})
response.set_cookie("token", "xxx")
return response
请求头上的参数
# 由于请求头中的字段通常用 - 来拼接单词,而获取数据则将-转为_ 并将字母全部变为小写
# 比如User-Agent在获取参数是通过user_agent来获取的
@app.get("/header")
async def get_headers(
user_agent: str | None = Header(default=None),
host: str | None = Header(default=None)
):
print("user-agent", user_agent)
print("host", host)
return "success"
函数依赖项
from fastapi import Depends
async def common(q: str | None = None, skip: int = 0, limit: int = 10):
return {"q": q, "skip": skip, "limit": limit}
# 以下视图函数都是获取列表的,都需要获取q、skip、limit参数,如果在每个视图函数上都写以便,那么将有很多重复性代码
# 这里可以把获取数据的代码单独抽取成一个函数,然后再在每个视图函数上使用Depends进行依赖注入
@app.get("/items")
async def read_items(common: Dict = Depends(common)):
print(common.get("q"), common.get("skip"), common.get("limit"))
return {"message": "ok"}
@app.get("/users")
async def read_users(common: Dict = Depends(common)):
return common
类依赖项
class CommonQueryParams:
def __init__(self, q: str | None = None, skip: int = 0, limit: int = 10):
self.q = q
self.skip = skip
self.limit = limit
@app.get("/items2")
async def read_items2(common: Dict = Depends(CommonQueryParams)):
print(common.get("q"), common.get("skip"), common.get("limit"))
return {"message": "ok"}
APIRouter
routers/items.py
# coding: utf-8
from fastapi import APIRouter, Header, Depends
from fastapi.exceptions import HTTPException
async def login_required(token: str = Header(None)):
if token != "xx":
raise HTTPException(status_code=400, detail="Token验证失败!")
router = APIRouter(prefix="/item", tags=["item"], dependencies=[Depends(login_required)])
@router.get("/list")
async def item_list():
return {"items": [{"title": "xxx"}]}
routers/users.py
# coding: utf-8
from fastapi import APIRouter
router = APIRouter(prefix="/user", tags=["user"])
@router.get("/list")
async def get_user_list():
return [{"username": "bob"}]
main.py
from fastapi import FastAPI
from routers import users, items
app = FastAPI()
app.include_router(users.router)
app.include_router(items.router)
数据库连接和模型
安装异步版本
pip install sqlalchemy[asyncio]
安装异步mysql驱动
pip install asyncmy
pip install cryptography
连接参数格式
DB_URI = "mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4"
如果密码中有@符号请使用%%40替换@符号
SQLAlchemy中的Engine对象,负责管理数据库的连接的创建(并不直接操作数据库)、连接池的维护、SQL语句的翻译等。Engine对象在整个过程中只能有一个。
# coding: utf-8
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine(
"mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4",
# 将输出所有执行SQL的日志(默认是关闭的)
echo=True,
# 连接池大小(默认5个)
pool_size=10,
# 允许连接池最大的连接数(默认10个)
max_overflow=20,
# 获得连接超时时间(默认30s)
pool_timeout=10,
# 连接回收时间(默认是-1,代表永不回收)
pool_recycle=3600,
# 连接前是否预检查(默认为False)
pool_pre_ping=True,
)
创建会话工厂
使用sqlalchemy.orm.sessionmaker类来创建会话工厂,这个会话工厂实际上就是Session或者他的子类,以后如果要操作数据库,那么需要创建一个会话工厂的对象(即Session类的对象)来完成相关操作。
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
AsyncSessionFactory = sessionmaker(
# Engine或者其子类读一下(这里是AsyncEngine)
bind=engine,
# Session类的代替(默认Session类)
class_=AsyncSession,
# 是否在查找之前执行flush操作(默认True)
autoflush=True,
# 是否在执行commit操作后Session就过期(默认是True)
expire_on_commit=False
)
应用
Session = AsyncSessionFactory()
session = Session()
创建模型
1、定义base类
Base类是所有ORM Model类的父类,一个ORM Model类对应数据库中的一张表,ORM Model中的一个Column类属性对应数据库表中的一个字段。Base类的生成可以使用sqlalchemy.ext.declarative.declarative_base函数来实现
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
models/__init__.py
# coding: utf-8
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.declarative import declarative_base
# DB_URI = "mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4"
engine = create_async_engine(
"mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4",
# 将输出所有执行SQL的日志(默认是关闭的)
echo=True,
# 连接池大小(默认5个)
pool_size=10,
# 允许连接池最大的连接数(默认10个)
max_overflow=20,
# 获得连接超时时间(默认30s)
pool_timeout=10,
# 连接回收时间(默认是-1,代表永不回收)
pool_recycle=3600,
# 连接前是否预检查(默认为False)
pool_pre_ping=True,
)
AsyncSessionFactory = sessionmaker(
# Engine或者其子类读一下(这里是AsyncEngine)
bind=engine,
# Session类的代替(默认Session类)
class_=AsyncSession,
# 是否在查找之前执行flush操作(默认True)
autoflush=True,
# 是否在执行commit操作后Session就过期(默认是True)
expire_on_commit=False
)
#Session = AsyncSessionFactory()
#session = Session()
Base = declarative_base()
# 导入其他模型的python文件
from . import users
创建ORM模型:models/users.py
# coding: utf-8
from routers.db import Base
from sqlalchemy import Column, Integer, String
class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True,autoincrement=True, index=True)
email = Column(String(100), unique=True, index=True)
username = Column(String(100), unique=True)
password = Column(String(200), nullable=False)
迁移模型
模型定义好后,要将模型映射到数据库中生成表,或者是以后模型上的字段名、字段类型等发生改变了,可以使用alembic来进行迁移
pip install alembic
迁移
创建迁移数仓
alembic的使用类型git,也可以进行版本回退,并且都需要项创建好一个迁移仓库,在项目根路径下使用以下命令生成仓库
alembic init alembic --template async
如果项目mysql不是异步的,比如pymysql,那么就不需要执行 --template async
修改alembic.ini
要将模型迁移到数仓中,还需要修改alembic.ini下连接数据库的配置
sqlalchemy.url = mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4
修改env.py
将alembic/env.py中的target_metadata修改为如下
from routers.db import Base
target_metadata = Base.metadata
生成迁移脚本
如果模型发生改变了那么需要先将模型生成迁移脚本,执行以下命令
alembic revision --autogenerate -m "修改的内容"
这样就会在alembic/versions文件夹下生成迁移脚本文件
执行迁移脚本
在alembic/versions下生成迁移脚本后,模型的修改并没有同步到数据库中,因此还需要执行以下命令
alembic upgrade head
models/users.py
# coding: utf-8
from sqlalchemy.orm import relationship
from models import Base
from sqlalchemy import Column, Integer, String, ForeignKey
class User(Base):
__tablename__ = "user"
id = Column(Integer, primary_key=True, autoincrement=True, index=True)
email = Column(String(100), unique=True, index=True)
username = Column(String(100), unique=True)
password = Column(String(200), nullable=False)
class UserExtension(Base):
__tablename__ = "user_extension"
id = Column(Integer, primary_key=True, index=True)
university = Column(String(100))
user_id = Column(Integer, ForeignKey("user.id"))
# 使用uselist=Fasle 表示和User表建立一对一的关系
user = relationship(User, backref="extension", uselist=False)
models/articles.py 使用外键来实现一对多、多对多
# coding: utf-8
from models import Base
from sqlalchemy import Column, Integer, String, ForeignKey, Text
from sqlalchemy.orm import relationship
class Tag(Base):
__tablename__ = "tag"
id = Column(Integer, primary_key=True, index=True)
name = Column(String(100), unique=True)
articles = relationship("Article", secondary="article_tag", back_populates="tags", lazy="dynamic")
class Article(Base):
__tablename__ = "article"
id = Column(Integer, primary_key=True, index=True)
title = Column(String(100), nullable=True)
content = Column(Text, nullable=False)
# author_id和author用于表示一对多的关系
author_id = Column(Integer, ForeignKey("user.id"))
author = relationship("User", backref="articles")
# tags: 表示Article和Tag的多对多的关系
tags = relationship("Tag", secondary="article_tag", back_populates="articles", lazy="dynamic")
# Article和Tag的中间表
class ArticleTag(Base):
__tablename__ = "article_tag"
id = Column(Integer, primary_key=True, index=True)
article_id = Column(Integer, ForeignKey("article.id"), primary_key=True)
tag_id = Column(Integer, ForeignKey("tag.id"), primary_key=True)
记得在models/__init__.py中导入articles模块
# 导入其他模型的python文件
from . import users
from . import articles
CRUD操作
创建schema
schemas/users.py
# coding: utf-8
from pydantic import BaseModel
class UserCreateReqSchema(BaseModel):
email: str
username: str
password: str
class UserRespSchema(BaseModel):
id: int
username: str
email: str
创建Session对象
1、使用异步上下文管理器async with创建,会自动关闭Session
@router.post("/add", response_model=UserRespSchema)
async def add_user(user_data: UserCreateReqSchema):
# 1、不用异步上下文管理器
session = AsyncSessionFactory()
# 以下代码会自动创建一个事物
try:
user = User(email=user_data.email, username=user_data.username, password=user_data.password)
session.add(user)
await session.commit()
await session.refresh(user, attribute_names=['id'])
except Exception as e:
await session.rollback()
raise HTTPException(status_code=400, detail="用户添加错误!")
await session.close()
return user
@router.post("/add", response_model=UserRespSchema)
async def add_user(user_data: UserCreateReqSchema):
async with AsyncSessionFactory() as session:
async with session.begin():
user = User(email=user_data.email, username=user_data.username, password=user_data.password)
session.add(user)
return user
2、使用依赖注入
from models import AsyncSession
from fastapi import Depends
async def get_session():
session = AsyncSessionFactory()
try:
yield session
finally:
await session.close()
@router.post("/add/depends", response_model=UserRespSchema)
async def add_user_depends(
user_data: UserCreateReqSchema,
session: AsyncSession = Depends(get_session)
):
async with session.begin():
user = User(email=user_data.email, username=user_data.username, password=user_data.password)
session.add(user)
return user
3、使用中间件
fastapi中的中间件包含请求到达视图函数之前,以及响应到达浏览器之前所要执行的操作。我们可以在请求到达视图函数之前,先把Session对象创建好,然后绑定到request.state对象上(request.state可用于绑定任何本次请求中创建的数据),接着在视图函数中,通过request.state.session来进行访问和使用
# main.py 添加如下
from fastapi.requests import Request
@app.middleware("http")
async def create_session_middleware(request: Request, call_next):
# 请求到达视图前执行的
session = AsyncSessionFactory()
request.state.session = session
# setattr(request.state, "session", session)
response = await call_next(request)
# 响应返回给浏览器之前的
await session.close()
return response
# routes/users.py
@router.post("/add/middleware", response_model=UserRespSchema)
async def add_user_depends(
request: Request,
user_data: UserCreateReqSchema
):
session = request.state.session
async with session.begin():
user = User(email=user_data.email, username=user_data.username, password=user_data.password)
session.add(user)
return user
中间件重构
middlewares/dbs.py
# coding: utf-8
from fastapi.requests import Request
from models import AsyncSessionFactory
async def create_session_middleware(request: Request, call_next):
# 请求到达视图前执行的
session = AsyncSessionFactory()
request.state.session = session
# setattr(request.state, "session", session)
response = await call_next(request)
# 响应返回给浏览器之前的
await session.close()
return response
main.py修改
# coding: utf-8
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware
from routers import users, items
from middlewares import dbs
app = FastAPI()
app.include_router(users.router)
app.include_router(items.router)
app.add_middleware(BaseHTTPMiddleware, dispatch=dbs.create_session_middleware)
CRUD
from sqlalchemy import delete
@router.delete("/delete/{user_id}")
async def delete_user(user_id: int, request: Request):
session = request.state.session
try:
async with session.begin():
sql = delete(User).where(User.id == user_id)
await session.execute(sql)
return {"message": "删除成功"}
except Exception as e:
raise HTTPException(status_code=400, detail="删除失败!")
from sqlalchemy import select
# 查询单条数据
@router.get("/select/{user_id}", response_model=UserRespSchema)
async def select_user_by_id(user_id: int, request: Request):
session = request.state.session
async with session.begin():
stmt = select(User.id, User.username, User.email).where(User.id == user_id)
query = await session.execute(stmt)
row = query.one()
return row._asdict()