跳转至

我的博客

风控模型的一般步骤

在开始数据建模工作之前,博主其实看了蛮多关于数据挖掘的书籍。但是,真正从事数据建模工作后,可以说只能用 “纸上谈兵” 来形容自己,我甚至搞不清楚在数据建模分为几个阶段,要先做什么后做什么,模型原理更是一知半解。随着自我的学习以及大佬们的指教,慢慢懂得数据挖掘工作一般步骤与套路,熟记这些建模的步骤与讨论,能让你明确当前任务是什么,下一步任务又是什么,每一步任务中应该注意的一些问题。

下面内容为整理的对建模的一般步骤,内容也仅作参考。值得注意的是,这里的一般步骤只是建模的常规套路,关于模型的原理部分还需要自己去深入了解。

新建 Nextra 项目

参考文档: https://nextra.site/docs/docs-theme/start

Nextra 是一个基于 Next.js 的静态站点生成器,专注于文档和博客。它提供了一个简单的界面来创建和管理内容,同时利用 Next.js 的强大功能来构建高性能的网站。

我一直使用 Mkdocs-Material 来搭建自己的个人博客站点,但近期学习 NextJS 框架,决定研究下 Nextra 部署自己的文档站点。另外一个原因就是,Mkdocs-Material 有一部分功能并不开源,而 nextra 完全开源,且采用 MDX 语法,支持在文档中引用 React 组件,从而大大提高了交互性(如果文档需要展示一些交互内容的话,那么强烈推荐这个框架)。

注意

Mkdocs-Material 已经处于维护状态,原作者基于 Mkdocs-Material 开发了一个新的项目 Zensical,有兴趣的同学可以去研究下 Zensical 的使用。

1. 初始化项目

首先我们先使用 Next.JS 初始化一个项目,

cd mononote  # 如果是 Github 项目,建议仓库不包含任何文件
npx create-next-app@latest .

接着安装 Nextra 及其相关依赖:

npm i next react react-dom nextra nextra-theme-docs

执行完成后,可以执行 npm run dev 来启动开发服务,浏览器访问 http://localhost:3000 就可以看到页面了。

使用 turbopack

如果你使用的是 Next.js 13.4 版本以上,建议使用 turbopack 来提升开发体验。可以在 package.json 中将 dev 脚本修改为:

"scripts": {
  "dev": "next dev --turbopack"
}

2. 配置 Nextra

next.config.ts
import nextra from 'nextra'

// Set up Nextra with its configuration
const withNextra = nextra({
  // ... Add Nextra-specific options here
})

// Export the final Next.js config with Nextra included
export default withNextra({
  // ... Add regular Next.js options here
})

3. 添加 mdx-components 文件

在项目根目录下创建一个 mdx-components.tsx 文件,用于定义 MDX 组件:

mdx-components.tsx
import { useMDXComponents as getThemeComponents } from 'nextra-theme-docs' // nextra-theme-blog or your custom theme

// Get the default MDX components
const themeComponents = getThemeComponents()

// Merge components
export function useMDXComponents(components) {
  return {
    ...themeComponents,
    ...components
  }
}

4. 设置搜索

  1. 安装 pagefind 依赖
npm i -D pagefind
  1. 添加 pagefind 配置

Pagefind 索引 .html 文件,因此索引必须在构建应用程序后进行。

package.json
1
2
3
4
5
{
  "scripts": {
    "postbuild": "next build && pagefind --site .next/server/app --output-path public/_pagefind",
  }
}
  1. _pagefind/ 添加到您的 .gitignore 文件以避免提交生成的索引文件。

  2. 验证输出索引:构建并运行 postbuild 脚本后,检查 public/out/ 中是否存在 _pagefind/ 目录。启动您的应用程序并测试搜索栏以确认一切正常。

5. 创建根布局

接下来,我们在 app 目录下创建一个 layout.tsx 文件,用于定义根布局:

app/layout.tsx
import { Footer, Layout, Navbar } from 'nextra-theme-docs'
import { Banner, Head } from 'nextra/components'
import { getPageMap } from 'nextra/page-map'
import 'nextra-theme-docs/style.css'

export const metadata = {
  // Define your metadata here
  // For more information on metadata API, see: https://nextjs.org/docs/app/building-your-application/optimizing/metadata
}

const banner = <Banner storageKey="some-key">Nextra 4.0 is released 🎉</Banner>
const navbar = (
  <Navbar
    logo={<b>Nextra</b>}
    // ... Your additional navbar options
  />
)
const footer = <Footer>MIT {new Date().getFullYear()} © Nextra.</Footer>

export default async function RootLayout({ children }) {
  return (
    <html
      // Not required, but good for SEO
      lang="en"
      // Required to be set
      dir="ltr"
      // Suggested by `next-themes` package https://github.com/pacocoursey/next-themes#with-app
      suppressHydrationWarning
    >
      <Head
      // ... Your additional head options
      >
        {/* Your additional tags should be passed as `children` of `<Head>` element */}
      </Head>
      <body>
        <Layout
          banner={banner}
          navbar={navbar}
          pageMap={await getPageMap()}
          docsRepositoryBase="https://github.com/shuding/nextra/tree/main/docs"
          footer={footer}
          // ... Your additional layout options
        >
          {children}
        </Layout>
      </body>
    </html>
  )
}

6. 渲染 MDX 文件

有 2 种方式可以渲染 MDX 文件:

  1. 使用 .mdx 后缀的文件,Nextra 默认支持这种文件格式,可以直接渲染。
  2. 使用 content 目录,并将 .mdx 文件放在 content 目录下。

这里推荐第 2 种方式,更方便对文档进行维护和管理。此外,我后续希望给站点添加一个登录功能,所有这里我们新建一个 docs 专门用于 MDX 文档渲染,而 content 目录专门用于存放文档内容。

项目结构
app/
  (auth)/
    page.tsx
  docs/
    [[...mdxPath]]/
      page.jsx
    layout.tsx
content/

7. 运行项目

最后,我们再次执行 npm run dev 来启动开发服务,浏览器访问 http://localhost:3000 就可以看到 Nextra 框架搭建的文档站点了。

8. 文档界面优化

启动后界面有 6 处地方显示需要进行优化,可在 Layout 组件中通过参数进行调整:

  1. 编辑此页
  2. 更新此页
  3. 反馈信息
  4. 复制本页面
  5. 搜索框信息
  6. 暗黑模式的选项
app/docs/layout.tsx
import { Footer, Layout, Navbar, LastUpdated } from 'nextra-theme-docs'
import { Search } from 'nextra/components'
import { getPageMap } from 'nextra/page-map'
import 'nextra-theme-docs/style.css'

const navbar = (
  <Navbar
    logo={<b>Mononote</b>}
    // ... Your additional navbar options
  />
)
const footer = <Footer>MIT {new Date().getFullYear()} © Nextra.</Footer>

export default async function RootLayout({ children }: { children: React.ReactNode }) {
  return (
    <div>
      <Layout
        navbar={navbar}
        pageMap={await getPageMap()}
        docsRepositoryBase="https://github.com/shuding/nextra/tree/main/docs"
        footer={footer}
        editLink={false}
        feedback={{content: null}}
        lastUpdated={
          <LastUpdated locale='zh'>更新于</LastUpdated>
        }
        copyPageButton={false}
        search={<Search placeholder="搜索" />}
        themeSwitch={{
            dark: '暗黑',
            light: '明亮',
            system: '系统'
        }}
      >
        {children}
      </Layout>
    </div>
  )
}

9. 添加登录功能

安装 shadcn/ui 组件库:

npm install tailwindcss @tailwindcss/postcss postcss
npm install react-hook-form next-auth tailwind-merge @hookform/resolvers zod class-variance-authority @radix-ui/react-slot
npm install lucide-react
npx shadcn@latest add sonner input button label field card input-group

错误

Something went wrong. Please check the error below for more details. If the problem persists, please open an issue on GitHub.

Command failed with exit code 1: npm install sonner next-themes clsx tailwind-merge 'shadcn@latest' class-variance-authority tw-animate-css radix-ui lucide-react

npm error code EEXIST npm error syscall rename npm error path ~/.npm/_cacache/tmp/602eb3c2 npm error dest ~/.npm/_cacache/content-v2/sha512/4c/7e/4e3e6489ec4444e98e0f979fef269e8889b0a4c0de4f072b4a1cd97106736e703be161e09d4be63a22124cb551d92997665b992200ae8bc49f808b2002a7 npm error errno EEXIST npm error Invalid response body while trying to fetch https://registry.npmjs.org/@radix-ui%2freact-toggle-group: EACCES: permission denied, rename '~/.npm/_cacache/tmp/602eb3c2' -> '~/.npm/_cacache/content-v2/sha512/4c/7e/4e3e6489ec4444e98e0f979fef269e8889b0a4c0de4f072b4a1cd97106736e703be161e09d4be63a22124cb551d92997665b992200ae8bc49f808b2002a7' npm error File exists: ~/.npm/_cacache/content-v2/sha512/4c/7e/4e3e6489ec4444e98e0f979fef269e8889b0a4c0de4f072b4a1cd97106736e703be161e09d4be63a22124cb551d92997665b992200ae8bc49f808b2002a7 npm error Remove the existing file and try again, or run npm npm error with --force to overwrite files recklessly. npm error A complete log of this run can be found in: ~/.npm/_logs/2026-03-17T07_40_13_141Z-debug-0.log

这是因为电脑上的 npm 缓存文件权限冲突 导致的,执行 sudo npm cache clean --force

参考文档:

7个提高FastAPI开发效率的扩展

原文地址: https://mp.weixin.qq.com/s/O0Y5_Ovx8N84XGWc9fYL-Q

1. FastAPI Users

每次在 Flask 中自己实现身份验证,都感觉像是在用方轮子重新发明轮子。直到我遇到了 FastAPI Users——一个即插即用的身份验证和用户管理库。

它开箱即用地支持 JWT、OAuth2,甚至社交登录。再也不用面对那些样板代码的疯狂了。

import sqlalchemy as sa
from fastapi import FastAPI, Depends
from fastapi_users import FastAPIUsers, models
from fastapi_users.db import SQLAlchemyUserDatabase
from fastapi_users.authentication import JWTAuthentication
from sqlalchemy.ext.declarative import DeclarativeMeta, declarative_base
from sqlalchemy.orm import sessionmaker, Session

# 创建数据库模型
Base: DeclarativeMeta = declarative_base()

class UserTable(Base, models.BaseUserTable):
    # 可以在这里添加自定义字段
    name = sa.Column(sa.String, nullable=True)

# 创建 FastAPI 应用
app = FastAPI()

# 数据库配置(示例)
DATABASE_URL = "sqlite:///./test.db"
engine = sa.create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)

# 创建表
Base.metadata.create_all(bind=engine)

# 用户模型定义
class User(models.BaseUser):
    name: str = None

class UserCreate(models.BaseUserCreate):
    name: str = None

class UserUpdate(models.BaseUserUpdate):
    name: str = None

class UserDB(User, models.BaseUserDB):
    pass

# 获取数据库会话
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

# 创建用户数据库适配器
def get_user_db(session: Session = Depends(get_db)):
    yield SQLAlchemyUserDatabase(UserDB, session, UserTable)

# JWT 配置
SECRET = "YOUR_SECRET_KEY"
jwt_authentication = JWTAuthentication(
    secret=SECRET, 
    lifetime_seconds=3600,
    tokenUrl="auth/jwt/login"
)

# 初始化 FastAPI Users
fastapi_users = FastAPIUsers(
    get_user_db,
    [jwt_authentication],
    User,
    UserCreate,
    UserUpdate,
    UserDB,
)

# 包含认证路由
app.include_router(
    fastapi_users.get_auth_router(jwt_authentication),
    prefix="/auth/jwt",
    tags=["auth"]
)

app.include_router(
    fastapi_users.get_register_router(),
    prefix="/auth",
    tags=["auth"]
)

# 受保护的路由示例
@app.get("/protected-route")
async def protected_route(user: User = Depends(fastapi_users.current_user())):
    return {"message": f"Hello {user.email}, you are authenticated!"}

# 运行:uvicorn main:app --reload

2. FastAPI Mail

在 Flask 中发送邮件总感觉像是在拆炸弹——一个配置错误就会导致全盘皆输。FastAPI-Mail 让这一切变得轻松愉快。

from fastapi import FastAPI, BackgroundTasks
from fastapi_mail import FastMail, MessageSchema, ConnectionConfig
from pydantic import EmailStr, BaseModel
from typing import List

app = FastAPI()

# 邮件配置
conf = ConnectionConfig(
    MAIL_USERNAME = "your-email@gmail.com",
    MAIL_PASSWORD = "your-app-password",  # 注意:使用应用专用密码
    MAIL_FROM = "your-email@gmail.com",
    MAIL_PORT = 587,
    MAIL_SERVER = "smtp.gmail.com",
    MAIL_TLS = True,
    MAIL_SSL = False,
    USE_CREDENTIALS = True,
    VALIDATE_CERTS = True
)

fm = FastMail(conf)

# 邮件数据模型
class EmailSchema(BaseModel):
    email: List[EmailStr]
    subject: str = "FastAPI Mail"
    body: str

async def send_email_async(email: EmailSchema):
    message = MessageSchema(
        subject=email.subject,
        recipients=email.email,
        body=email.body,
        subtype="html"# 或 "plain"
    )
    await fm.send_message(message)

@app.post("/send-email")
async def send_email(
    background_tasks: BackgroundTasks, 
    email_data: EmailSchema
):
    """
    异步发送邮件
    """
    background_tasks.add_task(send_email_async, email_data)
    return {"message": "邮件已加入发送队列"}

# 同步发送示例
@app.post("/send-email-sync")
async def send_email_sync(email_data: EmailSchema):
    message = MessageSchema(
        subject=email_data.subject,
        recipients=email_data.email,
        body=email_data.body,
    )
    await fm.send_message(message)
    return {"message": "邮件发送成功"}

# 发送带附件的邮件
@app.post("/send-email-with-attachment")
async def send_with_attachment(email_data: EmailSchema):
    message = MessageSchema(
        subject=email_data.subject,
        recipients=email_data.email,
        body=email_data.body,
        attachments=[{
            "file": "path/to/file.pdf",
            "filename": "document.pdf"
        }]
    )
    await fm.send_message(message)
    return {"message": "带附件的邮件发送成功"}

3. FastAPI SocketIO

以前我把 SocketIO 硬塞进 Flask 时,总感觉像是在自行车上绑火箭。有了 FastAPI-SocketIO,实时功能变得如此自然。

from fastapi import FastAPI
from fastapi_socketio import SocketManager
from typing import Optional
import asyncio

app = FastAPI()
socket_manager = SocketManager(app=app, mount_location="/ws/")

# 连接事件
@socket_manager.on("connect")
async def handle_connect(sid, environ, auth):
    print(f"客户端 {sid} 已连接")
    await socket_manager.emit("welcome", {"msg": "欢迎加入聊天室"}, to=sid)

# 消息事件
@socket_manager.on("message")
async def handle_message(sid, data):
    print(f"来自 {sid} 的消息: {data}")
    # 广播给所有客户端
    await socket_manager.emit("response", {
        "from": sid[:8],  # 显示短ID
        "msg": data["message"]
    })

# 私聊示例
@socket_manager.on("private_message")
async def handle_private_message(sid, data):
    target_sid = data.get("target_sid")
    message = data.get("message")
    
    if target_sid:
        await socket_manager.emit("private", {
            "from": sid[:8],
            "msg": message
        }, to=target_sid)
    else:
        await socket_manager.emit("error", {"msg": "目标用户未指定"}, to=sid)

# 断开连接事件
@socket_manager.on("disconnect")
async def handle_disconnect(sid):
    print(f"客户端 {sid} 已断开连接")
    # 通知其他用户
    await socket_manager.emit("user_left", {"user": sid[:8]})

# HTTP 端点与 WebSocket 结合
@app.get("/online-users")
async def get_online_users():
    """获取在线用户列表"""
    # socket_manager.server.rooms 包含连接信息
    return {"online_count": len(socket_manager.get_participants("/"))}

# 从 HTTP 端点触发 WebSocket 事件
@app.post("/broadcast")
async def broadcast_message(message: str):
    """向所有连接的客户端广播消息"""
    await socket_manager.emit("broadcast", {
        "from": "server",
        "msg": message,
        "timestamp": asyncio.get_event_loop().time()
    })
    return {"status": "广播发送成功"}

4. FastAPI Limiter

我构建的每个公共 API 最终都会遇到滥用问题。在 Flask 中,我只能凑合着写中间件。在 FastAPI 中,我只需添加 FastAPI-Limiter。

from fastapi import FastAPI, Depends, Request, HTTPException
from fastapi_limiter import FastAPILimiter
from fastapi_limiter.depends import RateLimiter
import aioredis
import asyncio

app = FastAPI()

@app.on_event("startup")
async def startup():
    """
    初始化 Redis 连接和限流器
    """
    redis = await aioredis.create_redis_pool("redis://localhost")
    await FastAPILimiter.init(redis)

# 基础限流:每分钟 5 次请求
@app.get("/api/data", dependencies=[Depends(RateLimiter(times=5, seconds=60))])
async def get_data():
    return {"message": "您在速率限制内!"}

# 更复杂的限流策略
@app.get("/api/premium-data", dependencies=[Depends(RateLimiter(times=10, seconds=60))])
async def get_premium_data(request: Request):
    """
    针对付费用户的高限额
    """
    # 可以通过请求头或其他方式识别用户身份
    user_type = request.headers.get("X-User-Type", "free")
    
    if user_type == "premium":
        # 动态调整限制
        pass
    
    return {"data": "高级数据内容"}

# 基于 IP 的限流
@app.get("/api/public")
@RateLimiter(times=2, seconds=30, key_func=lambda request: request.client.host)
async def public_api():
    return {"message": "公共 API,限制较严格"}

# 异常处理
@app.exception_handler(429)
async def rate_limit_exception_handler(request: Request, exc):
    """
    处理速率限制超出异常
    """
    return JSONResponse(
        status_code=429,
        content={
            "error": "请求过多",
            "message": "请稍后再试",
            "retry_after": 60# 建议的重试时间(秒)
        }
    )

# 监控端点(用于调试)
@app.get("/rate-limit-info")
async def rate_limit_info(request: Request):
    """
    获取当前请求的限流信息
    """
    client_ip = request.client.host
    # 这里可以添加获取具体限流状态的逻辑
    return {
        "client_ip": client_ip,
        "rate_limited": False,
        "remaining": 5
    }

5. FastAPI Cache

Flask 中的缓存……只能说很有“创意”。有了 FastAPI-Cache,我根本不需要考虑它。

from fastapi import FastAPI, Query
from fastapi_cache import FastAPICache, JsonCoder
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
import aioredis
from datetime import timedelta
from typing import Optional

app = FastAPI()

@app.on_event("startup")
async def startup():
    """
    初始化 Redis 缓存后端
    """
    redis = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")

# 基础缓存:60秒过期
@app.get("/products")
@cache(expire=60)
async def get_products():
    """
    获取产品列表 - 结果缓存60秒
    """
    # 模拟数据库查询
    await asyncio.sleep(2)  # 模拟耗时操作
    return {"products": ["手机", "电脑", "平板", "耳机"]}

# 带参数的缓存
@app.get("/product/{product_id}")
@cache(expire=30, key_builder=lambda *args, **kwargs: f"product:{kwargs['product_id']}")
async def get_product(product_id: int):
    """
    获取单个产品信息
    """
    await asyncio.sleep(1)
    return {"id": product_id, "name": f"产品{product_id}", "price": 99.99}

# 条件缓存
@app.get("/search")
@cache(expire=60, unless=lambda response: response.status_code != 200)
async def search_products(
    q: str = Query(None, min_length=1),
    page: int = Query(1, ge=1)
):
    """
    搜索产品 - 只在响应成功时缓存
    """
    ifnot q:
        return {"error": "请输入搜索关键词"}
    
    await asyncio.sleep(1.5)
    return {
        "query": q,
        "page": page,
        "results": [f"{q}结果{i}"for i in range(10)],
        "total": 100
    }

# 手动缓存操作
from fastapi_cache import FastAPICache
from fastapi_cache.coder import JsonCoder

@app.get("/manual-cache")
async def manual_cache_demo():
    """
    手动缓存控制示例
    """
    backend = FastAPICache.get_backend()
    cache_key = "manual:data"
    
    # 尝试从缓存获取
    cached_data = await backend.get(cache_key)
    if cached_data:
        return {"source": "cache", "data": JsonCoder().decode(cached_data)}
    
    # 生成新数据
    new_data = {"id": 1, "value": "新生成的数据"}
    
    # 存入缓存
    await backend.set(cache_key, JsonCoder().encode(new_data), expire=120)
    
    return {"source": "database", "data": new_data}

# 清除缓存
@app.post("/clear-cache/{pattern}")
async def clear_cache(pattern: str = "*"):
    """
    清除匹配模式的缓存
    """
    backend = FastAPICache.get_backend()
    if isinstance(backend, RedisBackend):
        redis_client = backend.redis
        keys = await redis_client.keys(f"{FastAPICache.get_prefix()}{pattern}")
        if keys:
            await redis_client.delete(*keys)
            return {"cleared": len(keys)}
    return {"cleared": 0}

6. FastAPI CrudRouter

在 Flask 中,我总是需要手动编写 CRUD 端点。有了 FastAPI-CrudRouter,我确实能节省数小时。

from fastapi import FastAPI
from fastapi_crudrouter import SQLAlchemyCRUDRouter
from pydantic import BaseModel
from typing import Optional, List
import sqlalchemy as sa
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from datetime import datetime

# 创建数据库模型
Base = declarative_base()

class ItemModel(Base):
    __tablename__ = "items"
    
    id = sa.Column(sa.Integer, primary_key=True, index=True)
    name = sa.Column(sa.String, index=True)
    description = sa.Column(sa.String, nullable=True)
    price = sa.Column(sa.Float)
    created_at = sa.Column(sa.DateTime, default=datetime.utcnow)
    updated_at = sa.Column(sa.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

# Pydantic 模型
class ItemBase(BaseModel):
    name: str
    description: Optional[str] = None
    price: float

class ItemCreate(ItemBase):
    pass

class ItemUpdate(ItemBase):
    name: Optional[str] = None
    price: Optional[float] = None

class ItemResponse(ItemBase):
    id: int
    created_at: datetime
    updated_at: datetime
    
    class Config:
        orm_mode = True

# 数据库配置
DATABASE_URL = "sqlite:///./crud.db"
engine = sa.create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)

def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

app = FastAPI()

# 神奇的一行:创建完整 CRUD 路由
item_router = SQLAlchemyCRUDRouter(
    schema=ItemResponse,
    create_schema=ItemCreate,
    update_schema=ItemUpdate,
    db_model=ItemModel,
    db=get_db,
    prefix="items",
    tags=["商品管理"]
)

app.include_router(item_router)

# 扩展功能:自定义路由
@app.get("/items/search/{keyword}")
def search_items(keyword: str, db: Session = Depends(get_db)):
    """
    自定义搜索端点
    """
    results = db.query(ItemModel).filter(ItemModel.name.contains(keyword)).all()
    return results

# 复杂查询示例
@app.get("/items/expensive/{min_price}")
def get_expensive_items(min_price: float, db: Session = Depends(get_db)):
    """
    获取价格高于指定值的商品
    """
    items = db.query(ItemModel).filter(ItemModel.price >= min_price).order_by(ItemModel.price.desc()).all()
    return items

# 统计端点
@app.get("/items/stats")
def get_item_stats(db: Session = Depends(get_db)):
    """
    获取商品统计信息
    """
    total_items = db.query(ItemModel).count()
    total_value = db.query(sa.func.sum(ItemModel.price)).scalar() or0
    avg_price = db.query(sa.func.avg(ItemModel.price)).scalar() or0
    
    return {
        "total_items": total_items,
        "total_value": total_value,
        "average_price": round(avg_price, 2),
        "most_expensive": db.query(ItemModel).order_by(ItemModel.price.desc()).first()
    }

7. FastAPI Plugins

最后,FastAPI-Plugins 感觉像是把所有工具打包进了一个箱子里。它为你提供了 Redis、调度器、缓存和日志记录,全部开箱即用。

from fastapi import FastAPI, Depends
from fastapi_plugins import (
    RedisSettings, 
    depends_redis, 
    redis_plugin,
    RedisPlugin
)
from fastapi_plugins.cache import cache_plugin, CacheSettings
import aioredis
from contextlib import asynccontextmanager
from typing import Any

# 配置设置
class AppSettings(RedisSettings, CacheSettings):
    api_name: str = "fastapi-plugins-demo"
    redis_url: str = "redis://localhost:6379/0"
    cache_ttl: int = 300# 缓存过期时间(秒)

settings = AppSettings()

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    应用生命周期管理
    """
    # 启动时
    await redis_plugin.init_app(app, config=settings)
    await redis_plugin.init()
    await cache_plugin.init_app(app, config=settings)
    await cache_plugin.init()
    
    yield
    
    # 关闭时
    await redis_plugin.terminate()
    await cache_plugin.terminate()

app = FastAPI(lifespan=lifespan)

# Redis 操作示例
@app.get("/redis-demo")
async def redis_demo(redis: aioredis.Redis = Depends(depends_redis)):
    """
    演示基本的 Redis 操作
    """
    # 设置值
    await redis.set("my_key", "Hello from FastAPI!")
    
    # 获取值
    value = await redis.get("my_key")
    
    # 设置过期时间
    await redis.setex("temp_key", 60, "临时数据")
    
    # 增加计数器
    await redis.incr("counter")
    counter = await redis.get("counter")
    
    # 存储列表
    await redis.lpush("my_list", "item1", "item2", "item3")
    list_items = await redis.lrange("my_list", 0, -1)
    
    # 存储哈希
    await redis.hset("user:1000", mapping={"name": "Alice", "age": "30"})
    user_data = await redis.hgetall("user:1000")
    
    return {
        "simple_value": value.decode() if value elseNone,
        "counter": counter.decode() if counter elseNone,
        "list_items": [item.decode() for item in list_items],
        "user_data": {k.decode(): v.decode() for k, v in user_data.items()}
    }

# 缓存示例
from fastapi_plugins.cache import depends_cache

@app.get("/cached-data")
@cache_plugin.cached(ttl=60)
async def get_cached_data():
    """
    自动缓存响应的端点
    """
    # 模拟耗时操作
    import asyncio
    await asyncio.sleep(2)
    return {"data": "这是缓存的数据", "timestamp": datetime.utcnow().isoformat()}

# 发布/订阅示例
@app.get("/publish/{channel}")
async def publish_message(
    channel: str, 
    message: str,
    redis: aioredis.Redis = Depends(depends_redis)
):
    """
    向 Redis 频道发布消息
    """
    subscribers = await redis.publish(channel, message)
    return {"channel": channel, "message": message, "subscribers": subscribers}

# 监控端点
@app.get("/redis-info")
async def redis_info(redis: aioredis.Redis = Depends(depends_redis)):
    """
    获取 Redis 服务器信息
    """
    info = await redis.info()
    
    # 获取所有键
    keys = await redis.keys("*")
    
    return {
        "redis_version": info.get("redis_version"),
        "connected_clients": info.get("connected_clients"),
        "used_memory_human": info.get("used_memory_human"),
        "total_keys": len(keys),
        "sample_keys": [key.decode() for key in keys[:10]] if keys else []
    }

# 任务调度示例(需安装额外依赖)
try:
    from fastapi_plugins.scheduler import scheduler_plugin, SchedulerSettings
    
    class ExtendedSettings(AppSettings, SchedulerSettings):
        pass
    
    settings = ExtendedSettings()
    
    @scheduler_plugin.task("interval", seconds=30)
    async def scheduled_task():
        """
        每30秒运行一次的定时任务
        """
        print(f"定时任务执行于 {datetime.utcnow()}")
        # 这里可以添加清理缓存、发送报告等逻辑
    
    @app.get("/scheduler/jobs")
    async def get_scheduled_jobs():
        """
        获取所有计划任务
        """
        scheduler = scheduler_plugin.get_scheduler()
        jobs = scheduler.get_jobs()
        return {"jobs": [str(job) for job in jobs]}
    
except ImportError:
    print("注意:未安装调度器扩展,相关功能不可用")

授信模型

在信贷风控体系中,授信模型(Credit Limit Model)和 风险定价模型(Risk-Based Pricing Model)是贷前决策的“左膀右臂”。虽然它们都基于用户的信用评分(如 A 卡分数),但它们的 核心目标、输出结果、数学逻辑 以及 业务侧重点 有本质区别。

简单来说:

  • 授信模型 解决的是 “借多少?”(How Much)的问题,关注 额度上限损失敞口
  • 风险定价模型 解决的是 “多少钱借?”(At What Cost)的问题,关注 利率/费率风险覆盖

A卡

在信贷风控体系中,A 卡(Application Score Card,申请评分卡)和 B 卡(Behavior Score Card,行为评分卡)是两套核心模型,它们的根本区别在于数据维度、评估时机以及预测目标。

PreA卡

在信贷风控领域,Pre-A 卡(Pre-Application Score Card)并不是像 A 卡(申请评分卡)、B 卡(行为评分卡)或 C 卡(催收评分卡)那样广泛标准化的术语,但在实际业务和前沿探索中,它通常指 在用户正式提交贷款申请之前,基于已有数据对其信用风险进行初步评估的评分模型。

定义模型的目标

原文地址: https://mp.weixin.qq.com/s/_S-oB4D90sZjBBL42lzEqg

  • 模型上线两周,逾期率不降反升,业务部门天天来投诉。
  • KS值明明有0.4,为什么实际效果还不如规则引擎?
  • 同样的模型,在A客群表现优异,换到B客群就彻底失灵。

这些场景,是不是似曾相识?先定业务,再定模型——你的Y标签定义对了吗?