from fastapi import FastAPI, Depends
from fastapi_plugins import (
RedisSettings,
depends_redis,
redis_plugin,
RedisPlugin
)
from fastapi_plugins.cache import cache_plugin, CacheSettings
import aioredis
from contextlib import asynccontextmanager
from typing import Any
# 配置设置
class AppSettings(RedisSettings, CacheSettings):
api_name: str = "fastapi-plugins-demo"
redis_url: str = "redis://localhost:6379/0"
cache_ttl: int = 300# 缓存过期时间(秒)
settings = AppSettings()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
应用生命周期管理
"""
# 启动时
await redis_plugin.init_app(app, config=settings)
await redis_plugin.init()
await cache_plugin.init_app(app, config=settings)
await cache_plugin.init()
yield
# 关闭时
await redis_plugin.terminate()
await cache_plugin.terminate()
app = FastAPI(lifespan=lifespan)
# Redis 操作示例
@app.get("/redis-demo")
async def redis_demo(redis: aioredis.Redis = Depends(depends_redis)):
"""
演示基本的 Redis 操作
"""
# 设置值
await redis.set("my_key", "Hello from FastAPI!")
# 获取值
value = await redis.get("my_key")
# 设置过期时间
await redis.setex("temp_key", 60, "临时数据")
# 增加计数器
await redis.incr("counter")
counter = await redis.get("counter")
# 存储列表
await redis.lpush("my_list", "item1", "item2", "item3")
list_items = await redis.lrange("my_list", 0, -1)
# 存储哈希
await redis.hset("user:1000", mapping={"name": "Alice", "age": "30"})
user_data = await redis.hgetall("user:1000")
return {
"simple_value": value.decode() if value elseNone,
"counter": counter.decode() if counter elseNone,
"list_items": [item.decode() for item in list_items],
"user_data": {k.decode(): v.decode() for k, v in user_data.items()}
}
# 缓存示例
from fastapi_plugins.cache import depends_cache
@app.get("/cached-data")
@cache_plugin.cached(ttl=60)
async def get_cached_data():
"""
自动缓存响应的端点
"""
# 模拟耗时操作
import asyncio
await asyncio.sleep(2)
return {"data": "这是缓存的数据", "timestamp": datetime.utcnow().isoformat()}
# 发布/订阅示例
@app.get("/publish/{channel}")
async def publish_message(
channel: str,
message: str,
redis: aioredis.Redis = Depends(depends_redis)
):
"""
向 Redis 频道发布消息
"""
subscribers = await redis.publish(channel, message)
return {"channel": channel, "message": message, "subscribers": subscribers}
# 监控端点
@app.get("/redis-info")
async def redis_info(redis: aioredis.Redis = Depends(depends_redis)):
"""
获取 Redis 服务器信息
"""
info = await redis.info()
# 获取所有键
keys = await redis.keys("*")
return {
"redis_version": info.get("redis_version"),
"connected_clients": info.get("connected_clients"),
"used_memory_human": info.get("used_memory_human"),
"total_keys": len(keys),
"sample_keys": [key.decode() for key in keys[:10]] if keys else []
}
# 任务调度示例(需安装额外依赖)
try:
from fastapi_plugins.scheduler import scheduler_plugin, SchedulerSettings
class ExtendedSettings(AppSettings, SchedulerSettings):
pass
settings = ExtendedSettings()
@scheduler_plugin.task("interval", seconds=30)
async def scheduled_task():
"""
每30秒运行一次的定时任务
"""
print(f"定时任务执行于 {datetime.utcnow()}")
# 这里可以添加清理缓存、发送报告等逻辑
@app.get("/scheduler/jobs")
async def get_scheduled_jobs():
"""
获取所有计划任务
"""
scheduler = scheduler_plugin.get_scheduler()
jobs = scheduler.get_jobs()
return {"jobs": [str(job) for job in jobs]}
except ImportError:
print("注意:未安装调度器扩展,相关功能不可用")