原文地址: https://mp.weixin.qq.com/s/O0Y5_Ovx8N84XGWc9fYL-Q
每次在 Flask 中自己实现身份验证,都感觉像是在用方轮子重新发明轮子。直到我遇到了 FastAPI Users——一个即插即用的身份验证和用户管理库。
它开箱即用地支持 JWT、OAuth2,甚至社交登录。再也不用面对那些样板代码的疯狂了。
| import sqlalchemy as sa
from fastapi import FastAPI, Depends
from fastapi_users import FastAPIUsers, models
from fastapi_users.db import SQLAlchemyUserDatabase
from fastapi_users.authentication import JWTAuthentication
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from sqlalchemy.orm import sessionmaker, Session
# 创建数据库模型
Base: DeclarativeMeta = declarative_base()
class UserTable(Base, models.BaseUserTable):
# 可以在这里添加自定义字段
name = sa.Column(sa.String, nullable=True)
# 创建 FastAPI 应用
app = FastAPI()
# 数据库配置(示例)
DATABASE_URL = "sqlite:///./test.db"
engine = sa.create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# 创建表
Base.metadata.create_all(bind=engine)
# 用户模型定义
class User(models.BaseUser):
name: str = None
class UserCreate(models.BaseUserCreate):
name: str = None
class UserUpdate(models.BaseUserUpdate):
name: str = None
class UserDB(User, models.BaseUserDB):
pass
# 获取数据库会话
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 创建用户数据库适配器
def get_user_db(session: Session = Depends(get_db)):
yield SQLAlchemyUserDatabase(UserDB, session, UserTable)
# JWT 配置
SECRET = "YOUR_SECRET_KEY"
jwt_authentication = JWTAuthentication(
secret=SECRET,
lifetime_seconds=3600,
tokenUrl="auth/jwt/login"
)
# 初始化 FastAPI Users
fastapi_users = FastAPIUsers(
get_user_db,
[jwt_authentication],
User,
UserCreate,
UserUpdate,
UserDB,
)
# 包含认证路由
app.include_router(
fastapi_users.get_auth_router(jwt_authentication),
prefix="/auth/jwt",
tags=["auth"]
)
app.include_router(
fastapi_users.get_register_router(),
prefix="/auth",
tags=["auth"]
)
# 受保护的路由示例
@app.get("/protected-route")
async def protected_route(user: User = Depends(fastapi_users.current_user())):
return {"message": f"Hello {user.email}, you are authenticated!"}
# 运行:uvicorn main:app --reload
|
在 Flask 中发送邮件总感觉像是在拆炸弹——一个配置错误就会导致全盘皆输。FastAPI-Mail 让这一切变得轻松愉快。
| from fastapi import FastAPI, BackgroundTasks
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from pydantic import EmailStr, BaseModel
from typing import List
app = FastAPI()
# 邮件配置
conf = ConnectionConfig(
MAIL_USERNAME = "your-email@gmail.com",
MAIL_PASSWORD = "your-app-password", # 注意:使用应用专用密码
MAIL_FROM = "your-email@gmail.com",
MAIL_PORT = 587,
MAIL_SERVER = "smtp.gmail.com",
MAIL_TLS = True,
MAIL_SSL = False,
USE_CREDENTIALS = True,
VALIDATE_CERTS = True
)
fm = FastMail(conf)
# 邮件数据模型
class EmailSchema(BaseModel):
email: List[EmailStr]
subject: str = "FastAPI Mail"
body: str
async def send_email_async(email: EmailSchema):
message = MessageSchema(
subject=email.subject,
recipients=email.email,
body=email.body,
subtype="html"# 或 "plain"
)
await fm.send_message(message)
@app.post("/send-email")
async def send_email(
background_tasks: BackgroundTasks,
email_data: EmailSchema
):
"""
异步发送邮件
"""
background_tasks.add_task(send_email_async, email_data)
return {"message": "邮件已加入发送队列"}
# 同步发送示例
@app.post("/send-email-sync")
async def send_email_sync(email_data: EmailSchema):
message = MessageSchema(
subject=email_data.subject,
recipients=email_data.email,
body=email_data.body,
)
await fm.send_message(message)
return {"message": "邮件发送成功"}
# 发送带附件的邮件
@app.post("/send-email-with-attachment")
async def send_with_attachment(email_data: EmailSchema):
message = MessageSchema(
subject=email_data.subject,
recipients=email_data.email,
body=email_data.body,
attachments=[{
"file": "path/to/file.pdf",
"filename": "document.pdf"
}]
)
await fm.send_message(message)
return {"message": "带附件的邮件发送成功"}
|
以前我把 SocketIO 硬塞进 Flask 时,总感觉像是在自行车上绑火箭。有了 FastAPI-SocketIO,实时功能变得如此自然。
| from fastapi import FastAPI
from fastapi_socketio import SocketManager
from typing import Optional
import asyncio
app = FastAPI()
socket_manager = SocketManager(app=app, mount_location="/ws/")
# 连接事件
@socket_manager.on("connect")
async def handle_connect(sid, environ, auth):
print(f"客户端 {sid} 已连接")
await socket_manager.emit("welcome", {"msg": "欢迎加入聊天室"}, to=sid)
# 消息事件
@socket_manager.on("message")
async def handle_message(sid, data):
print(f"来自 {sid} 的消息: {data}")
# 广播给所有客户端
await socket_manager.emit("response", {
"from": sid[:8], # 显示短ID
"msg": data["message"]
})
# 私聊示例
@socket_manager.on("private_message")
async def handle_private_message(sid, data):
target_sid = data.get("target_sid")
message = data.get("message")
if target_sid:
await socket_manager.emit("private", {
"from": sid[:8],
"msg": message
}, to=target_sid)
else:
await socket_manager.emit("error", {"msg": "目标用户未指定"}, to=sid)
# 断开连接事件
@socket_manager.on("disconnect")
async def handle_disconnect(sid):
print(f"客户端 {sid} 已断开连接")
# 通知其他用户
await socket_manager.emit("user_left", {"user": sid[:8]})
# HTTP 端点与 WebSocket 结合
@app.get("/online-users")
async def get_online_users():
"""获取在线用户列表"""
# socket_manager.server.rooms 包含连接信息
return {"online_count": len(socket_manager.get_participants("/"))}
# 从 HTTP 端点触发 WebSocket 事件
@app.post("/broadcast")
async def broadcast_message(message: str):
"""向所有连接的客户端广播消息"""
await socket_manager.emit("broadcast", {
"from": "server",
"msg": message,
"timestamp": asyncio.get_event_loop().time()
})
return {"status": "广播发送成功"}
|
我构建的每个公共 API 最终都会遇到滥用问题。在 Flask 中,我只能凑合着写中间件。在 FastAPI 中,我只需添加 FastAPI-Limiter。
| from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
import aioredis
import asyncio
app = FastAPI()
@app.on_event("startup")
async def startup():
"""
初始化 Redis 连接和限流器
"""
redis = await aioredis.create_redis_pool("redis://localhost")
await FastAPILimiter.init(redis)
# 基础限流:每分钟 5 次请求
@app.get("/api/data", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
async def get_data():
return {"message": "您在速率限制内!"}
# 更复杂的限流策略
@app.get("/api/premium-data", dependencies=[Depends(RateLimiter(times=10, seconds=60))])
async def get_premium_data(request: Request):
"""
针对付费用户的高限额
"""
# 可以通过请求头或其他方式识别用户身份
user_type = request.headers.get("X-User-Type", "free")
if user_type == "premium":
# 动态调整限制
pass
return {"data": "高级数据内容"}
# 基于 IP 的限流
@app.get("/api/public")
@RateLimiter(times=2, seconds=30, key_func=lambda request: request.client.host)
async def public_api():
return {"message": "公共 API,限制较严格"}
# 异常处理
@app.exception_handler(429)
async def rate_limit_exception_handler(request: Request, exc):
"""
处理速率限制超出异常
"""
return JSONResponse(
status_code=429,
content={
"error": "请求过多",
"message": "请稍后再试",
"retry_after": 60# 建议的重试时间(秒)
}
)
# 监控端点(用于调试)
@app.get("/rate-limit-info")
async def rate_limit_info(request: Request):
"""
获取当前请求的限流信息
"""
client_ip = request.client.host
# 这里可以添加获取具体限流状态的逻辑
return {
"client_ip": client_ip,
"rate_limited": False,
"remaining": 5
}
|
Flask 中的缓存……只能说很有“创意”。有了 FastAPI-Cache,我根本不需要考虑它。
| from fastapi import FastAPI, Query
from fastapi_cache import FastAPICache, JsonCoder
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import aioredis
from datetime import timedelta
from typing import Optional
app = FastAPI()
@app.on_event("startup")
async def startup():
"""
初始化 Redis 缓存后端
"""
redis = aioredis.from_url("redis://localhost")
FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
# 基础缓存:60秒过期
@app.get("/products")
@cache(expire=60)
async def get_products():
"""
获取产品列表 - 结果缓存60秒
"""
# 模拟数据库查询
await asyncio.sleep(2) # 模拟耗时操作
return {"products": ["手机", "电脑", "平板", "耳机"]}
# 带参数的缓存
@app.get("/product/{product_id}")
@cache(expire=30, key_builder=lambda *args, **kwargs: f"product:{kwargs['product_id']}")
async def get_product(product_id: int):
"""
获取单个产品信息
"""
await asyncio.sleep(1)
return {"id": product_id, "name": f"产品{product_id}", "price": 99.99}
# 条件缓存
@app.get("/search")
@cache(expire=60, unless=lambda response: response.status_code != 200)
async def search_products(
q: str = Query(None, min_length=1),
page: int = Query(1, ge=1)
):
"""
搜索产品 - 只在响应成功时缓存
"""
ifnot q:
return {"error": "请输入搜索关键词"}
await asyncio.sleep(1.5)
return {
"query": q,
"page": page,
"results": [f"{q}结果{i}"for i in range(10)],
"total": 100
}
# 手动缓存操作
from fastapi_cache import FastAPICache
from fastapi_cache.coder import JsonCoder
@app.get("/manual-cache")
async def manual_cache_demo():
"""
手动缓存控制示例
"""
backend = FastAPICache.get_backend()
cache_key = "manual:data"
# 尝试从缓存获取
cached_data = await backend.get(cache_key)
if cached_data:
return {"source": "cache", "data": JsonCoder().decode(cached_data)}
# 生成新数据
new_data = {"id": 1, "value": "新生成的数据"}
# 存入缓存
await backend.set(cache_key, JsonCoder().encode(new_data), expire=120)
return {"source": "database", "data": new_data}
# 清除缓存
@app.post("/clear-cache/{pattern}")
async def clear_cache(pattern: str = "*"):
"""
清除匹配模式的缓存
"""
backend = FastAPICache.get_backend()
if isinstance(backend, RedisBackend):
redis_client = backend.redis
keys = await redis_client.keys(f"{FastAPICache.get_prefix()}{pattern}")
if keys:
await redis_client.delete(*keys)
return {"cleared": len(keys)}
return {"cleared": 0}
|
在 Flask 中,我总是需要手动编写 CRUD 端点。有了 FastAPI-CrudRouter,我确实能节省数小时。
| from fastapi import FastAPI
from fastapi_crudrouter import SQLAlchemyCRUDRouter
from pydantic import BaseModel
from typing import Optional, List
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from datetime import datetime
# 创建数据库模型
Base = declarative_base()
class ItemModel(Base):
__tablename__ = "items"
id = sa.Column(sa.Integer, primary_key=True, index=True)
name = sa.Column(sa.String, index=True)
description = sa.Column(sa.String, nullable=True)
price = sa.Column(sa.Float)
created_at = sa.Column(sa.DateTime, default=datetime.utcnow)
updated_at = sa.Column(sa.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# Pydantic 模型
class ItemBase(BaseModel):
name: str
description: Optional[str] = None
price: float
class ItemCreate(ItemBase):
pass
class ItemUpdate(ItemBase):
name: Optional[str] = None
price: Optional[float] = None
class ItemResponse(ItemBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
orm_mode = True
# 数据库配置
DATABASE_URL = "sqlite:///./crud.db"
engine = sa.create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI()
# 神奇的一行:创建完整 CRUD 路由
item_router = SQLAlchemyCRUDRouter(
schema=ItemResponse,
create_schema=ItemCreate,
update_schema=ItemUpdate,
db_model=ItemModel,
db=get_db,
prefix="items",
tags=["商品管理"]
)
app.include_router(item_router)
# 扩展功能:自定义路由
@app.get("/items/search/{keyword}")
def search_items(keyword: str, db: Session = Depends(get_db)):
"""
自定义搜索端点
"""
results = db.query(ItemModel).filter(ItemModel.name.contains(keyword)).all()
return results
# 复杂查询示例
@app.get("/items/expensive/{min_price}")
def get_expensive_items(min_price: float, db: Session = Depends(get_db)):
"""
获取价格高于指定值的商品
"""
items = db.query(ItemModel).filter(ItemModel.price >= min_price).order_by(ItemModel.price.desc()).all()
return items
# 统计端点
@app.get("/items/stats")
def get_item_stats(db: Session = Depends(get_db)):
"""
获取商品统计信息
"""
total_items = db.query(ItemModel).count()
total_value = db.query(sa.func.sum(ItemModel.price)).scalar() or0
avg_price = db.query(sa.func.avg(ItemModel.price)).scalar() or0
return {
"total_items": total_items,
"total_value": total_value,
"average_price": round(avg_price, 2),
"most_expensive": db.query(ItemModel).order_by(ItemModel.price.desc()).first()
}
|
最后,FastAPI-Plugins 感觉像是把所有工具打包进了一个箱子里。它为你提供了 Redis、调度器、缓存和日志记录,全部开箱即用。
| from fastapi import FastAPI, Depends
from fastapi_plugins import (
RedisSettings,
depends_redis,
redis_plugin,
RedisPlugin
)
from fastapi_plugins.cache import cache_plugin, CacheSettings
import aioredis
from contextlib import asynccontextmanager
from typing import Any
# 配置设置
class AppSettings(RedisSettings, CacheSettings):
api_name: str = "fastapi-plugins-demo"
redis_url: str = "redis://localhost:6379/0"
cache_ttl: int = 300# 缓存过期时间(秒)
settings = AppSettings()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理
"""
# 启动时
await redis_plugin.init_app(app, config=settings)
await redis_plugin.init()
await cache_plugin.init_app(app, config=settings)
await cache_plugin.init()
yield
# 关闭时
await redis_plugin.terminate()
await cache_plugin.terminate()
app = FastAPI(lifespan=lifespan)
# Redis 操作示例
@app.get("/redis-demo")
async def redis_demo(redis: aioredis.Redis = Depends(depends_redis)):
"""
演示基本的 Redis 操作
"""
# 设置值
await redis.set("my_key", "Hello from FastAPI!")
# 获取值
value = await redis.get("my_key")
# 设置过期时间
await redis.setex("temp_key", 60, "临时数据")
# 增加计数器
await redis.incr("counter")
counter = await redis.get("counter")
# 存储列表
await redis.lpush("my_list", "item1", "item2", "item3")
list_items = await redis.lrange("my_list", 0, -1)
# 存储哈希
await redis.hset("user:1000", mapping={"name": "Alice", "age": "30"})
user_data = await redis.hgetall("user:1000")
return {
"simple_value": value.decode() if value elseNone,
"counter": counter.decode() if counter elseNone,
"list_items": [item.decode() for item in list_items],
"user_data": {k.decode(): v.decode() for k, v in user_data.items()}
}
# 缓存示例
from fastapi_plugins.cache import depends_cache
@app.get("/cached-data")
@cache_plugin.cached(ttl=60)
async def get_cached_data():
"""
自动缓存响应的端点
"""
# 模拟耗时操作
import asyncio
await asyncio.sleep(2)
return {"data": "这是缓存的数据", "timestamp": datetime.utcnow().isoformat()}
# 发布/订阅示例
@app.get("/publish/{channel}")
async def publish_message(
channel: str,
message: str,
redis: aioredis.Redis = Depends(depends_redis)
):
"""
向 Redis 频道发布消息
"""
subscribers = await redis.publish(channel, message)
return {"channel": channel, "message": message, "subscribers": subscribers}
# 监控端点
@app.get("/redis-info")
async def redis_info(redis: aioredis.Redis = Depends(depends_redis)):
"""
获取 Redis 服务器信息
"""
info = await redis.info()
# 获取所有键
keys = await redis.keys("*")
return {
"redis_version": info.get("redis_version"),
"connected_clients": info.get("connected_clients"),
"used_memory_human": info.get("used_memory_human"),
"total_keys": len(keys),
"sample_keys": [key.decode() for key in keys[:10]] if keys else []
}
# 任务调度示例(需安装额外依赖)
try:
from fastapi_plugins.scheduler import scheduler_plugin, SchedulerSettings
class ExtendedSettings(AppSettings, SchedulerSettings):
pass
settings = ExtendedSettings()
@scheduler_plugin.task("interval", seconds=30)
async def scheduled_task():
"""
每30秒运行一次的定时任务
"""
print(f"定时任务执行于 {datetime.utcnow()}")
# 这里可以添加清理缓存、发送报告等逻辑
@app.get("/scheduler/jobs")
async def get_scheduled_jobs():
"""
获取所有计划任务
"""
scheduler = scheduler_plugin.get_scheduler()
jobs = scheduler.get_jobs()
return {"jobs": [str(job) for job in jobs]}
except ImportError:
print("注意:未安装调度器扩展,相关功能不可用")
|