大数据

fastapi 入门

路径参数、查询参数

from fastapi import FastAPI, Path, Query

app = FastAPI()


# 路由上的:路径参数
@app.get("/hello/{name}")
async def say_hello(name: str = Path(max_length=10, description="这里填写用户名")):
    return {"name": f"Hello {name}"}


# 路由上的:查询参数
@app.get("/books")
async def book_list(
        page: int = Query(default=1, ge=1, description="页码"),
        size: int = Query(default=10, description="每页数据")
):
    return {"page": page, "size": size}

body参数、自定义参数验证规则

from fastapi import FastAPI, Body
from pydantic import BaseModel, field_validator, Field
from typing import Annotated

app = FastAPI()


# body参数 需要使用pydantic先定义一个包含所有字段的模型
class Item(BaseModel):
    # name: str = Field(max_length=10)  # 定义验证规则
    name: Annotated[str, Field(max_length=10)]
    description: str | None = None
    price: float | None = 0

    # 可以通过使用@field_validator装饰器来指定某个字段的自定义验证逻辑
    @field_validator("name")
    def validate_name(cls, value: str):
        if " " in value:
            raise ValueError("name中不能包含空格!")
        return value


@app.put("/items/{item_id}")
async def update_item(item_id: int, item: Item = Body(description="JSON格式")):  # item_id来自路径 item来自body传参
    print("item_id", item_id)
    print(item.name, item.description)
    return {"message": "update success"}

Cookie参数

# 获取header里面的Cookie中username=xxx的值
@app.get("/cookie/get")
async def get_cookie(username: str | None = Cookie(default=None)):
    print("username", username)
    return "success"


from fastapi.responses import JSONResponse


@app.get("/cookie/set")
async def set_cookie():
    response = JSONResponse({"message": "set cookie"})
    response.set_cookie("token", "xxx")
    return response

请求头上的参数

# 由于请求头中的字段通常用 - 来拼接单词,而获取数据则将-转为_ 并将字母全部变为小写
# 比如User-Agent在获取参数是通过user_agent来获取的
@app.get("/header")
async def get_headers(
        user_agent: str | None = Header(default=None),
        host: str | None = Header(default=None)
):
    print("user-agent", user_agent)
    print("host", host)
    return "success"

函数依赖项

from fastapi import Depends


async def common(q: str | None = None, skip: int = 0, limit: int = 10):
    return {"q": q, "skip": skip, "limit": limit}


# 以下视图函数都是获取列表的,都需要获取q、skip、limit参数,如果在每个视图函数上都写以便,那么将有很多重复性代码
# 这里可以把获取数据的代码单独抽取成一个函数,然后再在每个视图函数上使用Depends进行依赖注入
@app.get("/items")
async def read_items(common: Dict = Depends(common)):
    print(common.get("q"), common.get("skip"), common.get("limit"))
    return {"message": "ok"}


@app.get("/users")
async def read_users(common: Dict = Depends(common)):
    return common

类依赖项

class CommonQueryParams:
    def __init__(self, q: str | None = None, skip: int = 0, limit: int = 10):
        self.q = q
        self.skip = skip
        self.limit = limit


@app.get("/items2")
async def read_items2(common: Dict = Depends(CommonQueryParams)):
    print(common.get("q"), common.get("skip"), common.get("limit"))
    return {"message": "ok"}

APIRouter

routers/items.py

# coding: utf-8
from fastapi import APIRouter, Header, Depends
from fastapi.exceptions import HTTPException


async def login_required(token: str = Header(None)):
    if token != "xx":
        raise HTTPException(status_code=400, detail="Token验证失败!")


router = APIRouter(prefix="/item", tags=["item"], dependencies=[Depends(login_required)])


@router.get("/list")
async def item_list():
    return {"items": [{"title": "xxx"}]}

routers/users.py

# coding: utf-8
from fastapi import APIRouter

router = APIRouter(prefix="/user", tags=["user"])


@router.get("/list")
async def get_user_list():
    return [{"username": "bob"}]

main.py

from fastapi import FastAPI
from routers import users, items

app = FastAPI()
app.include_router(users.router)
app.include_router(items.router)

数据库连接和模型

安装异步版本

pip install sqlalchemy[asyncio]

安装异步mysql驱动

pip install asyncmy
pip install cryptography

连接参数格式

DB_URI = "mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4"

如果密码中有@符号请使用%%40替换@符号

SQLAlchemy中的Engine对象,负责管理数据库的连接的创建(并不直接操作数据库)、连接池的维护、SQL语句的翻译等。Engine对象在整个过程中只能有一个。

# coding: utf-8
from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4",
    # 将输出所有执行SQL的日志(默认是关闭的)
    echo=True,
    # 连接池大小(默认5个)
    pool_size=10,
    # 允许连接池最大的连接数(默认10个)
    max_overflow=20,
    # 获得连接超时时间(默认30s)
    pool_timeout=10,
    # 连接回收时间(默认是-1,代表永不回收)
    pool_recycle=3600,
    # 连接前是否预检查(默认为False)
    pool_pre_ping=True,
)

创建会话工厂

使用sqlalchemy.orm.sessionmaker类来创建会话工厂,这个会话工厂实际上就是Session或者他的子类,以后如果要操作数据库,那么需要创建一个会话工厂的对象(即Session类的对象)来完成相关操作。

from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession

AsyncSessionFactory = sessionmaker(
    # Engine或者其子类读一下(这里是AsyncEngine)
    bind=engine,
    # Session类的代替(默认Session类)
    class_=AsyncSession,
    # 是否在查找之前执行flush操作(默认True)
    autoflush=True,
    # 是否在执行commit操作后Session就过期(默认是True)
    expire_on_commit=False
)

应用

Session = AsyncSessionFactory()
session = Session()

创建模型

1、定义base类

Base类是所有ORM Model类的父类,一个ORM Model类对应数据库中的一张表,ORM Model中的一个Column类属性对应数据库表中的一个字段。Base类的生成可以使用sqlalchemy.ext.declarative.declarative_base函数来实现

from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

models/__init__.py

# coding: utf-8
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.declarative import declarative_base

# DB_URI = "mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4"

engine = create_async_engine(
    "mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4",
    # 将输出所有执行SQL的日志(默认是关闭的)
    echo=True,
    # 连接池大小(默认5个)
    pool_size=10,
    # 允许连接池最大的连接数(默认10个)
    max_overflow=20,
    # 获得连接超时时间(默认30s)
    pool_timeout=10,
    # 连接回收时间(默认是-1,代表永不回收)
    pool_recycle=3600,
    # 连接前是否预检查(默认为False)
    pool_pre_ping=True,
)

AsyncSessionFactory = sessionmaker(
    # Engine或者其子类读一下(这里是AsyncEngine)
    bind=engine,
    # Session类的代替(默认Session类)
    class_=AsyncSession,
    # 是否在查找之前执行flush操作(默认True)
    autoflush=True,
    # 是否在执行commit操作后Session就过期(默认是True)
    expire_on_commit=False
)

#Session = AsyncSessionFactory()
#session = Session()

Base = declarative_base()

# 导入其他模型的python文件
from . import users

创建ORM模型:models/users.py

# coding: utf-8
from routers.db import Base
from sqlalchemy import Column, Integer, String


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True,autoincrement=True, index=True)
    email = Column(String(100), unique=True, index=True)
    username = Column(String(100), unique=True)
    password = Column(String(200), nullable=False)

迁移模型

模型定义好后,要将模型映射到数据库中生成表,或者是以后模型上的字段名、字段类型等发生改变了,可以使用alembic来进行迁移

pip install alembic

迁移

创建迁移数仓

alembic的使用类型git,也可以进行版本回退,并且都需要项创建好一个迁移仓库,在项目根路径下使用以下命令生成仓库

alembic init alembic --template async

如果项目mysql不是异步的,比如pymysql,那么就不需要执行 --template async

修改alembic.ini

要将模型迁移到数仓中,还需要修改alembic.ini下连接数据库的配置

sqlalchemy.url = mysql+asyncmy://用户名:密码@主机名:端口号/数据库名称?charset=utf8mb4

修改env.py

将alembic/env.py中的target_metadata修改为如下

from routers.db import Base
target_metadata = Base.metadata

生成迁移脚本

如果模型发生改变了那么需要先将模型生成迁移脚本,执行以下命令

alembic revision --autogenerate -m "修改的内容"

这样就会在alembic/versions文件夹下生成迁移脚本文件

执行迁移脚本

在alembic/versions下生成迁移脚本后,模型的修改并没有同步到数据库中,因此还需要执行以下命令

alembic upgrade head

models/users.py

# coding: utf-8
from sqlalchemy.orm import relationship

from models import Base
from sqlalchemy import Column, Integer, String, ForeignKey


class User(Base):
    __tablename__ = "user"
    id = Column(Integer, primary_key=True, autoincrement=True, index=True)
    email = Column(String(100), unique=True, index=True)
    username = Column(String(100), unique=True)
    password = Column(String(200), nullable=False)


class UserExtension(Base):
    __tablename__ = "user_extension"
    id = Column(Integer, primary_key=True, index=True)
    university = Column(String(100))
    user_id = Column(Integer, ForeignKey("user.id"))
    # 使用uselist=Fasle 表示和User表建立一对一的关系
    user = relationship(User, backref="extension", uselist=False)

models/articles.py 使用外键来实现一对多、多对多

# coding: utf-8
from models import Base
from sqlalchemy import Column, Integer, String, ForeignKey, Text
from sqlalchemy.orm import relationship


class Tag(Base):
    __tablename__ = "tag"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String(100), unique=True)
    articles = relationship("Article", secondary="article_tag", back_populates="tags", lazy="dynamic")


class Article(Base):
    __tablename__ = "article"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String(100), nullable=True)
    content = Column(Text, nullable=False)
    # author_id和author用于表示一对多的关系
    author_id = Column(Integer, ForeignKey("user.id"))
    author = relationship("User", backref="articles")
    # tags: 表示Article和Tag的多对多的关系
    tags = relationship("Tag", secondary="article_tag", back_populates="articles", lazy="dynamic")


# Article和Tag的中间表
class ArticleTag(Base):
    __tablename__ = "article_tag"
    id = Column(Integer, primary_key=True, index=True)
    article_id = Column(Integer, ForeignKey("article.id"), primary_key=True)
    tag_id = Column(Integer, ForeignKey("tag.id"), primary_key=True)

记得在models/__init__.py中导入articles模块

# 导入其他模型的python文件
from . import users
from . import articles

CRUD操作

创建schema

schemas/users.py

# coding: utf-8
from pydantic import BaseModel


class UserCreateReqSchema(BaseModel):
    email: str
    username: str
    password: str


class UserRespSchema(BaseModel):
    id: int
    username: str
    email: str

创建Session对象

1、使用异步上下文管理器async with创建,会自动关闭Session

@router.post("/add", response_model=UserRespSchema)
async def add_user(user_data: UserCreateReqSchema):
    # 1、不用异步上下文管理器
    session = AsyncSessionFactory()
    # 以下代码会自动创建一个事物
    try:
        user = User(email=user_data.email, username=user_data.username, password=user_data.password)
        session.add(user)
        await session.commit()
        await session.refresh(user, attribute_names=['id'])
    except Exception as e:
        await session.rollback()
        raise HTTPException(status_code=400, detail="用户添加错误!")
    await session.close()
    return user

@router.post("/add", response_model=UserRespSchema)
async def add_user(user_data: UserCreateReqSchema):
    async with AsyncSessionFactory() as session:
        async with session.begin():
            user = User(email=user_data.email, username=user_data.username, password=user_data.password)
            session.add(user)
    return user

2、使用依赖注入

from models import AsyncSession
from fastapi import Depends

async def get_session():
    session = AsyncSessionFactory()
    try:
        yield session
    finally:
        await session.close()

@router.post("/add/depends", response_model=UserRespSchema)
async def add_user_depends(
        user_data: UserCreateReqSchema,
        session: AsyncSession = Depends(get_session)
):
    async with session.begin():
        user = User(email=user_data.email, username=user_data.username, password=user_data.password)
        session.add(user)
    return user

3、使用中间件

fastapi中的中间件包含请求到达视图函数之前,以及响应到达浏览器之前所要执行的操作。我们可以在请求到达视图函数之前,先把Session对象创建好,然后绑定到request.state对象上(request.state可用于绑定任何本次请求中创建的数据),接着在视图函数中,通过request.state.session来进行访问和使用

# main.py 添加如下
from fastapi.requests import Request

@app.middleware("http")
async def create_session_middleware(request: Request, call_next):
    # 请求到达视图前执行的
    session = AsyncSessionFactory()
    request.state.session = session
    # setattr(request.state, "session", session)
    response = await call_next(request)
    # 响应返回给浏览器之前的
    await session.close()
    return response

# routes/users.py
@router.post("/add/middleware", response_model=UserRespSchema)
async def add_user_depends(
        request: Request,
        user_data: UserCreateReqSchema
):
    session = request.state.session
    async with session.begin():
        user = User(email=user_data.email, username=user_data.username, password=user_data.password)
        session.add(user)
    return user

中间件重构

middlewares/dbs.py

# coding: utf-8
from fastapi.requests import Request
from models import AsyncSessionFactory


async def create_session_middleware(request: Request, call_next):
    # 请求到达视图前执行的
    session = AsyncSessionFactory()
    request.state.session = session
    # setattr(request.state, "session", session)
    response = await call_next(request)
    # 响应返回给浏览器之前的
    await session.close()
    return response

main.py修改

# coding: utf-8
from fastapi import FastAPI
from starlette.middleware.base import BaseHTTPMiddleware

from routers import users, items
from middlewares import dbs

app = FastAPI()
app.include_router(users.router)
app.include_router(items.router)
app.add_middleware(BaseHTTPMiddleware, dispatch=dbs.create_session_middleware)

CRUD

from sqlalchemy import delete

@router.delete("/delete/{user_id}")
async def delete_user(user_id: int, request: Request):
    session = request.state.session
    try:
        async with session.begin():
            sql = delete(User).where(User.id == user_id)
            await session.execute(sql)
        return {"message": "删除成功"}
    except Exception as e:
        raise HTTPException(status_code=400, detail="删除失败!")
from sqlalchemy import select


# 查询单条数据
@router.get("/select/{user_id}", response_model=UserRespSchema)
async def select_user_by_id(user_id: int, request: Request):
    session = request.state.session
    async with session.begin():
        stmt = select(User.id, User.username, User.email).where(User.id == user_id)
        query = await session.execute(stmt)
        row = query.one()
        return row._asdict()