全栈开发

FastAPI官方文档学习04

1、安全性

OAuth2是一个规范,它定义了几种处理身份认证和授权的方法。

OpenAPI(以前称为 Swagger)是用于构建 API 的开放规范

from fastapi import FastAPI, Depends
from typing import Annotated
from fastapi.security import OAuth2PasswordBearer


app = FastAPI()

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.get("/items/")
async def read_items(token: Annotated[str, Depends(oauth2_scheme)]):
    return {"token": token}

先安装 python-multipart

安装命令: pip install python-multipart

这是因为 OAuth2 使用表单数据发送 username 与 password

页面右上角出现了一个「Authorize」按钮。

路径操作的右上角也出现了一个可以点击的小锁图标。

点击 Authorize 按钮,弹出授权表单,输入 username 与 password 及其它可选字段

本例使用 OAuth2 的 Password 流以及 Bearer 令牌(Token)。为此要使用 OAuth2PasswordBearer 类。

Bearer 令牌不是唯一的选择。它是适用于绝大多数用例的最佳方案,除非您是 OAuth2 的专家,知道为什么其它方案更合适。

创建 OAuth2PasswordBearer 的类实例时,要传递 tokenUrl 参数。该参数包含客户端(用户浏览器中运行的前端) 的 URL,用于发送 username 与 password,并获取令牌。当前URL:https://example.com/token

该参数不会创建端点或路径操作,但会声明客户端用来获取令牌的 URL /token 。此信息用于 OpenAPI 及 API 文档。

@app.get("/items/")
async def read_items(token: Annotated[str, Depends(oauth2_scheme)]):
    return {"token": token}

该依赖项使用字符串(str)接收路径操作函数的参数 token

实现的操作

FastAPI 校验请求中的 Authorization 请求头,核对请求头的值是不是由 Bearer + 令牌组成, 并返回令牌字符串(str)。

如果没有找到 Authorization 请求头,或请求头的值不是 Bearer + 令牌。FastAPI 直接返回 401 错误状态码(UNAUTHORIZED)。

开发者不需要检查错误信息,查看令牌是否存在,只要该函数能够执行,函数中就会包含令牌字符串。

2、获取当前用户

创建用户模型

from typing import Annotated
from fastapi import Depends, FastAPI
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel

app = FastAPI()

oauth2_schema = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: str
    full_name: str | None = None
    disbaled: bool | None = None


def fake_decode_token(token):
    return User(username=token + "fakedecoded", email="admin@admin.com", full_name="admin")

async def get_current_user(token: Annotated[str, Depends(oauth2_schema)]):
    user = fake_decode_token(token)
    return user


@app.get("/users/me")
async def read_user_me(current_user: Annotated[User, Depends(get_current_user)]):
    return current_user

理解:current_user: Annotated[User, Depends(get_current_user)] 使用注入依赖项get_current_user的返回 赋值给参数current_user

3、OAuth2 实现简单的 Password 和 Bearer 验证

from typing import Annotated
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel


fake_users_db  = {
    "bob": {
        "username": "bob",
        "full_name": "Bob",
        "email": "bob@qq.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False
    },
    "alice": {
        "username": "tom",
        "full_name": "Tom",
        "email": "tom@qq.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True
    }
}

app = FastAPI()

def fake_hash_password(password: str):
    return "fakehashed" + password


oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")


class User(BaseModel):
    username: str
    emial: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str
    

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def fake_decode_token(token):
    user = get_user(fake_users_db, token)
    return user


async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    user = fake_decode_token(token)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user

async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login(form_data: Annotated[OAuth2PasswordRequestForm, Depends()]):
    user_dict = fake_users_db.get(form_data.username)
    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")
    user = UserInDB(**user_dict)
    hashed_password = fake_hash_password(form_data.password)
    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user

4、OAuth2 实现密码哈希与 Bearer JWT 令牌验证

安装依赖

pip install pyjwt
pip install passlib[bcrypt]
from datetime import datetime, timedelta, timezone
from typing import Annotated
import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: str | None = None


class User(BaseModel):
    username: str
    email: str | None = None
    full_name: str | None = None
    disabled: bool | None = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()

def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password):
    return pwd_context.hash(password)

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

async def get_current_user(token: Annotated[str, Depends(oauth2_scheme)]):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/token")
async def login_for_access_token(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()]
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")


@app.get("/users/me/", response_model=User)
async def read_users_me(
    current_user: Annotated[User, Depends(get_current_active_user)]
):
    return current_user

@app.get("/users/me/items/")
async def read_own_items(
    current_user: Annotated[User, Depends(get_current_active_user)],
):
    return [{"item_id": "Foo", "owner": current_user.username}]

用户名: johndoe 密码: secret

5、中间件

"中间件"是一个函数,它在每个请求被特定的路径操作处理之前,以及在每个响应返回之前工作.

  • 它接收你的应用程序的每一个请求.
  • 然后它可以对这个请求做一些事情或者执行任何需要的代码.
  • 然后它将请求传递给应用程序的其他部分 (通过某种路径操作).
  • 然后它获取应用程序生产的响应 (通过某种路径操作).
  • 它可以对该响应做些什么或者执行任何需要的代码.
  • 然后它返回这个 响应.
import time
from fastapi import FastAPI, Request

app = FastAPI()

@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = time.perf_counter() - start_time
    response.headers["X-Process-Time"] = str(process_time)
    return response

6、跨域资源共享CORS

使用 CORSMiddleware

使用 CORSMiddleware 来配置它。

  • 导入 CORSMiddleware
  • 创建一个允许的源列表(由字符串组成)。
  • 将其作为「中间件」添加到你的 FastAPI 应用中。

也可以指定后端是否允许:

  • 凭证(授权 headers,Cookies 等)。
  • 特定的 HTTP 方法(POSTPUT)或者使用通配符 "*" 允许所有方法。
  • 特定的 HTTP headers 或者使用通配符 "*" 允许所有 headers。
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()


origins = [
    "http://localhost.tiangolo.com",
    "https://localhost.tiangolo.com",
    "http://localhost",
    "http://localhost:8080",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"]
)

@app.get("/")
async def main():
    return {"message": "Hello world"}
  • allow_origins - 一个允许跨域请求的源列表。例如 ['https://example.org', 'https://www.example.org']。你可以使用 ['*'] 允许任何源。
  • allow_origin_regex - 一个正则表达式字符串,匹配的源允许跨域请求。例如 'https://.*\.example\.org'
  • allow_methods - 一个允许跨域请求的 HTTP 方法列表。默认为 ['GET']。你可以使用 ['*'] 来允许所有标准方法。
  • allow_headers - 一个允许跨域请求的 HTTP 请求头列表。默认为 []。你可以使用 ['*'] 允许所有的请求头。AcceptAccept-LanguageContent-Language 以及 Content-Type 请求头总是允许 CORS 请求。
  • allow_credentials - 指示跨域请求支持 cookies。默认是 False。另外,允许凭证时 allow_origins 不能设定为 ['*'],必须指定源。
  • expose_headers - 指示可以被浏览器访问的响应头。默认为 []
  • max_age - 设定浏览器缓存 CORS 响应的最长时间,单位是秒。默认为 600