文章
fastapi 权限认证访问接口示例
/.env
AUTH_SECRET_KEY=09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
AUTH_ALGORITHM=HS256
API_URL=http://localhost:3000
/requirements.txt
aiofiles==24.1.0
annotated-types==0.7.0
anyio==4.9.0
bcrypt==4.0.1
certifi==2025.6.15
click==8.2.1
colorama==0.4.6
dnspython==2.7.0
ecdsa==0.19.1
email_validator==2.2.0
fastapi==0.115.14
fastapi-cli==0.0.7
greenlet==3.2.3
h11==0.16.0
httpcore==1.0.9
httptools==0.6.4
httpx==0.28.1
idna==3.10
Jinja2==3.1.6
markdown-it-py==3.0.0
MarkupSafe==3.0.2
mdurl==0.1.2
passlib==1.7.4
pyasn1==0.6.1
pydantic==2.10.6
pydantic_core==2.27.2
Pygments==2.19.2
python-dotenv==1.1.1
python-jose==3.5.0
python-multipart==0.0.20
PyYAML==6.0.2
rich==14.0.0
rich-toolkit==0.14.7
rsa==4.9.1
shellingham==1.5.4
six==1.17.0
sniffio==1.3.1
SQLAlchemy==2.0.41
starlette==0.46.2
typer==0.16.0
typing-inspection==0.4.1
typing_extensions==4.14.0
uvicorn==0.34.3
watchfiles==1.1.0
websockets==15.0.1
/api/database.py
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
SQL_DATABASE_URL = "sqlite:///app.db"
engine = create_engine(
SQL_DATABASE_URL,
connect_args={"check_same_thread": False},
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
/api/models.py
from sqlalchemy import Column, Integer, String, ForeignKey, Table
from api.database import Base
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String, unique=True, index=True)
password = Column(String)
class Article(Base):
__tablename__ = "articles"
id = Column(Integer, primary_key=True)
user_id = Column(Integer, ForeignKey("users.id"))
title = Column(String, index=True)
content = Column(String)
/api/deps.py
import os
from typing import Annotated
from sqlalchemy.orm import Session
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from passlib.context import CryptContext
from jose import jwt, JWTError
from dotenv import load_dotenv
from pydantic import BaseModel
from api.database import SessionLocal
load_dotenv()
SECRET_KEY = os.getenv("AUTH_SECRET_KEY")
ALGORITHM = os.getenv("AUTH_ALGORITHM")
def get_db():
# 生成一个新的 SQLAlchemy Session
db = SessionLocal()
try:
# 将 Session 提供给路由使用
yield db
finally:
# 在请求完成后自动关闭它
db.close()
# db_dependency 是 FastAPI 依赖注入系统的一个快捷方式,用于在路由中获取 SQLAlchemy Session
# 它的本质是通过 get_db() 生成一个临时 Session,并在请求结束时自动清理
# Annotated[Session, Depends(get_db)] 表示这是一个依赖项,类型是 SQLAlchemy 的 Session
# Depends(get_db) 表示 FastAPI 会调用 get_db() 函数来获取数据库会话(Session)
db_dependency = Annotated[Session, Depends(get_db)]
bcrypt_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# 在 Swagger UI 或 ReDoc 中,tokenUrl 会显示一个 “Authorize”按钮,点击后引导用户到指定的登录页面(这里是 /auth/token)
# 即使 tokenUrl 填错(如 tokenUrl="not-exist"),也不会影响代码运行,只会让文档的引导链接失效。
# OAuth2PasswordBearer 的设计就是提取并返回 Token 字符串
oauth2_bearer = OAuth2PasswordBearer(tokenUrl="auth/token")
# 自动提取请求头中的 Token(str)
# 执行流程:
# 当你在路由中使用 oauth2_bearer_dependency(即 Depends(oauth2_bearer))时:
# FastAPI 会调用 oauth2_bearer()(即 OAuth2PasswordBearer.__call__)。
# 该方法会检查请求的 Authorization 头部,验证格式是否为 Bearer <token>。
# 如果有效,返回 <token>(str 类型);否则抛出 HTTP 401 错误。
oauth2_bearer_dependency = Annotated[str, Depends(oauth2_bearer)]
class CurrentUser(BaseModel):
username: str
id: int
async def get_current_user(token: oauth2_bearer_dependency) -> CurrentUser:
"""根据token解析user"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
user_id = payload.get("id")
if username is None or user_id is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user")
return CurrentUser(username=username, id=user_id)
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user")
# 自动根据token获取当前已登陆用户的用户名和id
user_dependency = Annotated[dict, Depends(get_current_user)]
/api/routers/auth.py
import os
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import APIRouter, HTTPException, status, Depends
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from fastapi.responses import JSONResponse
from jose import jwt, JWTError
from dotenv import load_dotenv
from api.models import User
from api.deps import db_dependency, bcrypt_context, oauth2_bearer_dependency
load_dotenv()
router = APIRouter(
prefix="/auth",
tags=["auth"],
)
SECRET_KEY = os.getenv("AUTH_SECRET_KEY")
ALGORITHM = os.getenv("AUTH_ALGORITHM")
class UserCreateRequest(BaseModel):
username: str
password: str
# 定义接收JSON请求方式获取token
class UserLoginRequest(BaseModel):
username: str
password: str
class Token(BaseModel):
access_token: str
refresh_token: str
def authenticate_user(username: str, password: str, db: db_dependency) -> User | bool:
user = db.query(User).filter(User.username == username).first()
if not user:
return False
if not bcrypt_context.verify(password, user.password):
return False
return user
def create_access_token(username: str, user_id: int, expires_delta: timedelta):
# 组装jwt需要的参数
encode = {"sub": username, "id": user_id}
expires = datetime.now(timezone.utc) + expires_delta
encode.update({"exp": expires})
# jwt生成token
return jwt.encode(encode, SECRET_KEY, algorithm=ALGORITHM)
# 创建用户 /auth/user
@router.post("/user", status_code=status.HTTP_201_CREATED)
async def create_user(db: db_dependency, create_user_request: UserCreateRequest):
# 检查用户名唯一性
if db.query(User).filter(User.username == create_user_request.username).first():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Username already exists"
)
# 创建用户
try:
create_user_model = User(
username=create_user_request.username,
password=bcrypt_context.hash(create_user_request.password)
)
db.add(create_user_model)
db.commit()
db.refresh(create_user_model) # 刷新以获取数据库生成的ID等字段
return JSONResponse(
content={
"id": create_user_model.id,
"username": create_user_model.username
},
status_code=status.HTTP_201_CREATED
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create user"
)
@router.post("/login")
async def login_for_access_token(
user_data: UserLoginRequest, # 直接接收JSON数据
db: db_dependency):
user = authenticate_user(user_data.username, user_data.password, db)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user")
# 生成access_token 短期有效 如15分钟
access_token = create_access_token(user.username, user.id, timedelta(minutes=15))
# 生成refresh_token 长期有效 如7天
refresh_token = create_access_token(user.username, user.id, timedelta(days=7))
return {"access_token": access_token, "refresh_token": refresh_token, "username": user_data.username}
@router.post("/token")
async def login_for_access_token(form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
db: db_dependency):
user = authenticate_user(form_data.username, form_data.password, db)
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user")
token = create_access_token(user.username, user.id, timedelta(minutes=20))
return {"access_token": token, "token_type": "bearer"}
# 刷新token的接口
@router.post("/refresh")
async def refresh_access_token(refresh_token: oauth2_bearer_dependency):
# 首先根据token解析用户
try:
payload = jwt.decode(refresh_token, SECRET_KEY, algorithms=[ALGORITHM])
username = payload.get("sub")
user_id = payload.get("id")
if username is None or user_id is None:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user")
# 使用解析的用户信息 生成新的 access_token
new_access_token = create_access_token(username, user_id, timedelta(minutes=15))
return {"access_token": new_access_token, "token_type": "bearer"}
except JWTError:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate user")
/api/routers/articles.py
from typing import List
from pydantic import BaseModel
from fastapi import APIRouter, HTTPException, status
from starlette.responses import JSONResponse
from api.deps import db_dependency, user_dependency
from api.models import Article
router = APIRouter(
prefix="/articles",
tags=["articles"],
)
class ArticleBase(BaseModel):
title: str
content: str
class ArticleCreate(ArticleBase):
pass
class ArticleResp(ArticleBase):
id: int
@router.get("/", response_model=List[ArticleResp])
async def get_article_list(db: db_dependency, user: user_dependency):
articles = db.query(Article).filter(
Article.user_id == user.id,
).all()
return articles
@router.post("/", status_code=status.HTTP_201_CREATED)
async def create_article(db: db_dependency, user: user_dependency, create_article_request: ArticleCreate):
# 检查文章标题唯一性
if db.query(Article).filter(
Article.title == create_article_request.title,
Article.user_id == user.id,
).first():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Article title already exists",
)
# 创建文章
try:
create_article_model = Article(
title=create_article_request.title,
content=create_article_request.content,
user_id=user.id,
)
db.add(create_article_model)
db.commit()
db.refresh(create_article_model)
return JSONResponse(
content={
"id": create_article_model.id,
"title": create_article_model.title,
"content": create_article_model.content,
},
status_code=status.HTTP_201_CREATED
)
except Exception as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail="Failed to create user"
)
@router.delete("/{article_id}")
async def delete_article(db: db_dependency, user: user_dependency, article_id: int):
article = db.query(Article).filter(
Article.id == article_id,
Article.user_id == user.id,
).first()
if not article:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Article not found or access denied")
db.delete(article)
db.commit()
return {"message": "Article deleted successfully"}
@router.get("/{article_id}", response_model=ArticleResp)
async def get_article(db: db_dependency, user: user_dependency, article_id: int):
article = db.query(Article).filter(
Article.id == article_id,
Article.user_id == user.id,
).first()
if not article:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Article not found or access denied")
return article
/api/main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from api.database import Base, engine
from api.routers import auth, articles
from fastapi.openapi.docs import get_swagger_ui_html
app = FastAPI()
# 根据 SQLAlchemy 模型创建数据库表, 适合快速原型开发或测试环境,生产环境建议用迁移工具
# 如果你修改了模型字段,需要重新创建表(需先删除旧表或使用迁移工具
Base.metadata.create_all(bind=engine)
origins = [
# 指定运行访问后端的前端地址
"http://localhost:3000",
"http://127.0.0.1:3000",
]
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.include_router(auth.router)
app.include_router(articles.router)
@app.get("/")
async def root():
return {"message": "Hello World"}
@app.get("/hello/{name}")
async def say_hello(name: str):
return {"message": f"Hello {name}"}
@app.get("/docs", include_in_schema=False)
async def custom_swagger_ui_html():
return get_swagger_ui_html(
openapi_url="/openapi.json",
title="Docs",
swagger_js_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.9.0/swagger-ui-bundle.js",
swagger_css_url="https://cdn.bootcdn.net/ajax/libs/swagger-ui/5.9.0/swagger-ui.css",
)